In [0]:
%run ../Includes/School-Setup

In [0]:
# Inspect the raw enrollment landing directory (the same path the bronze
# stream below loads from) to see which JSON files are currently present.
raw_enrollments_dir = f"{dataset_school}/enrollments-json-raw"
files = dbutils.fs.ls(raw_enrollments_dir)
display(files)

In [0]:
import pyspark.sql.functions as F

# Bronze ingest: Auto Loader ("cloudFiles") incrementally reads the raw
# enrollment JSON files and appends them, unmodified plus two audit columns,
# to the enrollments_bronze Delta table.
(
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Auto Loader keeps its inferred schema under this location.
    .option("cloudFiles.schemaLocation", f"{checkpoint_path}/enrollments_bronze")
    # Infer concrete column types rather than loading every JSON field as a string.
    .option("cloudFiles.inferColumnTypes", "true")
    .load(f"{dataset_school}/enrollments-json-raw")
    .select(
        "*",
        # Audit columns: when the row was ingested and which file it came from.
        F.current_timestamp().alias("arrival_time"),
        # Column was previously misspelled "surce_file". NOTE(review): if
        # enrollments_bronze already exists with the old column name, reset the
        # table and its checkpoint before re-running this cell.
        F.input_file_name().alias("source_file"),
    )
    .writeStream.format("delta")
    .option("checkpointLocation", f"{checkpoint_path}/enrollments_bronze")
    .outputMode("append")
    .table("enrollments_bronze")
)

In [0]:
%sql
-- Preview the rows currently stored in the bronze table.
SELECT * FROM enrollments_bronze

In [0]:
# Static (batch) read of the student JSON data; used as the lookup side of
# the join in the silver layer.
students_json_path = f"{dataset_school}/students-json"
students_lookup_df = spark.read.format("json").load(students_json_path)


In [0]:
# Render the student lookup DataFrame in the notebook output for inspection.
display(students_lookup_df)

## Silver Layer

In [0]:
# Silver transform: stream the bronze table, keep only rows with a positive
# quantity, convert the epoch-based enroll_timestamp into a proper timestamp
# column, and enrich each row via a join on student_id with the static
# student lookup (inner join, the DataFrame.join default).
formatted_ts = F.from_unixtime("enroll_timestamp", "yyyy-MM-dd HH:mm:ss").cast("timestamp")
silver_columns = [
    "enroll_id",
    "quantity",
    "student_id",
    "email",
    "formattedt_timestamp",
    "courses",
]

enrollments_enriched_df = (
    spark.readStream.table("enrollments_bronze")
    .filter("quantity > 0")
    .withColumn("formattedt_timestamp", formatted_ts)
    .join(students_lookup_df, "student_id")
    .select(*silver_columns)
)

In [0]:
# Silver write: append the enriched enrollment stream to the
# enrollments_silver Delta table, tracking progress in its own checkpoint.
silver_checkpoint = f"{checkpoint_path}/enrollments_silver"
(
    enrollments_enriched_df.writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint)
    .outputMode("append")
    .table("enrollments_silver")
)

In [0]:
%sql
-- Preview the silver table, ordered by enrollment id for stable reading.
SELECT * FROM enrollments_silver ORDER BY enroll_id;

In [0]:
# NOTE(review): load_new_data() is not defined in this notebook — presumably
# it comes from the %run'd ../Includes/School-Setup notebook and lands
# additional raw files for the running streams to pick up. TODO confirm.
load_new_data()

## Gold Layer

In [0]:
# Gold aggregation: stream the silver table, truncate each enrollment
# timestamp to its day, and sum quantity per (student_id, email, day).
enrollment_day = F.date_trunc("DD", "formattedt_timestamp")
total_quantity = F.sum("quantity").alias("courses_counts")

enrollments_agg_df = (
    spark.readStream.table("enrollments_silver")
    .withColumn("day", enrollment_day)
    .groupBy("student_id", "email", "day")
    .agg(total_quantity)
    .select("student_id", "email", "day", "courses_counts")
)

In [0]:
# Gold write: materialize the daily per-student aggregate into the
# daily_student_courses Delta table.
(
    enrollments_agg_df.writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_path}/daily_student_courses")
    # "complete" mode rewrites the full aggregated result on each trigger.
    .outputMode("complete")
    # Process all data available at start, then stop the query.
    .trigger(availableNow=True)
    .table("daily_student_courses")
    # The query starts asynchronously; block until this run finishes so the
    # following cell does not query the table before it has been written.
    .awaitTermination()
)

In [0]:
%sql
-- Preview the gold table of daily course counts per student.
SELECT * FROM daily_student_courses