# #️⃣ Parameters

In [None]:
# Notebook parameters; each defaults to None here.
# NOTE(review): presumably overridden by the calling pipeline/notebook at run time - confirm.
# pm_batch_id: batch identifier; used to filter execution_log rows and stamped on new log rows.
pm_batch_id = None
# pm_log_lakehouse: name prefix used to qualify the execution_log table.
pm_log_lakehouse = None
# pm_master_notebook: master notebook name; used to filter execution_log rows and stamped on new log rows.
pm_master_notebook = None

# 📦 Pip
Pip installs required specifically for this template should occur here.

In [None]:
import importlib

jsonpickle_loader = importlib.find_loader('jsonpickle')
if jsonpickle_loader is None:
    print("Install jsonpickle")
    !pip install jsonpickle
else:
    print("jsonpickle Already Installed")

# 🔗 Imports

In [None]:
from notebookutils import mssparkutils # type: ignore
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import json
import time
import jsonpickle # type: ignore
import json
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
from datetime import datetime

# #️⃣ Functions

In [None]:
# Template placeholders in `{{ ... }}` form.
# NOTE(review): presumably substituted with real values when this notebook is generated from the template - confirm.
# notebook_files1: iterable of notebooks; each item is passed to execute_notebook().
notebook_files1 = {{ notebook_files }}
# run_order1: run order value recorded on every NotebookResult produced by this notebook.
run_order1 = {{ run_order }}

# Result record produced by execute_notebook() for each notebook it runs
@dataclass
class NotebookResult:
    """Outcome of one notebook execution.

    The field order matters: execute_notebook() builds instances
    positionally, and the log schema lists the same columns in the same order.
    """
    # Notebook that was passed to mssparkutils.notebook.run
    notebook: str
    # Wall-clock time captured just before the run started
    start_time: datetime
    # 'success' or 'error'
    status: str
    # str() of the caught exception; None when the run succeeded
    error: str
    # Elapsed run time in whole seconds (truncated)
    execution_time: int
    # Value of the module-level run_order1 at the time of the run
    run_order: int

def execute_notebook(notebook_file):
    """Run one notebook and report how it went.

    Any exception raised by the run is caught and recorded on the result
    instead of propagating, so a failing notebook does not abort the
    caller.

    Args:
        notebook_file: Notebook reference handed to
            ``mssparkutils.notebook.run``.

    Returns:
        NotebookResult: start time, 'success'/'error' status, error text
        (``None`` on success), elapsed whole seconds, and the module-level
        ``run_order1`` value.
    """
    started_at = datetime.now()

    # Assume success; the handler below overwrites both values on failure.
    outcome, failure = 'success', None
    try:
        mssparkutils.notebook.run(notebook_file)
    except Exception as exc:
        outcome, failure = 'error', str(exc)

    # Elapsed wall-clock time, truncated to whole seconds.
    elapsed = datetime.now() - started_at

    return NotebookResult(
        notebook_file,
        started_at,
        outcome,
        failure,
        int(elapsed.total_seconds()),
        run_order1,
    )

@dataclass
class FileListing:
    """A single file found while walking a directory tree.

    Instances are created by walk_directory_using_notebookutils().
    """
    # File name as reported by the directory listing entry
    name: str
    # Parent directory of the file (os.path.dirname of the entry's path)
    directory: str

def get_file_content_using_notebookutils(file):
    """Return the text content of ``file``.

    Despite the function name, the read is performed through Spark's
    ``wholeTextFiles`` (via the ambient ``spark`` session), not through
    notebookutils.

    Args:
        file: Path handed to ``wholeTextFiles``.

    Returns:
        The content of the first (path, content) pair that Spark returns.
    """
    # wholeTextFiles yields (file path, file content) tuples.
    path_content_pairs = spark.sparkContext.wholeTextFiles(file).collect()  # type: ignore

    first_pair = path_content_pairs[0]
    return first_pair[1]

def create_path_using_notebookutils(path):
    """Create the directory ``path`` by calling ``mssparkutils.fs.mkdirs``.

    Args:
        path: Directory path to create.
    """
    mssparkutils.fs.mkdirs(path)

