# Gold Layer

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType,StructField,StringType,IntegerType, DoubleType, DateType
from delta import DeltaTable
from pyspark.errors import AnalysisException

In [None]:
# Run parameters for this notebook.
processing_date = "2026-01-01"  # Processing_Date value of the silver rows to load
workspace = "fabric_DEV"  # Fabric workspace that hosts the lakehouses (used in the abfss:// paths)

In [None]:
# fabric config: OneLake (ABFS) locations inside the selected workspace.
_onelake_root = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com"
# Silver source table (a single Delta table).
fabric_silver_path = f"{_onelake_root}/lms_LH_Silver.Lakehouse/Tables/student_table"
# Gold Tables folder; each gold table lives in a sub-folder named after it.
fabric_gold_path = f"{_onelake_root}/lms_LH_Gold.Lakehouse/Tables"

## Read Silver Table

In [None]:
# Make the Silver lakehouse the session's current database.
spark.sql(sqlQuery="USE lms_LH_Silver")

In [None]:
# Load the silver Delta table by path, keeping only rows whose Processing_Date
# equals the processing_date parameter (the string literal is compared to the column).
df = (
    spark.read.format("delta")
    .load(fabric_silver_path)
    .where(col("Processing_Date") == str(processing_date))
)

In [None]:
# Switch the current database to Gold so the tables created below are registered there.
spark.sql(sqlQuery="USE lms_LH_Gold")

## Create Dimension Tables

In [None]:
def get_metrics(df_delta):
    """Print and return row counters for the latest operation on a Delta table.

    Reads the most recent commit from the table history (``history(1)``) and
    extracts the MERGE counters from its ``operationMetrics`` map. A counter
    that is missing (e.g. the last operation was not a MERGE), or a NULL map,
    is reported as 0.

    Args:
        df_delta: a ``delta.DeltaTable`` handle.

    Returns:
        dict with int values under ``rows_inserted``, ``rows_updated``,
        ``rows_deleted``, ``rows_affected`` and ``total_rows``.
    """
    latest = df_delta.history(1).select("operationMetrics").first()
    # The history map stores counters as strings; treat a missing row or NULL map as empty.
    operation_metrics = (latest[0] if latest is not None else None) or {}

    def _counter(key):
        # Normalize each counter to int once, so printing and summing agree.
        return int(operation_metrics.get(key, 0))

    rows_inserted = _counter("numTargetRowsInserted")
    rows_updated = _counter("numTargetRowsUpdated")
    rows_deleted = _counter("numTargetRowsDeleted")
    rows_affected = rows_inserted + rows_updated + rows_deleted
    # Full scan of the current table version; can be costly on large tables.
    total_rows = df_delta.toDF().count()

    print('Total rows of table: ', total_rows)
    print("Merge Metrics:")
    print(f"Rows inserted: {rows_inserted}")
    print(f"Rows updated: {rows_updated}")
    print(f"Rows deleted: {rows_deleted}")
    print(f"Total rows affected: {rows_affected}")

    return {
        "rows_inserted": rows_inserted,
        "rows_updated": rows_updated,
        "rows_deleted": rows_deleted,
        "rows_affected": rows_affected,
        "total_rows": total_rows,
    }

### 1. Dimension Table: dim_course

In [None]:
# Dim_Course schema: every attribute is a nullable string.
dim_course_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("Course_ID", "Course_Name", "Grade_Level")
    ]
)
dim_course_name = "dim_course"
# Create the table only if it is absent: registered by name and stored at an explicit gold path.
(
    DeltaTable.createIfNotExists(spark)
    .tableName(dim_course_name)
    .location(f"{fabric_gold_path}/{dim_course_name}")
    .addColumns(dim_course_schema)
    .execute()
)

In [None]:
# load data into table: upsert the courses present in today's silver rows into dim_course.
# The column list comes from dim_course_schema, so the projection matches the table definition.
# Course_ID repeats across silver rows (each row also carries a Student_ID), and Delta MERGE
# fails when more than one source row matches the same target row, so keep one (arbitrary)
# row per Course_ID.
df_selected_dim_course = (
    df.select(*dim_course_schema.fieldNames())
    .dropDuplicates(["Course_ID"])
)
path = f"{fabric_gold_path}/{dim_course_name}"
if not DeltaTable.isDeltaTable(spark, path):
    # The create cell above should have produced this table; fail with context.
    raise AnalysisException(
        f"{dim_course_name}: no Delta table found at {path}; run the create cell first"
    )
