# 📌 Attach Default Lakehouse
❗**Note: the code in the cell that follows is required to programmatically attach the lakehouse and enable the running of spark.sql(). If this cell fails, simply restart your session, as this cell MUST be the first command executed on session start.**

In [None]:
%%configure
{
    "defaultLakehouse": {
        "name": "{{lakehouse_name}}"
    }
}

# 📦 Pip
Pip installs required specifically for this template should occur here.

In [None]:
# Install the third-party packages imported in the Imports cell (jsonpickle, tabulate).
# Uses the IPython shell escape (!) to run pip.
!pip install jsonpickle
!pip install tabulate

# 🔗 Imports

In [None]:
from notebookutils import mssparkutils # type: ignore
from dataclasses import dataclass
import jsonpickle # type: ignore
import pandas as pd # type: ignore
from tabulate import tabulate # type: ignore
from pyspark.sql.functions import *
import os

# #️⃣ Functions

In [None]:
@dataclass
class NotebookResult:
    """Outcome record for one notebook execution.

    The field names and order line up with the columns of the
    ``execution_log`` table, which additionally carries a ``batch_id``.
    """

    notebook: str  # which notebook was run
    start_time: float
    status: str
    error: str
    execution_time: float
    run_order: int  # position of the notebook in the execution sequence
    
@dataclass
class FileListing:
    """One file discovered while walking a directory tree.

    Attributes:
        name: The file's own name, as reported by the directory listing.
        directory: Path of the directory that contains the file.
    """

    name: str
    directory: str

def get_file_content_using_notebookutils(file):
    """Return the text content of *file*, read through Spark.

    Despite the function name, the read goes through
    ``spark.sparkContext.wholeTextFiles`` on the notebook's ``spark``
    session, not through a notebookutils call.

    Args:
        file: Path handed unchanged to ``wholeTextFiles``.

    Returns:
        The content string of the first ``(path, content)`` pair that Spark
        returns. If *file* yields several files, only that first pair is used.

    Raises:
        IndexError: If Spark returns no ``(path, content)`` pairs for *file*.
    """
    # collect() brings every (path, content) tuple back to the driver.
    pairs = spark.sparkContext.wholeTextFiles(file).collect()  # type: ignore
    _path, content = pairs[0]
    return content

def remove_file_using_notebookutils(file):
    """Delete *file* with ``mssparkutils.fs.rm``, ignoring failures.

    ``True`` is passed as the second (recurse) argument of ``fs.rm``, so a
    directory path is removed together with its contents. The operation is
    best-effort: ordinary errors are swallowed, and the function always
    returns None.

    Args:
        file: Path of the file or directory to remove.
    """
    try:
        mssparkutils.fs.rm(file, True)
    except Exception:
        # Deliberately best-effort (e.g. the path may already be gone).
        # Catching Exception instead of a bare ``except:`` still lets
        # KeyboardInterrupt / SystemExit through, so the session can be
        # interrupted.
        pass


def create_path_using_notebookutils(path):
    """Create the directory *path* via ``mssparkutils.fs.mkdirs``.

    Whatever mkdirs returns is discarded; this helper returns None.
    """
    fs_utils = mssparkutils.fs
    fs_utils.mkdirs(path)

def walk_directory_using_notebookutils(path):
    """Recursively list every file beneath *path*.

    Entries from ``mssparkutils.fs.ls`` are visited in the order returned.
    Sub-directories are expanded in place (depth-first). Each file is
    recorded as a FileListing holding its name and its parent directory,
    where the parent directory is ``os.path.dirname`` of the entry's full path.

    Args:
        path: Directory to start the walk from.

    Returns:
        list[FileListing]: Every non-directory entry found under *path*.
    """
    listing = []
    for entry in mssparkutils.fs.ls(path):
        if not entry.isDir:
            listing.append(FileListing(name=entry.name, directory=os.path.dirname(entry.path)))
            continue
        # Directory: splice its files into the result at this position.
        listing += walk_directory_using_notebookutils(entry.path)
    return listing

# Prepare

In [None]:
# Create the execution_log Delta table if it does not already exist.
# Its columns match the NotebookResult dataclass fields, plus batch_id to tie each
# row to an entry in the batch table.
sql = '''
CREATE TABLE IF NOT EXISTS execution_log (
  notebook STRING,
  start_time DOUBLE,
  status STRING,
  error STRING,
  execution_time DOUBLE,
  run_order INT,
  batch_id INT
)
USING DELTA
'''
spark.sql(sql)

In [None]:
# Create the batch control Delta table if it does not already exist.
# Later cells append one row per new batch with status 'open' and later rewrite
# that status to 'closed'.
sql = '''
CREATE TABLE IF NOT EXISTS batch (
  batch_id INT,
  start_time LONG,
  status STRING
)
USING DELTA
'''
spark.sql(sql)