def walk_directory_using_notebookutils(path):
    """Recursively list every file below ``path``.

    Directories are descended into depth-first, in the order that
    ``mssparkutils.fs.ls`` returns them; the directories themselves are
    not included in the result.

    Args:
        path: Root directory to walk.

    Returns:
        list[FileListing]: one entry per file, carrying its name and its
        parent directory.
    """
    found = []

    for entry in mssparkutils.fs.ls(path):
        if not entry.isDir:
            # Plain file: record its name and parent directory.
            parent_dir = os.path.dirname(entry.path)
            found.append(FileListing(name=entry.name, directory=parent_dir))
            continue

        # Sub-directory: merge in everything found beneath it.
        found.extend(walk_directory_using_notebookutils(entry.path))

    return found


# Load the Execution Log


In [None]:
# Load any existing log rows for this batch / master notebook and report them.
# The resulting `failed_results` DataFrame gates the execution cell below:
# notebooks only run when it is empty.

# Define the schema for the DataFrame
schema = StructType([
    StructField("notebook", StringType(), True),
    StructField("start_time", TimestampType(), True),
    StructField("status", StringType(), True),
    StructField("error", StringType(), True),
    StructField("execution_time", IntegerType(), True),
    StructField("run_order", IntegerType(), True),
    StructField("batch_id", StringType(), True)
])

# Create an empty DataFrame with the defined schema
# (default so that `failed_results` exists, and is empty, when no log rows are found)
failed_results = spark.createDataFrame([], schema=schema)
# Read the log for this batch execution
# NOTE(review): the query is built by string interpolation of the notebook
# parameters - assumes they are trusted values without quotes; confirm.
df_execution_log = spark.sql(f"SELECT * FROM {pm_log_lakehouse}.execution_log WHERE batch_id = '{pm_batch_id}' AND master_notebook = '{pm_master_notebook}'")
if df_execution_log.count() > 0:
    
    # Check if any have not succeeded
    failed_results = df_execution_log.filter(col("status") != "success")

    # Print the failed results
    for row in failed_results.collect():
        print(f"Notebook {row['notebook']} failed with error: {row['error']}")

    # Check if have succeeded
    succeeded_results = df_execution_log.filter(col("status") == "success")

    # Print the succeeded results
    for row in succeeded_results.collect():
        print(f"Notebook {row['notebook']} succeeded")

# Execute Notebooks 

In [None]:
# Run this run_order's notebooks in parallel and append their results to the
# execution log - but only when `failed_results` (set by the previous cell)
# is empty. Otherwise raise so that the failure propagates to the caller.

# Define the schema for the Log DataFrame
# (same fields, in the same order, as the NotebookResult dataclass; batch_id
# and master_notebook are appended as literal columns further down)
schema = StructType([
    StructField("notebook", StringType(), True),
    StructField("start_time", TimestampType(), True),
    StructField("status", StringType(), True),
    StructField("error", StringType(), True),
    StructField("execution_time", IntegerType(), True),
    StructField("run_order", IntegerType(), True)
])

if failed_results.count() == 0:
    new_results = []
    # Use a ThreadPoolExecutor to run the notebooks in parallel
    # Execute the notebooks and collect the results
    # (executor.map returns results in the order of notebook_files1)
    with ThreadPoolExecutor(max_workers={{ max_worker }}) as executor:
        new_results = list(executor.map(execute_notebook, notebook_files1))

    # Write the results to the log file
    df_log = spark.createDataFrame(new_results, schema=schema)
    df_log = df_log.withColumn("batch_id", lit(f'{pm_batch_id}'))
    df_log = df_log.withColumn("master_notebook", lit(f'{pm_master_notebook}'))
    # The table name must be an f-string: without the prefix the rows were
    # appended to a table literally named "{pm_log_lakehouse}.execution_log"
    # rather than the execution_log table that the log-loading cell reads.
    df_log.write.format("delta").mode("append").saveAsTable(f"{pm_log_lakehouse}.execution_log")
else:
    print("Failures in previous run_order... supressing execution")
    raise Exception("Failures in previous run_order... supressing execution")