# Import necessary libraries

In [0]:
from datetime import date, datetime, timedelta
from pyspark.sql.types import StringType, DateType
import requests
import json
from pathlib import Path

# Create Function to Get Access Token and Expiry Time

In [0]:
def GetAccessToken(client_id, client_secret, authority_url, scope, timeout=30):
    """Request an access token using the OAuth2 client-credentials grant.

    Args:
        client_id: Application (client) ID sent to the token endpoint.
        client_secret: Secret belonging to ``client_id``.
        authority_url: Token endpoint URL the form data is POSTed to.
        scope: Value sent as the ``resource`` form field.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        ``(access_token, expires_at)``. ``access_token`` is the ready-to-use
        ``Authorization`` header value (``"<token_type> <access_token>"``).
        ``expires_at`` is a naive local ``datetime`` set 300 seconds before
        the reported expiry, so callers refresh the token early.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        KeyError: if a successful response lacks ``token_type``,
            ``access_token`` or ``expires_in``.
    """
    data = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': scope,
    }
    response = requests.post(url=authority_url, data=data, timeout=timeout)
    # Surface the HTTP error itself instead of a confusing KeyError raised
    # while reading an error payload below.
    response.raise_for_status()
    result = response.json()

    # Extract token info
    access_token = result['token_type'] + " " + result['access_token']
    # Subtract 5 minutes so the token is refreshed before it actually expires.
    expires_in = int(result['expires_in']) - 300
    expires_at = datetime.now() + timedelta(seconds=expires_in)
    return access_token, expires_at

# UDF for calling the Power BI Activity Events REST API

In [0]:
@udf(returnType=StringType())
def do_requests(rawpath, fileName, access_token, RunDate):
    """Download one day of Power BI activity events and write them to a JSON file.

    Used as a Spark UDF: each input row supplies the day to fetch.

    Args:
        rawpath: Base output path; the file goes under ``{rawpath}/YYYY/MM/DD``.
        fileName: Name of the JSON file written inside that day folder.
        access_token: Full ``Authorization`` header value.
        RunDate: Date whose events (00:00:00.000 to 23:59:59.999) are requested.

    Returns:
        ``{rawpath}/YYYY/MM/DD`` when at least one event was written, otherwise
        ``None`` (callers filter those rows out).

    NOTE(review): the file is written to the local disk of whichever node runs
    this UDF, relative to its working directory. Confirm that matches where the
    caller later copies files from.
    """
    # Seconds per HTTP call; without it a stalled connection blocks the task forever.
    request_timeout = 120

    # Initialize variables
    activityDate = RunDate.strftime("%Y-%m-%d")
    activityYear = RunDate.strftime("%Y")
    activityMonth = RunDate.strftime("%m")
    activityDay = RunDate.strftime("%d")
    url = "https://api.powerbi.com/v1.0/myorg/admin/activityevents?startDateTime='" + activityDate + "T00:00:00.000'&endDateTime='" + activityDate + "T23:59:59.999'"
    incrementPath = f"{rawpath}/{activityYear}/{activityMonth}/{activityDay}"
    # Write relative to the working directory. Strip a leading "/" only when
    # present; slicing unconditionally would chop the first character of a
    # relative rawpath.
    writePath = incrementPath[1:] if incrementPath.startswith("/") else incrementPath

    # First page of results, plus the continuation URL for the next one.
    header = {'Content-Type': 'application/json', 'Authorization': f'{access_token}'}
    api_call = requests.get(url=url, headers=header, timeout=request_timeout).json()
    # "or []" also treats an explicit JSON null as "no events".
    activityEventEntities = list(api_call.get('activityEventEntities') or [])
    contUrl = api_call.get('continuationUri')

    # Follow continuation URLs until none is returned, collecting the whole day.
    while contUrl is not None:
        api_call_cont = requests.get(url=contUrl, headers=header, timeout=request_timeout).json()
        # Strict indexing on purpose: an error payload on a continuation page
        # raises instead of silently truncating the day's data.
        contUrl = api_call_cont['continuationUri']
        # extend() avoids re-copying the accumulated list on every page.
        activityEventEntities.extend(api_call_cont['activityEventEntities'])

    if not activityEventEntities:
        # Nothing to persist for this day.
        return None

    # Serialize the events.
    json_object = json.dumps(activityEventEntities, indent=4)

    # Create the day folder.
    Path(writePath).mkdir(parents=True, exist_ok=True)

    # Write the JSON file.
    with open(f'{writePath}/{fileName}', "w", encoding="utf-8") as outfile:
        outfile.write(json_object)

    return incrementPath

# Call the UDF in parallel via a Spark DataFrame

