# Install Semantic Link

In [None]:
# Notebook shell escape: install or upgrade the semantic-link package before the imports below run.
!pip install --upgrade semantic-link --q

# Set Lakehouse Names

In [None]:
# Display names of the two Lakehouses used by this notebook; later cells
# resolve (or create) each Lakehouse by these names.
raw_lakehouse, clean_lakehouse = "lh_raw", "lh_clean"

# Import Libraries

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from urllib.request import urlretrieve
from notebookutils import mssparkutils
from sempy import fabric
from sempy.fabric.exceptions import FabricHTTPException
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import pathlib
from zipfile import ZipFile
import os
import datetime
import pytz
import time

# Function to get Lakehouse ID

In [None]:
def get_lakehouse_id(lakehouse_name):
    """Return the Id of the Lakehouse whose display name is *lakehouse_name*.

    The items returned by ``fabric.list_items("Lakehouse")`` are filtered on
    their ``Display Name`` column and the ``Id`` of the first match is
    returned.

    Args:
        lakehouse_name: Display name of the Lakehouse to look up.

    Returns:
        The ``Id`` value of the first matching Lakehouse.

    Raises:
        IndexError: If no listed Lakehouse has that display name.
    """
    lakehouses = fabric.list_items("Lakehouse")
    # Compare the column directly instead of interpolating the name into a
    # DataFrame.query expression: a name containing a quote character would
    # otherwise break (or change the meaning of) the query string.
    matching_ids = lakehouses.loc[lakehouses["Display Name"] == lakehouse_name, "Id"]
    if matching_ids.empty:
        # Same exception type the bare ``.values[0]`` lookup produced, but
        # with a message that says what was actually missing.
        raise IndexError(f"No Lakehouse named {lakehouse_name!r} was found")
    return matching_ids.values[0]

# Function to create Lakehouse

In [None]:
def create_lakehouse_if_not_exists(lh_name):
    """Create the Lakehouse *lh_name*, or return the Id of the existing one.

    Creation is attempted first. If the service rejects the request, the
    Lakehouse is looked up by display name instead.

    Args:
        lh_name: Display name of the Lakehouse.

    Returns:
        The value returned by ``fabric.create_lakehouse`` on success,
        otherwise the Id found by ``get_lakehouse_id``.

    Raises:
        FabricHTTPException: If creation failed and no Lakehouse with that
            name exists, i.e. the failure was not "already exists".
    """
    try:
        return fabric.create_lakehouse(lh_name)
    except FabricHTTPException as create_error:
        # NOTE(review): presumably the usual cause is that the Lakehouse
        # already exists; any HTTP failure ends up here though.
        try:
            return get_lakehouse_id(lh_name)
        except IndexError:
            # The Lakehouse is not there either, so the creation error is the
            # real problem; surface it instead of an unrelated IndexError.
            raise create_error from None

# Get Lakehouse and Workspace IDs

In [None]:
# Resolve both Lakehouses (creating them when they are missing) and the
# id of the workspace the notebook runs in.
raw_lakehouse_id = create_lakehouse_if_not_exists(raw_lakehouse)
clean_lakehouse_id = create_lakehouse_if_not_exists(clean_lakehouse)
workspace_id = fabric.get_workspace_id()

# Fixed path fragments shared by the cells below.
table_folder = "/Tables/"
mount_point = "/lakehouse/default"

# ABFSS root of the workspace, then one root per Lakehouse.
abfss_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com"
raw_abfss_path = "/".join([abfss_path, raw_lakehouse_id])
clean_abfss_path = "/".join([abfss_path, clean_lakehouse_id])

# Folder holding the Delta tables of each Lakehouse.
raw_table_path = f"{raw_abfss_path}{table_folder}"
clean_table_path = f"{clean_abfss_path}{table_folder}"

# Function to Create Mount Point

