In [None]:
# Install the third-party packages this notebook imports:
# pandas and the Azure Data Lake client (azure-storage-file-datalake).
!pip install pandas
!pip install azure-storage-file-datalake

In [None]:
# Deployment settings -- replace every "ChangeMe" placeholder before running.

# Storage account name; used for the dfs.core.windows.net URL and the /mnt/<account> mount path.
STORAGE_ACCOUNT = "ChangeMe"
# Azure AD tenant used to build the login.microsoftonline.com token endpoints.
TENANT_ID = "ChangeMe"
# Databricks secret scope that every dbutils.secrets.get call reads from.
SCOPE_NAME = "ChangeMe"
# Application (client) ID used both for mounting and for requesting the query token.
AAD_APPID = "ChangeMe"
# Log Analytics workspace ID that the queries are sent to.
WORKSPACE_ID = "ChangeMe"
# Secret key name holding the storage account key.
SERVICE_CREDENTIAL_KEY_NAME_STORAGE_KEY = "ChangeMe"
# Secret key name holding the client secret used for the OAuth mount.
SERVICE_CREDENTIAL_KEY_NAME_DATALAKE = "ChangeMe"
# Secret key name holding the client secret used to obtain the query token.
SERVICE_CREDENTIAL_KEY_NAME_SERVICE_PRINCIPAL = "ChangeMe"

# Tables to export; each one goes to the container "export-<lower-cased table name>".
tables = [
    "ChangeMe",
    "ChangeMe",
    "ChangeMe",
]
# Length of one query window, in minutes.
interval = 15
# Number of days to export, counted back from today's midnight.
days_ago = 1

In [None]:
# Imports
import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from datetime import datetime, timedelta
import requests
import hashlib
import hmac
import base64
import logging
import urllib3
import json


In [None]:
def initialize_storage_account(storage_account_name, storage_account_key):
    """Create the Data Lake service client and publish it as the global ``service_client``.

    Args:
        storage_account_name: Name of the storage account; used to build the
            ``https://<name>.dfs.core.windows.net`` account URL.
        storage_account_key: Credential handed to ``DataLakeServiceClient``.

    Returns:
        The newly created client. It is also stored in the module-level
        ``service_client`` variable, which existing callers read.

    Raises:
        Exception: Any error raised while constructing the client is logged and
            re-raised, so callers never continue with a missing or stale
            ``service_client``.
    """
    global service_client

    account_url = f"https://{storage_account_name}.dfs.core.windows.net"
    try:
        service_client = DataLakeServiceClient(account_url=account_url,
                                               credential=storage_account_key)
    except Exception:
        # Swallowing the error here would only surface later as a confusing
        # NameError (or silently reuse the client of a previous call).
        logging.exception("Could not create DataLakeServiceClient for %s", account_url)
        raise

    return service_client

In [None]:
# Detach any existing mount of each table's export container.
for table_name in tables:
  container = f"export-{table_name.lower()}"
  mount_point = f"/mnt/{STORAGE_ACCOUNT}/{container}"

  try:
    dbutils.fs.unmount(mount_point)
  except Exception as unmount_error:
    # Best-effort: report the failure and carry on with the next table.
    print(unmount_error)


In [None]:
# Secret key name of the client secret used for the OAuth mount.
service_credential_key_name = SERVICE_CREDENTIAL_KEY_NAME_DATALAKE

# OAuth client-credentials settings passed to dbutils.fs.mount as extra_configs.
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": AAD_APPID,
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SCOPE_NAME,key=service_credential_key_name),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(TENANT_ID)}

# Re-mount the export container of every table under /mnt/<account>/<container>.
for table_name in tables:
  storagecontainerNameExport  =  "export-{}".format(table_name.lower())
  mount_point_var_export      = f"/mnt/{STORAGE_ACCOUNT}/{storagecontainerNameExport}"
  try:
    dbutils.fs.mount(
        source = f"abfss://{storagecontainerNameExport}@{STORAGE_ACCOUNT}.dfs.core.windows.net/",
        mount_point = mount_point_var_export,
        extra_configs = configs)
  except Exception as e:
    # `ExecutionError` is never imported or defined in this notebook, so catching
    # it by name raised NameError as soon as a mount failed. Catch Exception,
    # as the unmount cell does, and keep going with the next table.
    print(e)



