**'Scheduled_Trigger_Notebook_Progression' Notebook :**

The scheduler notebook runs daily as part of the pipeline **'pl_silver_to_gold'**. It triggers the biweekly unseen prediction notebooks based on the last-run dates stored in the config file. The notebook can be scheduled at whatever frequency suits the user, but the prediction notebooks run only when at least 14 days have passed since the last recorded run; on all other days, running this notebook triggers nothing.
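To make the gating concrete, the following is a minimal sketch that simulates one run per day and records the days on which a prediction would actually be triggered. The function `simulate_daily_runs` and the dates are hypothetical and used only for illustration; the 14-day comparison mirrors the check in the trigger cells of this notebook.

```python
from datetime import date, timedelta

def simulate_daily_runs(last_run, start, days, interval_days=14):
    """Return the dates on which a daily-scheduled run would trigger a prediction."""
    triggered = []
    for offset in range(days):
        today = start + timedelta(days=offset)
        # Same rule as the notebook: trigger once the interval has fully elapsed
        if today >= last_run + timedelta(days=interval_days):
            triggered.append(today)
            last_run = today  # the notebook stores today's date after a successful run
    return triggered

# Hypothetical dates: last run on 2024-12-03, then 30 consecutive daily runs
fired = simulate_daily_runs(last_run=date(2024, 12, 3), start=date(2024, 12, 4), days=30)
print(fired)
```

Out of the 30 simulated daily runs, only two trigger a prediction; every other run is a no-op.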


**1. Importing necessary packages and defining log function :**

>This code block imports the packages the notebook needs and defines the functions used to build log entries and store them in the log table.

In [1]:
from enum import Enum
import pandas as pd
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
import json
from datetime import datetime, timedelta

# Set up logging configuration

class LogLevel(Enum):
    INFO = "INFO"
    ERROR = "ERROR"
    WARNING = "WARNING"

log_array = []

# Notebook name for log entries
notebook_name = "Scheduled_Trigger_Notebook_Progression"

# Logging function with status options: In Progress, Successful, Failed
def logging(type_, message, status):
    msg = {
        "timestamp": str(pd.Timestamp.now()),  # Timestamp column
        "log_type": type_,                     # Log type column
        "status": status,                      # Status column
        "description": message,                # Description column
        "notebook": notebook_name              # Notebook name column
    }
    log_array.append(msg)
    print(msg)


# Function for storing log table
def createLogfile():
    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("ScheduleTriggerNotebook").getOrCreate()

        # Convert the log_array to a Spark DataFrame
        log_rows = [Row(**item) for item in log_array]  # Unpack the dictionary for Row entries
        new_log_df = spark.createDataFrame(log_rows)

        # Load the config file to look up the log table name
        with open("/lakehouse/default/Files/progression_config_template/progression_config.json") as config_file:
            config = json.load(config_file)

        # Name of the existing log table
        existing_log_table_name = config["progression_log"]

        # Append the new log entries to the existing table
        new_log_df.write.mode("append").saveAsTable(existing_log_table_name)

        logging(LogLevel.INFO.value,f"Logging information saved in table '{existing_log_table_name}'",status="Successful")

        print(f"Logging information saved in table '{existing_log_table_name}'.")
        
    except Exception as e:
        logging(LogLevel.ERROR.value,f"Error appending log table: {e}",status="Failed")


**2. Fetching Config File :**

> This code block loads the config file, which holds the last-run dates used to schedule the biweekly unseen prediction.
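As an illustration of what the notebook expects from the config, here is a minimal sketch of the relevant keys. The key names and the log table name are taken from this notebook's code and output; the date values are placeholders, and the real file may contain further keys that are not shown here.

```python
import json

# Hypothetical config contents: key names come from the notebook,
# the date values are placeholders for illustration only.
sample_config_text = """
{
  "progression_log": "SilverData.progression_log_table",
  "pipeline_schedule": {
    "bl_last_biweekly_run": "2024-11-19",
    "dl_last_biweekly_run": "2024-11-19"
  }
}
"""

config = json.loads(sample_config_text)

# Check that both schedule keys read by the trigger cells are present
required_keys = ["bl_last_biweekly_run", "dl_last_biweekly_run"]
missing = [key for key in required_keys if key not in config.get("pipeline_schedule", {})]
print("Missing schedule keys:", missing)
```

A check like this, run right after loading, would surface a malformed config before any prediction notebook is started.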

In [2]:
# Load the Config JSON file
try:
    logging(LogLevel.INFO.value, "Running Scheduled Trigger Notebook for Progression.", status="Successful")
    with open("/lakehouse/default/Files/progression_config_template/progression_config.json") as time_config:
        config = json.load(time_config)
except Exception as e:
    logging(LogLevel.ERROR.value, f"Error loading config file: {e}", status="Failed")
    createLogfile()
    raise


{'timestamp': '2024-12-03 14:35:18.809625', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Running Scheduled Trigger Notebook for Progression.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}


**3. Bi-weekly Unseen Batch Prediction Trigger :**

>From the config, the code blocks fetch the dates stored under "bl_last_biweekly_run" and "dl_last_biweekly_run" and compare each with today's date. If fewer than 14 days have passed since the last biweekly run, nothing is executed.

>If 14 or more days have passed, the code block runs the corresponding unseen prediction notebook, which generates new predictions for the unseen student records. After a successful run, "bl_last_biweekly_run" or "dl_last_biweekly_run" is updated with today's date, so the same cycle repeats 14 days later.
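The boundary of this check is easy to get wrong, so here is a minimal sketch of the comparison on its own. The helper name `is_biweekly_run_due` and the dates are hypothetical; the date format and the `>=` comparison follow the trigger cells of this notebook.

```python
from datetime import datetime, timedelta

def is_biweekly_run_due(last_run_str, today, interval_days=14):
    """Return True when at least `interval_days` have passed since the stored date."""
    last_run = datetime.strptime(last_run_str, "%Y-%m-%d").date()
    # Only the calendar date matters; the time of day of the run is ignored
    return today.date() >= last_run + timedelta(days=interval_days)

last_run = "2024-11-19"  # placeholder value in the config's YYYY-MM-DD format
for today in (datetime(2024, 12, 2, 9, 0), datetime(2024, 12, 3, 9, 0), datetime(2024, 12, 4, 9, 0)):
    print(today.date(), is_biweekly_run_due(last_run, today))
```

With these placeholder dates, day 13 (2024-12-02) is not due, while day 14 (2024-12-03) and every later day are due until the stored date is updated.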

In [3]:
# Extract the last biweekly run date
bl_last_biweekly_run = datetime.strptime(config["pipeline_schedule"]["bl_last_biweekly_run"], '%Y-%m-%d')

# Get today's date
today = datetime.today()

# Check if two weeks (14 days) have passed since the last biweekly run
if today.date() >= (bl_last_biweekly_run.date() + timedelta(days=14)):
    logging(LogLevel.INFO.value,f"Day for new unseen batch prediction : {today}. Starting Batch Prediction notebook...",status="Successful")
    ####################################
    # Execute Notebook: Unseen Prediction
    ####################################
    execute_notebook_name = "FT_Progression_Analysis_Unseen"
    try:
        logging(LogLevel.INFO.value,f"Starting '{execute_notebook_name}' notebook...",status="Successful")

        # Run the unseen prediction notebook (second argument is the timeout in seconds)
        result = mssparkutils.notebook.run(execute_notebook_name, 86000)
        
        logging(LogLevel.INFO.value,f"'{execute_notebook_name}' notebook completed successfully.",status="Successful")
        logging(LogLevel.INFO.value,f"New predictions generated for new students.",status="Successful")

        # Update the last_biweekly_run date to today and save it back to the JSON file
        config["pipeline_schedule"]["bl_last_biweekly_run"] = today.strftime('%Y-%m-%d')

        with open("/lakehouse/default/Files/progression_config_template/progression_config.json", "w") as bi_weekly_config:
            json.dump(config, bi_weekly_config)  # the with block closes the file on exit

        logging(LogLevel.INFO.value,f"Bi-weekly run date updated for BL.",status="Successful")

    except Exception as e:
        logging(LogLevel.ERROR.value,f"Error running '{execute_notebook_name}' notebook: {e}",status="Failed")
        createLogfile()
        raise e


{'timestamp': '2024-12-03 14:35:19.559425', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Day for new unseen batch prediction : 2024-12-03 14:35:19.559155. Starting Batch Prediction notebook...', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:35:19.559510', 'log_type': 'INFO', 'status': 'Successful', 'description': "Starting 'FT_Progression_Analysis_Unseen' notebook...", 'notebook': 'Scheduled_Trigger_Notebook_Progression'}


{'timestamp': '2024-12-03 14:36:33.967817', 'log_type': 'INFO', 'status': 'Successful', 'description': "'FT_Progression_Analysis_Unseen' notebook completed successfully.", 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:36:33.967870', 'log_type': 'INFO', 'status': 'Successful', 'description': 'New predictions generated for new students.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:36:34.041938', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Bi-weekly run date updated for BL.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}


In [4]:
# Extract the last biweekly run date
dl_last_biweekly_run = datetime.strptime(config["pipeline_schedule"]["dl_last_biweekly_run"], '%Y-%m-%d')

# Get today's date
today = datetime.today()

# Check if two weeks (14 days) have passed since the last biweekly run
if today.date() >= (dl_last_biweekly_run.date() + timedelta(days=14)):
    logging(LogLevel.INFO.value,f"Day for new unseen batch prediction : {today}. Starting Batch Prediction notebook...",status="Successful")
    ####################################
    # Execute Notebook: Unseen Prediction
    ####################################
    execute_notebook_name = "PT_Progression_Analysis_Unseen"
    try:
        logging(LogLevel.INFO.value,f"Starting '{execute_notebook_name}' notebook...",status="Successful")

        # Run the unseen prediction notebook (second argument is the timeout in seconds)
        result = mssparkutils.notebook.run(execute_notebook_name, 86000)
        
        logging(LogLevel.INFO.value,f"'{execute_notebook_name}' notebook completed successfully.",status="Successful")
        logging(LogLevel.INFO.value,f"New predictions generated for new students.",status="Successful")

        # Update the last_biweekly_run date to today and save it back to the JSON file
        config["pipeline_schedule"]["dl_last_biweekly_run"] = today.strftime('%Y-%m-%d')

        with open("/lakehouse/default/Files/progression_config_template/progression_config.json", "w") as bi_weekly_config:
            json.dump(config, bi_weekly_config)  # the with block closes the file on exit

        logging(LogLevel.INFO.value,f"Bi-weekly run date updated for DL.",status="Successful")

    except Exception as e:
        logging(LogLevel.ERROR.value,f"Error running '{execute_notebook_name}' notebook: {e}",status="Failed")
        createLogfile()
        raise e


{'timestamp': '2024-12-03 14:36:36.453318', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Day for new unseen batch prediction : 2024-12-03 14:36:36.453121. Starting Batch Prediction notebook...', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:36:36.453386', 'log_type': 'INFO', 'status': 'Successful', 'description': "Starting 'PT_Progression_Analysis_Unseen' notebook...", 'notebook': 'Scheduled_Trigger_Notebook_Progression'}


{'timestamp': '2024-12-03 14:37:24.389893', 'log_type': 'INFO', 'status': 'Successful', 'description': "'PT_Progression_Analysis_Unseen' notebook completed successfully.", 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:37:24.389970', 'log_type': 'INFO', 'status': 'Successful', 'description': 'New predictions generated for new students.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:37:24.469830', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Bi-weekly run date updated for DL.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}


In [5]:
logging(LogLevel.INFO.value,f"Daily Scheduled Trigger Notebook run for Progression complete.",status="Successful")
createLogfile()


{'timestamp': '2024-12-03 14:37:25.376410', 'log_type': 'INFO', 'status': 'Successful', 'description': 'Daily Scheduled Trigger Notebook run for Progression complete.', 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
{'timestamp': '2024-12-03 14:37:26.604762', 'log_type': 'INFO', 'status': 'Successful', 'description': "Logging information saved in table 'SilverData.progression_log_table'", 'notebook': 'Scheduled_Trigger_Notebook_Progression'}
Logging information saved in table 'SilverData.progression_log_table'.


In [None]:
# display(spark.sql("select * from silverdata.progression_log_table order by timestamp desc"))