In [None]:
def create_mount_point(lakehouse_id):
    """Mount a Lakehouse at ``mount_point`` and return its local path.

    Args:
        lakehouse_id: Id of the Lakehouse, appended to ``abfss_path`` to
            form the source location that is mounted.

    Returns:
        The ``localPath`` of the first registered mount whose
        ``mountPoint`` equals ``mount_point``.

    Raises:
        IndexError: If no registered mount matches ``mount_point``.
    """
    source = f"{abfss_path}/{lakehouse_id}"
    mssparkutils.fs.mount(source, mount_point)

    local_paths = []
    for mount in mssparkutils.fs.mounts():
        if mount.mountPoint == mount_point:
            local_paths.append(mount.localPath)
    return local_paths[0]

# Function to check if Delta Table Exists

In [None]:
def delta_table_exists(path, tbl):
    """Report whether a Delta table can be opened at ``path + tbl``.

    Args:
        path: Folder prefix of the table location.
        tbl: Table name appended to *path*.

    Returns:
        True when ``DeltaTable.forPath`` succeeds for the location, False
        when it raises any exception.
    """
    try:
        DeltaTable.forPath(spark, path + tbl)
    except Exception:
        # Best-effort probe: every failure is reported as "does not exist".
        return False
    else:
        return True

# Function for Reading a Delta Table

In [None]:
def read_delta_table(path, tbl):
    """Load the Delta table stored at ``path + tbl``.

    Args:
        path: Folder prefix of the table location.
        tbl: Table name appended to *path*.

    Returns:
        The result of loading that location with the Spark ``delta`` reader.
    """
    delta_reader = spark.read.format("delta")
    table_location = path + tbl
    return delta_reader.load(table_location)

# Function for Creating or Replacing a Table

In [None]:
def create_or_replace_delta_table(df, path, tbl):
    """Write *df* as a Delta table at ``path + tbl`` in overwrite mode.

    The write is issued with the ``mergeSchema`` option set to ``"true"``.

    Args:
        df: DataFrame to persist.
        path: Folder prefix of the table location.
        tbl: Table name appended to *path*.

    Returns:
        Whatever the writer's ``save`` call returns.
    """
    writer = df.write.mode("overwrite")
    writer = writer.option("mergeSchema", "true").format("delta")
    table_location = path + tbl
    return writer.save(table_location)

# Function to unzip files from an archive

In [None]:
def unzip_files(zip_filename, filenames, path):
    """Extract the named members of a zip archive into *path*.

    Args:
        zip_filename: Location of the zip archive to read.
        filenames: Member names to extract from the archive.
        path: Destination directory for the extracted members.
    """
    # The archive is opened read-only (the ZipFile default) and closed again
    # as soon as this batch has been written out.
    with ZipFile(zip_filename) as archive:
        archive.extractall(path, filenames)

# Function to parallel unzip a large number of files

In [None]:
def unzip_parallel(path, zip_filename, n_workers=80):
    """Extract every member of a zip archive using a pool of worker processes.

    The member list is split into batches and each batch is handed to
    ``unzip_files`` in a separate process.

    Args:
        path: Destination directory for the extracted members.
        zip_filename: Location of the zip archive to read.
        n_workers: Number of worker processes; also used to size the batches.

    Raises:
        Exception: Whatever a worker raised while extracting its batch; the
            first failure is re-raised once all batches have finished.
    """
    # list every member once; each worker re-opens the archive on its own
    with ZipFile(zip_filename, 'r') as handle:
        files = handle.namelist()
    if not files:
        # nothing to extract, so do not start any worker process
        return
    # Never let the batch size reach 0: round() gives 0 for archives with
    # fewer than about half as many members as workers, and a zero step makes
    # range() raise ValueError.
    chunksize = max(1, round(len(files) / n_workers))
    # start the process pool; leaving the block waits for every batch
    with ProcessPoolExecutor(n_workers) as exe:
        futures = [
            exe.submit(unzip_files, zip_filename, files[i:(i + chunksize)], path)
            for i in range(0, len(files), chunksize)
        ]
    # Surface worker failures instead of silently leaving a partial extract.
    for future in futures:
        future.result()