In [None]:
def time_hop_generator(start_time,days_ago,interval):
  '''
  Yield consecutive (earliest, latest) time windows, walking backwards from start_time.

  start_time -- datetime that closes the newest window
  days_ago   -- number of days to cover
  interval   -- window length in minutes

  Windows come newest first; each one ends where the previously yielded one began.
  '''
  window_count = int(days_ago * 24 * (60/interval))

  latest = start_time
  for step in range(1, window_count + 1):
    earliest = start_time - timedelta(minutes=(interval * step))
    yield (earliest, latest)
    # The next (older) window ends where this one started.
    latest = earliest

In [None]:
def get_file_path(start_time):
  '''Build the partition-style folder path for a timestamp.

     start_time -- object exposing year, month, day and hour attributes
                   (the notebook passes a pandas timestamp)

     Returns "y=<year>/m=<MM>/d=<DD>/h=<HH>": month, day and hour are
     left-padded with a zero to two characters, the year is used as-is.'''
  year = str(start_time.year)
  month, day, hour = (
      str(part).zfill(2)
      for part in (start_time.month, start_time.day, start_time.hour)
  )
  return f"y={year}/m={month}/d={day}/h={hour}"

In [None]:
def get_token(tenant, sp_id, sp_secret, timeout=30):
    """Obtain a Log Analytics bearer token using a Service Principal.

    Performs a client-credentials request against
    ``https://login.microsoftonline.com/<tenant>/oauth2/token`` for the
    ``https://api.loganalytics.io`` resource.

    Keyword arguments:
    tenant -- Azure AD tenant ID
    sp_id -- application (client) ID of the service principal
    sp_secret -- client secret of the service principal
    timeout -- seconds to wait for the token endpoint (default 30)

    Returns ``{"Authorization": "Bearer <token>"}``, ready to be used as
    request headers, or ``None`` when the request fails or the endpoint
    answers with a non-2xx status (the failure is logged).
    """
    login_url = "https://login.microsoftonline.com/" + tenant + "/oauth2/token"
    resource = "https://api.loganalytics.io"

    # requests form-encodes `data` and sets the Content-Type header itself, so
    # Content-Type must not be sent as a form field.
    payload = {
        'grant_type': 'client_credentials',
        'client_id': sp_id,
        'client_secret': sp_secret,
        'resource': resource
    }

    try:
        # Certificate verification is left at requests' default (enabled):
        # this request carries the client secret.
        response = requests.post(login_url, data=payload, timeout=timeout)
    except requests.RequestException as error:
        # Return here; falling through would hit an unbound `response`.
        logging.error(error)
        return None

    if 200 <= response.status_code <= 299:
        logging.info('Token obtained')
        token = response.json()["access_token"]
        return {"Authorization": "Bearer " + token}

    logging.error("Unable to Read: " + format(response.status_code))
    return None

In [None]:
def get_data(query, token, azure_log_customer_id, timeout=600):
    """Executes a KQL query on an Azure Log Analytics Workspace.

    Keyword arguments:
    query -- Kusto query to execute on Azure Log Analytics
    token -- Authentication headers generated using get_token
    azure_log_customer_id -- Workspace ID obtained from Advanced Settings
    timeout -- seconds to wait for the HTTP response (default 600)

    Returns the HTTP response object when the status is 2xx. Returns ``None``
    when the request fails or the status is not 2xx (the failure is logged).
    """
    az_url = "https://api.loganalytics.io/v1/workspaces/" + azure_log_customer_id + "/query"
    params = {"query": query}

    try:
        # NOTE(review): the default timeout is meant to be longer than the
        # service's own query time limit -- confirm for this workspace.
        response = requests.get(az_url, params=params, headers=token, timeout=timeout)
    except requests.RequestException as error:
        # Return here; falling through would hit an unbound `response`.
        logging.error(error)
        return None

    if 200 <= response.status_code <= 299:
        logging.info('Query ran successfully')
        return response

    logging.error("Unable to Read: " + format(response.status_code))
    return None