In [None]:
# Guard: refuse to start a new batch while an earlier batch is still marked 'open'.
# Execute the SQL query to find the latest open batch.
# MAX over zero matching rows yields NULL, which arrives here as None.
latest_batch_id = spark.sql("SELECT MAX(batch_id) AS LatestBatchID FROM batch WHERE status = 'open'").collect()[0]['LatestBatchID']

# Check if there is an open batch and raise an error if there is
if latest_batch_id is not None:
    raise ValueError(f"There is an open batch with BatchID {latest_batch_id}")

In [None]:
# Open a new batch. The next id is the current MAX(batch_id) + 1, or 1 when the
# table is empty (COALESCE). The row is stamped with UNIX_TIMESTAMP() and status 'open'.
# Define the SQL query
sql_query = "SELECT COALESCE(MAX(batch_id), 0) + 1 AS batch_id, UNIX_TIMESTAMP() AS start_time, 'open' AS status FROM batch"

# Execute the SQL query and store the output in a DataFrame
df = spark.sql(sql_query)

# Append the DataFrame to the existing table 'batch'
df.write.format("delta").mode("append").saveAsTable("batch")

# Executions for Each Run Order Below:

# Execution Report

In [None]:
# Close the currently open batch (the highest batch_id whose status is 'open').
# Get current Open Batch; MAX over no rows yields NULL -> None.
latest_batch_id = spark.sql("SELECT MAX(batch_id) AS LatestBatchID FROM batch WHERE status = 'open'").collect()[0]['LatestBatchID']
if latest_batch_id is None:
    # Without this guard the replaceWhere predicate below would become the
    # invalid expression "batch_id = None".
    print("No open batch found; nothing to close.")
else:
    # Create a DataFrame with the updated status
    df = (
        spark.table("batch")
        .where((col("batch_id") == latest_batch_id) & (col("status") == "open"))
        .withColumn("status", lit("closed"))
    )
    # Overwrite only the rows matching the replaceWhere predicate.
    # NOTE: any row for this batch_id whose status was not 'open' is not in df
    # and is therefore dropped by this overwrite.
    df.write.format("delta").mode("overwrite").option("replaceWhere", f"batch_id = {latest_batch_id}").save("Tables/batch")

In [None]:
# Report the execution status of every notebook logged for the most recent batch.
# NOTE(review): failed_results is not populated in this cell; kept in case other cells expect it.
failed_results = []
# Get latest Batch (open or closed); None when the batch table is empty.
latest_batch_id = spark.sql("SELECT MAX(batch_id) AS LatestBatchID FROM batch").collect()[0]['LatestBatchID']
# Read the log for this batch execution
df_execution_log = spark.table("execution_log").where(col("batch_id") == latest_batch_id)
# Collect once (a single Spark job) rather than running a separate count() first.
# Ordering by run_order makes the printed report order deterministic.
execution_rows = (
    df_execution_log
    .orderBy("run_order", "start_time")
    .select("notebook", "status")
    .collect()
)
if execution_rows:
    # Print every logged notebook with its status (succeeded or not).
    for row in execution_rows:
        print(f"Notebook {row['notebook']} execution status: {row['status']}")
else:
    print(f"No execution log entries found for batch {latest_batch_id}")

# 🛑 Execution Stop

In [None]:
# Exit the notebook here so the debug / batch-closing cells below do not run
# as part of a normal execution.
# NOTE(review): "value string" looks like a placeholder exit value handed back to
# the caller of this notebook; confirm what callers expect.
mssparkutils.notebook.exit("value string")

# Close The Batch Code

In [None]:
#Make sure that the config, pip install and import tasks have been executed before running this code

# Manually close the currently open batch (the highest batch_id whose status is 'open').
# Get current Open Batch; MAX over no rows yields NULL -> None.
latest_batch_id = spark.sql("SELECT MAX(batch_id) AS LatestBatchID FROM batch WHERE status = 'open'").collect()[0]['LatestBatchID']
if latest_batch_id is None:
    # Without this guard the replaceWhere predicate below would become the
    # invalid expression "batch_id = None".
    print("No open batch found; nothing to close.")
else:
    # Create a DataFrame with the updated status
    df = (
        spark.table("batch")
        .where((col("batch_id") == latest_batch_id) & (col("status") == "open"))
        .withColumn("status", lit("closed"))
    )
    # Overwrite only the rows matching the replaceWhere predicate.
    # NOTE: any row for this batch_id whose status was not 'open' is not in df
    # and is therefore dropped by this overwrite.
    df.write.format("delta").mode("overwrite").option("replaceWhere", f"batch_id = {latest_batch_id}").save("Tables/batch")