# Define Function for getting first team to Bat or Field

In [None]:
def first_team(batOrField):
    """Build a column expression naming the team that did *batOrField* first.

    The two team names are the first two keys of the map parsed from the
    ``team_players`` JSON column (using ``team_player_schema``).

    Args:
        batOrField: Value compared against the ``toss_decision`` column.

    Returns:
        A column that is ``toss_winner`` when ``toss_decision`` equals
        *batOrField*, and otherwise whichever of the two teams is not the
        toss winner.
    """
    team_names = F.map_keys(F.from_json(F.col("team_players"), team_player_schema))
    team_one, team_two = team_names[0], team_names[1]
    toss_winner = F.col("toss_winner")
    winner_chose_it = F.col("toss_decision") == batOrField
    return (
        F.when(winner_chose_it, toss_winner)
        .when(toss_winner == team_one, team_two)
        .otherwise(team_one)
    )

# Function to convert a string of refresh items to json

In [None]:
def convert_to_json(refresh_objects):
    """Convert a refresh-object string into a list of table/partition dicts.

    The input has the form ``"Table1:part1,part2|Table2"``: entries are
    separated by ``|``, a table is separated from its partitions by ``:``,
    and partitions are separated by ``,`` (or further ``:``). Surrounding
    whitespace is ignored and blank entries are skipped.

    Args:
        refresh_objects: The refresh specification, or the literal ``"All"``.

    Returns:
        An empty list for ``"All"``; otherwise one ``{"table": ...}`` dict
        per table without partitions and one
        ``{"table": ..., "partition": ...}`` dict per listed partition.
    """
    result = []
    if refresh_objects == "All":
        return result
    for item in refresh_objects.split("|"):
        # The first field is the table; the original unpacked into a
        # misspelled name, so every later use of `table` raised NameError.
        table, *partitions = item.split(":")
        table = table.strip()
        if not table:
            continue
        # Re-join before splitting so both "T:p1,p2" and "T:p1:p2" work.
        partition_names = [
            name.strip()
            for name in ",".join(partitions).split(",")
            if name.strip()
        ]
        if partition_names:
            result.extend(
                {"table": table, "partition": name} for name in partition_names
            )
        else:
            result.append({"table": table})
    return result

# Function to call Enhanced Refresh API

In [None]:
def start_enhanced_refresh(dataset_name, workspace_name = fabric.get_workspace_id(), refresh_objects = "All", refresh_type = "full", commit_mode = "transactional", max_parallelism = 10, retry_count = 0, apply_refresh_policy = False, effective_date = None):
    """Start a refresh of a dataset through ``fabric.refresh_dataset``.

    Args:
        dataset_name: Dataset to refresh.
        workspace_name: Workspace holding the dataset; defaults to the value
            of ``fabric.get_workspace_id()`` captured when this function was
            defined.
        refresh_objects: ``"All"`` or a specification understood by
            ``convert_to_json`` (for example ``"Table1:part1|Table2"``).
        refresh_type: Passed through as ``refresh_type``.
        commit_mode: Passed through as ``commit_mode``.
        max_parallelism: Passed through as ``max_parallelism``.
        retry_count: Passed through as ``retry_count``.
        apply_refresh_policy: Passed through as ``apply_refresh_policy``.
        effective_date: Passed through as ``effective_date``; ``None`` (the
            default) means today's date at call time.

    Returns:
        Whatever ``fabric.refresh_dataset`` returns (``refresh_and_wait``
        uses it as the refresh request id).
    """
    if effective_date is None:
        # Resolve "today" per call: a default of datetime.date.today() in the
        # signature is evaluated only once, when the function is defined, and
        # goes stale in a session that stays open past midnight.
        effective_date = datetime.date.today()
    objects_to_refresh = convert_to_json(refresh_objects)
    return fabric.refresh_dataset(
        workspace = workspace_name,
        dataset = dataset_name,
        objects = objects_to_refresh,
        refresh_type = refresh_type,
        max_parallelism = max_parallelism,
        commit_mode = commit_mode,
        retry_count = retry_count,
        apply_refresh_policy = apply_refresh_policy,
        effective_date = effective_date
        )