print("Table exists")
delta_tbl = DeltaTable.forPath(spark, path)
(
    delta_tbl.alias("target")
    .merge(df_selected_dim_course.alias("source"), "target.Course_ID = source.Course_ID")
    # Source and target share the same columns, so *All copies every attribute.
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# get metric
get_metrics(delta_tbl)

### 2. Dimension Table: dim_student

In [None]:
# Dim_Student schema: (column, type) pairs; every field is nullable.
dim_student_schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("Student_ID", StringType()),
            ("Name", StringType()),
            ("Age", IntegerType()),
            ("Gender", StringType()),
            ("Demographic_Group", StringType()),
            ("Internet_Access", StringType()),
            ("Learning_Disabilities", StringType()),
            ("Preferred_Learning_Style", StringType()),
            ("Language_Proficiency", StringType()),
            ("Parent_Involvement", StringType()),
        )
    ]
)
dim_student_name = "dim_student"
# Create the table only if it is absent: registered by name and stored at an explicit gold path.
(
    DeltaTable.createIfNotExists(spark)
    .tableName(dim_student_name)
    .location(f"{fabric_gold_path}/{dim_student_name}")
    .addColumns(dim_student_schema)
    .execute()
)

In [None]:
# load data into table: upsert the students present in today's silver rows into dim_student.
# The column list comes from dim_student_schema, so the projection matches the table definition.
# Student_ID can repeat across silver rows (presumably one row per student/course record),
# and Delta MERGE fails when more than one source row matches the same target row, so keep
# one (arbitrary) row per Student_ID.
df_selected_dim_student = (
    df.select(*dim_student_schema.fieldNames())
    .dropDuplicates(["Student_ID"])
)
path = f"{fabric_gold_path}/{dim_student_name}"
if not DeltaTable.isDeltaTable(spark, path):
    # The create cell above should have produced this table; fail with context.
    raise AnalysisException(
        f"{dim_student_name}: no Delta table found at {path}; run the create cell first"
    )
print("Table exists")
delta_tbl = DeltaTable.forPath(spark, path)
(
    delta_tbl.alias("target")
    .merge(df_selected_dim_student.alias("source"), "target.Student_ID = source.Student_ID")
    # Source and target share the same columns, so *All copies every attribute.
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# get metric
get_metrics(delta_tbl)

## Create Fact Table: fact_student_performance

In [None]:
# Fact table schema, one (column, type, nullable) entry per field in table order.
fact_student_performance_schema = StructType(
    [
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in (
            ("Student_ID", StringType(), True),
            ("Course_ID", StringType(), True),
            ("Enrollment_Date", DateType(), True),
            ("Completion_Date", DateType(), True),
            ("Status", StringType(), False),
            ("Final_Grade", StringType(), False),
            ("Attendance_Rate", DoubleType(), False),
            ("Time_Spent_on_Course_hrs", DoubleType(), False),
            ("Assignments_Completed", IntegerType(), False),
            ("Quizzes_Completed", IntegerType(), False),
            ("Forum_Posts", IntegerType(), False),
            ("Messages_Sent", IntegerType(), False),
            ("Quiz_Average_Score", DoubleType(), False),
            ("Assignment_Scores", StringType(), True),
            ("Assignment_Average_Score", DoubleType(), False),
            ("Project_Score", DoubleType(), False),
            ("Extra_Credit", DoubleType(), False),
            ("Overall_Performance", DoubleType(), False),
            ("Feedback_Score", DoubleType(), False),
            ("Completion_Time_Days", IntegerType(), True),
            ("Performance_Score", DoubleType(), False),
            ("Course_Completion_Rate", StringType(), False),
            ("Processing_Date", DateType(), True),
        )
    ]
)
fact_student_performance_name = "fact_student_performance"
# Create the table only if it is absent: stored at an explicit gold path and registered by name.
(
    DeltaTable.createIfNotExists(spark)
    .location(f"{fabric_gold_path}/{fact_student_performance_name}")
    .tableName(fact_student_performance_name)
    .addColumns(fact_student_performance_schema)
    .execute()
)

In [None]:
# load data into table: upsert today's silver rows into fact_student_performance.
# The column list comes from fact_student_performance_schema, so the projection matches the
# table definition.
# A fact row is identified by (Student_ID, Course_ID): a student may appear on several
# course rows. Matching on Student_ID alone would make MERGE fail (several source rows per
# target row) or overwrite another course's row. Deduplicate on the same key so MERGE sees
# at most one source row per target row.
df_selected_fact_student_performance = (
    df.select(*fact_student_performance_schema.fieldNames())
    .dropDuplicates(["Student_ID", "Course_ID"])
)
path = f"{fabric_gold_path}/{fact_student_performance_name}"
if not DeltaTable.isDeltaTable(spark, path):
    # The create cell above should have produced this table; fail with context.
    raise AnalysisException(
        f"{fact_student_performance_name}: no Delta table found at {path}; run the create cell first"
    )
print("Table exists")
delta_tbl = DeltaTable.forPath(spark, path)
(
    delta_tbl.alias("target")
    .merge(
        df_selected_fact_student_performance.alias("source"),
        "target.Student_ID = source.Student_ID AND target.Course_ID = source.Course_ID",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# get metric
get_metrics(delta_tbl)