In [None]:
def response_to_dataframe(response):
  """Convert a query response into a pandas DataFrame.

  Reads the first entry of the "tables" list in the response's JSON body and
  builds a frame from its "rows", labelled with the "name" of every entry in
  its "columns" list.
  """
  first_table = response.json()['tables'][0]
  records = first_table['rows']
  column_names = [column["name"] for column in first_table['columns']]
  return pd.DataFrame(records, columns=column_names)

In [None]:
def export_table(table_name,days_ago):
  '''
  Export one Log Analytics table to its storage container as parquet files.

  The `days_ago` days that end at today's midnight are split into windows of
  `interval` minutes (notebook-level setting). Every window is queried on its
  own and, if it returned rows, written to
  /dbfs/mnt/<STORAGE_ACCOUNT>/export-<table>/<y=/m=/d=/h= path>/df_<first TimeGenerated>.

  table_name -- table to query; its lower-cased name selects the container
  days_ago   -- number of days to export

  Raises RuntimeError when no access token could be obtained. A failed query
  or write of a single window is reported and the export continues.
  '''
  storage_account_key = dbutils.secrets.get(scope=SCOPE_NAME,key=SERVICE_CREDENTIAL_KEY_NAME_STORAGE_KEY)
  aad_appkey = dbutils.secrets.get(scope=SCOPE_NAME,key=SERVICE_CREDENTIAL_KEY_NAME_SERVICE_PRINCIPAL)
  container_name = "export-{}".format(table_name.lower())

  # Publishes the global `service_client` used below.
  initialize_storage_account(STORAGE_ACCOUNT, storage_account_key)
  sp_token = get_token(TENANT_ID, sp_id=AAD_APPID, sp_secret=aad_appkey)
  if sp_token is None:
    # get_token signals failure with None; querying without headers cannot succeed.
    raise RuntimeError(f"Could not obtain an access token; nothing exported for {table_name}")
  file_system_client = service_client.get_file_system_client(file_system=container_name)

  # NOTE(review): datetime.now() is the cluster's local time while the query
  # filters on TimeGenerated -- confirm the cluster clock is UTC.
  now = datetime.now()
  midnight = datetime(now.year, now.month, now.day)

  # Export the data for every time interval per table.
  for earliest, latest in time_hop_generator(midnight, days_ago, interval):
    # Half-open window [earliest, latest): KQL `between` includes both bounds,
    # so a record stamped exactly on a boundary was exported by two windows.
    query = f"""{table_name} | where TimeGenerated >= todatetime('{earliest}') and TimeGenerated < todatetime('{latest}')"""
    response = get_data(query=query,token=sp_token, azure_log_customer_id=WORKSPACE_ID)
    if response is None:
      # get_data has already logged the cause; skip this window only.
      logging.error("Query failed for %s between %s and %s", table_name, earliest, latest)
      continue

    df = response_to_dataframe(response)

    # Skip empty windows before touching TimeGenerated or the storage client.
    if df.empty:
      continue

    first_event_time = pd.to_datetime(df['TimeGenerated'].min())
    folder = get_file_path(first_event_time)
    file_name = f"df_{first_event_time}"
    directory_client = file_system_client.get_directory_client(folder)

    try:
      # Create the file through the SDK first, then write the parquet content
      # to the same path through the /dbfs mount.
      directory_client.create_file(file_name)
      df.to_parquet(f"/dbfs/mnt/{STORAGE_ACCOUNT}/{container_name}/{folder}/{file_name}", index=False)
    except Exception as e:
      # Best-effort per window: report and continue with the next one.
      print(e)

In [None]:
# Run the export once for every configured table.
for table_name in tables:
  export_table(table_name=table_name, days_ago=days_ago)

In [None]:
## Reading the exported data back (example; adjust the date in the path).
# testing_df = spark.read.option("header",True) \
#      .parquet(f"/mnt/{STORAGE_ACCOUNT}/export-{tables[0].lower()}/y=2022/m=09/d=10/*")
# testing_df.limit(20).toPandas()