# Function to get Refresh Details

In [None]:
def get_enhanced_refresh_details(dataset_name, refresh_request_id, workspace_name = fabric.get_workspace_id(), tzInfo = pytz.timezone('Asia/Kolkata'), date_format = "%d-%b-%Y, %H:%M"):
    """Return the details of one refresh request as a single DataFrame.

    The dataset-level fields (workspace, dataset, start/end time, status,
    extended status, number of attempts) are combined with the per-object
    and per-message frames of the refresh, so the result has one row per
    combination of object row and message row. The dataset-level fields are
    repeated on every row.

    Args:
        dataset_name: Dataset the refresh belongs to.
        refresh_request_id: Id of the refresh request to inspect.
        workspace_name: Workspace holding the dataset.
        tzInfo: Time zone the start and end times are converted to.
        date_format: ``strftime`` format used for the start and end times.

    Returns:
        A DataFrame whose dataset-level columns keep their plain names
        (``status`` etc.). Columns of the object/message frames that clash
        with an existing name get an ``_object`` / ``_message`` suffix.
    """
    refresh_details = fabric.get_refresh_execution_details(workspace = workspace_name, dataset = dataset_name, refresh_request_id = refresh_request_id)

    def format_timestamp(timestamp):
        # NOTE(review): presumably end_time is unset while the refresh is
        # still running; confirm. A missing value is passed through as None
        # instead of failing on .astimezone().
        if pd.isna(timestamp):
            return None
        return timestamp.astimezone(tzInfo).strftime(date_format)

    df = pd.DataFrame({
        'workspace': [workspace_name],
        'dataset': [dataset_name],
        'start_time': [format_timestamp(refresh_details.start_time)],
        'end_time': [format_timestamp(refresh_details.end_time)],
        'status': [refresh_details.status],
        'extended_status': [refresh_details.extended_status],
        'number_of_attempts': [refresh_details.number_of_attempts],
        # constant helper column: joining on it pairs every row with every row
        'key': [0],
    })
    detail_frames = (
        (refresh_details.objects, '_object'),
        (refresh_details.messages, '_message'),
    )
    for detail_frame, suffix in detail_frames:
        if detail_frame is None:
            continue
        # assign() works on a copy, so the frames held by refresh_details are
        # not modified. Joining on 'key' only (rather than on every shared
        # column name) keeps the dataset-level columns intact; 'outer' keeps
        # the dataset row even when the detail frame is empty.
        df = df.merge(detail_frame.assign(key = 0), on = 'key', how = 'outer', suffixes = ('', suffix))
    # The merged result is what gets returned; previously it was computed
    # and then thrown away.
    return df.drop(columns = ['key'])

# Function for synchronous refresh of dataset

In [None]:
def refresh_and_wait(dataset_name, poll_interval = 30):
    """Start a refresh of *dataset_name* and block until it leaves "Unknown".

    The refresh details are fetched and displayed every *poll_interval*
    seconds for as long as the reported status is ``"Unknown"``.

    Args:
        dataset_name: Dataset to refresh.
        poll_interval: Seconds to sleep before each status check.

    Returns:
        The first status that is not ``"Unknown"``, so callers can tell
        whether the refresh succeeded.
    """
    request_id = start_enhanced_refresh(dataset_name)
    # NOTE(review): "Unknown" is presumably the status reported while the
    # refresh is still in progress; confirm against the service docs.
    request_status = "Unknown"
    while request_status == "Unknown":
        time.sleep(poll_interval)
        request_status_df = get_enhanced_refresh_details(dataset_name, request_id)
        display(request_status_df)
        request_status = request_status_df['status'].iloc[0]
    return request_status