In [0]:
def parallel_request(dateList, basePath, rawpath, fileName, access_token):
    """Fetch activity events for every date in ``dateList`` in parallel via Spark.

    Each date becomes one DataFrame row, and ``do_requests`` is applied to it.
    Afterwards, everything under the driver-local ``/databricks/driver{basePath}``
    is copied to ``dbfs:{basePath}`` and then deleted locally.

    Args:
        dateList: Dates to fetch (loaded as a single ``DateType`` column).
        basePath: Path (presumably with a leading "/") of the local folder
            to move to DBFS.
        rawpath: Base output path passed through to ``do_requests``.
        fileName: JSON file name passed through to ``do_requests``.
        access_token: ``Authorization`` header value passed to ``do_requests``.

    Returns:
        A list of ``{rawpath}/YYYY/MM/DD`` paths, one per day that produced a file.
    """
    from pyspark.sql.functions import lit, col

    rows = (spark
            .createDataFrame(dateList, DateType())
            .select(do_requests(lit(rawpath), lit(fileName), lit(access_token), col("value")).alias("result"))
            .filter(col("result").isNotNull())
            .collect())
    path = [row[0] for row in rows]

    # Move files only when some were written. If no day had events, the local
    # folder does not exist and dbutils.fs.cp would fail on a missing source.
    if path:
        localDir = f"file:///databricks/driver{basePath}"
        dbutils.fs.cp(localDir, f"dbfs:{basePath}", True)
        dbutils.fs.rm(localDir, True)
    return path

# Create function to generate the list of dates between a start and end date (inclusive)

In [0]:
def generate_date_list(startDate, endDate):
    """Return every day from ``startDate`` to ``endDate``, both ends included.

    Returns an empty list when ``endDate`` is before ``startDate``.
    """
    span = (endDate - startDate).days
    days = []
    for offset in range(span + 1):
        days.append(startDate + timedelta(days=offset))
    return days

# Create Function to check if Delta table exists

In [0]:
def delta_table_exists(path):
    """Return True if ``path`` is the root of a Delta table, else False.

    Uses ``DeltaTable.isDeltaTable`` rather than catching every exception from
    ``DeltaTable.forPath``. The blanket catch turned unrelated failures (for
    example storage permission or transient I/O errors) into a misleading
    "table does not exist". Such errors now propagate to the caller.
    """
    from delta.tables import DeltaTable
    return DeltaTable.isDeltaTable(spark, path)

# Function to read Delta or other file-based (e.g. JSON) data

In [0]:
def dfRead(fileType, path, schema=None):
    """Load ``path`` into a Spark DataFrame.

    Args:
        fileType: Spark data source format, e.g. ``"delta"`` or ``"json"``
            (compared case-insensitively against ``"delta"``).
        path: Location to load.
        schema: Optional ``StructType`` or DDL string for non-Delta formats.
            It is ignored for Delta. When omitted, Spark's default schema
            handling for the format applies.

    Returns:
        The loaded DataFrame. Non-Delta formats are read recursively
        (``recursiveFileLookup``) with ``multiline`` enabled.
    """
    reader = spark.read.format(fileType)
    if fileType.lower() == 'delta':
        return reader.option("multiline", "true").load(path)

    reader = (reader
              .option("inferSchema", "false")
              .option("recursiveFileLookup", "true")
              .option("multiline", "true"))
    # DataFrameReader.schema() raises TypeError for None, so the default
    # schema=None previously made this branch unusable.
    if schema is not None:
        reader = reader.schema(schema)
    return reader.load(path)

# Function to optimize and vacuum a Delta table

In [0]:
def OptimizeAndVacuum(tableName, ZOrder=None, RetainHours=0):
    """Run ``OPTIMIZE`` (optionally ``ZORDER BY``) and then ``VACUUM`` on a table.

    Args:
        tableName: Table name or path expression, interpolated verbatim into
            the SQL (trusted input only).
        ZOrder: Columns to Z-order by. Accepts either a comma-separated string
            such as ``"colA, colB"`` or a list/tuple of column names; ``None``
            skips ZORDER.
        RetainHours: Hours passed to ``VACUUM ... RETAIN <n> HOURS``.

    The session setting ``spark.databricks.delta.retentionDurationCheck.enabled``
    is set to False so that short retentions (such as the default 0) are
    accepted. It is restored to its previous value afterwards, even on failure,
    instead of staying disabled for the rest of the session.
    NOTE: very short retentions remove files older table versions still need.
    """
    checkKey = "spark.databricks.delta.retentionDurationCheck.enabled"

    if ZOrder is not None and not isinstance(ZOrder, str):
        # A list such as ["a", "b"] would otherwise render as "(['a', 'b'])".
        ZOrder = ", ".join(ZOrder)
    ZOrderBase = f" ZORDER BY ({ZOrder})" if ZOrder is not None else ""
    optimizeFull = f"OPTIMIZE {tableName}" + ZOrderBase
    VacuumFull = f"VACUUM {tableName} RETAIN {RetainHours} HOURS"

    # get(key, None) returns None when the key was never set on this session.
    previous = spark.conf.get(checkKey, None)
    spark.conf.set(checkKey, False)
    try:
        spark.sql(optimizeFull)
        spark.sql(VacuumFull)
    finally:
        if previous is None:
            spark.conf.unset(checkKey)
        else:
            spark.conf.set(checkKey, previous)