# Master notebook
#### This is the master notebook which contains the code to process the data from the shortcut and call the child notebooks to create the temporary delta tables. The code contains the following steps:
1. List the files in the input directory.
2. Filters the files which are required to be processed.
3. Group the files based on the number of jobs that can be run in parallel.
4. Call the child notebooks to process the data in parallel.
5. Merge the temporary delta tables to create the final delta table.
6. Save the final delta table as a table in the Lakehouse.

In [None]:
# declaring the variables
input_file_path = f"Files/nycyellotaxi-backup"
output_path = "Files/parquet-to-delta-table-fabric"
keywords_to_be_considered =['2022','2021','2020','2019']
no_of_parallel_jobs = 6


In [None]:
# Listing the files in the  shortcut path
from notebookutils import mssparkutils
_input_files_path =   mssparkutils.fs.ls(f"{input_file_path}")
input_files_path = []
for fileinfo in _input_files_path:
    input_files_path.append(fileinfo.path)

# Filtering the files based on the keywords.
files_path = []
filtered_list_of_path = []
for keyword in keywords_to_be_considered:
    filtered_list_of_path = [i for i in input_files_path if keyword in i]
    for f in filtered_list_of_path:
        files_path.append(f)

In [None]:
len(files_path)

In [None]:
# Group the files based on the number of jobs that can be run in parallel.

def chunkIt(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg

    return out

In [None]:
files_list_part = chunkIt(filtered_list_of_path, no_of_parallel_jobs)

In [None]:
# creating the list of notebooks to be executed in parallel with the parameters that are required for the child notebook.
notebooks = []
for i in range(0, no_of_parallel_jobs):
    notebook = {"path": "/child_notebook_parallelism", "params": {"files_list_part": f"{files_list_part[i]}", "output_path" : f"{output_path}/temp/batch{i}"}}
    notebooks.append(notebook)

In [None]:
# execute the child notebooks in parallel
from concurrent.futures import ThreadPoolExecutor
timeout = 1800 # 3600 seconds = 1 hour
# notebooks = [
#     {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[0]}", "output_path" : f"{output_path}/temp/batch0"}},
#     {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[1]}", "output_path" : f"{output_path}/temp/batch1"}},
#     {"path": "/childnotebook", "params": {"files_list_part": f"{files_list_part[2]}", "output_path" : f"{output_path}/temp/batch2"}},
# ]

with ThreadPoolExecutor() as ec:
    for notebook in notebooks:
        f = ec.submit(mssparkutils.notebook.run, notebook["path"], timeout, notebook["params"])
        # print(f"notebook-path - {notebook['path']}")
        # print(f"notebook-params - {notebook['params']}")

In [None]:
table_delta_file_location = f"Tables/test2_merge"
table_full_name = "test2_merge"
merge_join_condition = "source.hash_key = target.hash_key"

In [None]:
def create_delta_table(
    df,
    table_full_name,
    table_delta_file_location
):
    isDeltaTableAlreadyPresent = 0
    try:
        mssparkutils.fs.ls(table_delta_file_location)
        isDeltaTableAlreadyPresent = 1
    except:
    #writing the delta table into the curated location
        df.write.format("delta").mode("overwrite").save(table_delta_file_location)
        sqltext = f"CREATE TABLE IF NOT EXISTS {table_full_name} USING DELTA LOCATION '{table_delta_file_location}'"
        # print(sqltext)
        spark.sql(sqltext)
    return  isDeltaTableAlreadyPresent

In [None]:
def mergeDeltaTable(
    table_full_name,
    df,
    merge_join_condition
):
    df.createOrReplaceTempView("temp_vw_new_data")
    sqltext = (f'''

    MERGE INTO {table_full_name} as source
    USING temp_vw_new_data as target
    ON {merge_join_condition}    
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *

    
    
    ''')
    # print(sqltext)
    spark.sql(sqltext)

## The below code is merging the delta tables that are being created by the child notebooks in the above parallel execution.


In [None]:
import time
import timeit
import functools
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr, col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

start_time = time.time()
output_dfs = []
# reading all the output parquet files from the parallel jobs output
for i in range(0, no_of_parallel_jobs):
    input_path = f"{output_path}/temp/batch{i}"
    df = spark.read.parquet(input_path)
    output_dfs.append(df)

# union all the dataframes into one dataframe
df_output = functools.reduce(DataFrame.unionAll, output_dfs)
# removing the duplicates from the dataframe
windowSpec = Window.partitionBy("hash_key").orderBy("hash_key")
df_output = df_output.withColumn("row_num", row_number().over(windowSpec)).filter("row_num=1")
df_output = df_output.drop("row_num")
# creating the delta table if it is not present
isDeltaTableAlreadyPresent = create_delta_table(
        df=df_output,
        table_full_name=table_full_name,
        table_delta_file_location=table_delta_file_location
)
print (f"isDeltaTableAlreadyPresent = {isDeltaTableAlreadyPresent} [[ 0= Not Present, so we created the delta table. 1= present ]], we skip creation of the delta table")

# merging the new dataframe with the delta table
if (isDeltaTableAlreadyPresent==1):
    print(" We are going to merge the new dataframe with the delta table")
    mergeDeltaTable(
            merge_join_condition=merge_join_condition,
            df=df_output,
            table_full_name=table_full_name
    )
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution time: {elapsed_time}")

In [None]:
%%sql

select count(*) from test2_merge