# **Silver Transformation**
Transform the raw data in the Bronze zone (`default` schema) into clean, refined dimension tables and cleaned fact tables in the Silver zone (`silver` schema).


In [0]:
# Show which catalog this session is using; the schema-qualified table names below resolve against it.
current_catalog_df = spark.sql("SELECT current_catalog()")
current_catalog_df.show()


+------------------+
| current_catalog()|
+------------------+
|databricks_student|
+------------------+



In [0]:
from pyspark.sql.functions import col, upper, trim, when, lit, floor

# Create the Silver schema in the current catalog; IF NOT EXISTS keeps this cell safe to re-run.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver;")


DataFrame[]

**Creating silver.Dim_Student**

In [0]:
# 1. Read the raw student attributes from the Bronze zone.
df_bronze_info = spark.table("default.student_info")


def _clean_label(column_name):
    """Return the column with nulls replaced by "UNKNOWN", then trimmed and upper-cased.

    The result keeps the original column name.
    """
    source = col(column_name)
    filled = when(source.isNull(), "UNKNOWN").otherwise(source)
    return upper(trim(filled)).alias(column_name)


# 2. Keep the student id untouched and normalise every descriptive attribute the same way.
student_label_columns = [
    "gender",
    "region",
    "highest_education",
    "imd_band",
    "age_band",
    "disability",
]
student_attributes = [_clean_label(name) for name in student_label_columns]

# 3. One row per student.
# NOTE(review): when a student has several Bronze rows, which one survives
# dropDuplicates is not controlled here — confirm that is acceptable.
df_silver_dim_student = (
    df_bronze_info
    .select(col("id_student"), *student_attributes)
    .dropDuplicates(["id_student"])
)

# 4. Overwrite the Silver Delta table with the cleaned dimension.
student_writer = df_silver_dim_student.write.format("delta").mode("overwrite")
student_writer.saveAsTable("silver.Dim_Student")

# 5. Confirm and preview a few rows.
print("Table 'silver.Dim_Student' created.")
display(df_silver_dim_student.limit(10))

Table 'silver.Dim_Student' created.


id_student,gender,region,highest_education,imd_band,age_band,disability
11391,M,EAST ANGLIAN REGION,HE QUALIFICATION,90-100%,55<=,N
28400,F,SCOTLAND,HE QUALIFICATION,20-30%,35-55,N
30268,F,NORTH WESTERN REGION,A LEVEL OR EQUIVALENT,30-40%,35-55,Y
31604,F,SOUTH EAST REGION,A LEVEL OR EQUIVALENT,50-60%,35-55,N
32885,F,WEST MIDLANDS REGION,LOWER THAN A LEVEL,50-60%,0-35,N
38053,M,WALES,A LEVEL OR EQUIVALENT,80-90%,35-55,N
45462,M,SCOTLAND,HE QUALIFICATION,30-40%,0-35,N
45642,F,NORTH WESTERN REGION,A LEVEL OR EQUIVALENT,90-100%,0-35,N
52130,F,EAST ANGLIAN REGION,A LEVEL OR EQUIVALENT,70-80%,0-35,N
53025,M,NORTH REGION,POST GRADUATE QUALIFICATION,UNKNOWN,55<=,N


**Creating silver.Dim_Module**


In [0]:
from pyspark.sql.functions import col, trim, upper, concat, lit

# 1. Read the course catalogue from the Bronze zone.
df_bronze_courses = spark.table("default.courses")

# 2. Normalise both code columns (upper-case, trimmed) and cast the length to an integer.
module_code = trim(upper(col("code_module"))).alias("code_module")
presentation_code = trim(upper(col("code_presentation"))).alias("code_presentation")
presentation_length = col("module_presentation_length").cast("int")

df_courses_clean = df_bronze_courses.select(
    module_code, presentation_code, presentation_length
)

# Combined key "<code_module>_<code_presentation>", built from the normalised columns above.
module_key_expr = concat(col("code_module"), lit("_"), col("code_presentation"))

# 3. One row per module_key.
df_silver_dim_module = (
    df_courses_clean
    .withColumn("module_key", module_key_expr)
    .dropDuplicates(["module_key"])
)

# 4. Overwrite the Silver Delta table.
module_writer = df_silver_dim_module.write.format("delta").mode("overwrite")
module_writer.saveAsTable("silver.Dim_Module")

# 5. Confirm and preview a few rows.
print("Table 'silver.Dim_Module'")
display(df_silver_dim_module.limit(10))

Table 'silver.Dim_Module' (Corrigée avec module_key) créée


code_module,code_presentation,module_presentation_length,module_key
AAA,2013J,268,AAA_2013J
AAA,2014J,269,AAA_2014J
BBB,2013J,268,BBB_2013J
BBB,2014J,262,BBB_2014J
BBB,2013B,240,BBB_2013B
BBB,2014B,234,BBB_2014B
CCC,2014J,269,CCC_2014J
CCC,2014B,241,CCC_2014B
DDD,2013J,261,DDD_2013J
DDD,2014J,262,DDD_2014J


**Creating silver.Dim_Assessment**

In [0]:


print("Creating silver.Dim_Assessment (with module_key fix)...")

# 1. Read the raw assessments from the Bronze zone.
df_bronze_assessments = spark.table("default.assessments")

# 2. Clean: default a missing assessment_type to "UNKNOWN", normalise text columns
#    (trimmed, upper-cased), cast date/weight to numeric types, add the combined
#    "<code_module>_<code_presentation>" key, and keep one row per assessment id.
df_silver_dim_assessment = (
    df_bronze_assessments
    .select(
        col("id_assessment"),
        upper(
            trim(
                when(col("assessment_type").isNull(), "UNKNOWN")
                .otherwise(col("assessment_type"))
            )
        ).alias("assessment_type"),
        col("date").cast("int"),
        col("weight").cast("float"),
        trim(upper(col("code_module"))).alias("code_module"),
        trim(upper(col("code_presentation"))).alias("code_presentation"),
    )
    .withColumn(
        "module_key", concat(col("code_module"), lit("_"), col("code_presentation"))
    )
    .dropDuplicates(["id_assessment"])
)

# 3. Write with the overwriteSchema option so the overwrite may also replace the
#    schema of an existing table.
#    The table name is qualified with the schema only, like every other table in
#    this notebook, so it is created in the current catalog (the one the `silver`
#    schema was created in) instead of a hard-coded catalog name.
(
    df_silver_dim_assessment.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.Dim_Assessment")
)

# 4. Confirm and preview a few rows.
print("Table 'silver.Dim_Assessment' created.")
display(df_silver_dim_assessment.limit(5))


Creating silver.Dim_Assessment (with module_key fix)...
Table 'silver.Dim_Assessment' (Corrected) created.


id_assessment,assessment_type,date,weight,code_module,code_presentation,module_key
1752,TMA,19,10.0,AAA,2013J,AAA_2013J
1753,TMA,54,20.0,AAA,2013J,AAA_2013J
1754,TMA,117,20.0,AAA,2013J,AAA_2013J
1755,TMA,166,20.0,AAA,2013J,AAA_2013J
1756,TMA,215,30.0,AAA,2013J,AAA_2013J


**Creating silver.Dim_VLE_Activity**

In [0]:
# 1. Read the raw VLE catalogue from the Bronze zone.
df_bronze_vle = spark.table("default.vle")

# 2. Clean: a missing activity_type becomes "UNKNOWN", then the value is trimmed and
#    upper-cased; the week bounds are cast to integers.
raw_activity_type = col("activity_type")
clean_activity_type = upper(
    trim(when(raw_activity_type.isNull(), "UNKNOWN").otherwise(raw_activity_type))
).alias("activity_type")

vle_columns = [
    col("id_site"),
    clean_activity_type,
    col("week_from").cast("int"),
    col("week_to").cast("int"),
]

# 3. One row per site id.
df_silver_dim_vle = df_bronze_vle.select(*vle_columns).dropDuplicates(["id_site"])

# 4. Overwrite the Silver Delta table.
vle_writer = df_silver_dim_vle.write.format("delta").mode("overwrite")
vle_writer.saveAsTable("silver.Dim_VLE_Activity")

# 5. Confirm and preview a few rows.
print("Table 'silver.Dim_VLE_Activity' Created.")
display(df_silver_dim_vle.limit(10))

Table 'silver.Dim_VLE_Activity' Created.


id_site,activity_type,week_from,week_to
546943,RESOURCE,,
546712,OUCONTENT,,
546998,RESOURCE,,
546888,URL,,
547035,RESOURCE,,
546614,HOMEPAGE,,
546897,URL,,
546678,OUCONTENT,,
546933,RESOURCE,,
546708,OUCONTENT,,


**Creating silver.Dim_Time**

In [0]:

# 1. Bounds (inclusive) of the relative-day range covered by the time dimension.
start_day = -100
end_day = 400

# 2. One row per integer day between the two bounds.
df_days = spark.sql(f"SELECT explode(sequence({start_day}, {end_day}, 1)) AS relative_day")

# 3. Derive the attributes from the relative day.
day = col("relative_day")

# Simple phase label: negative days, day 0, and everything after it.
phase_expr = (
    when(day < 0, "Before start")
    .when(day == 0, "Start day")
    .otherwise("During course")
)

df_dim_time = df_days.select(
    day.alias("day_key"),                     # primary key
    floor(day / 7).alias("relative_week"),    # 7-day weeks; floor keeps days -7..-1 in week -1
    phase_expr.alias("module_phase"),
)

# 4. Overwrite the Silver Delta table.
time_writer = df_dim_time.write.format("delta").mode("overwrite")
time_writer.saveAsTable("silver.Dim_Time")

# 5. Confirm and preview the rows around day 0.
print("Table 'silver.Dim_Time'.")
display(df_dim_time.filter("day_key BETWEEN -2 AND 8"))


Table 'silver.Dim_Time' (Relative) successfully created.


day_key,relative_week,module_phase
-2,-1,Before start
-1,-1,Before start
0,0,Start day
1,0,During course
2,0,During course
3,0,During course
4,0,During course
5,0,During course
6,0,During course
7,1,During course


**Clean student_assessment**

In [0]:
# 1. Read the raw student/assessment results from the Bronze zone.
df_bronze_sa = spark.table("default.student_assessment")

# 2. Keep both ids as they are, cast the three measures to integers, and drop rows
#    where either id is missing.
int_measures = [
    col(name).cast("int") for name in ("date_submitted", "is_banked", "score")
]
has_both_ids = col("id_assessment").isNotNull() & col("id_student").isNotNull()

df_silver_sa_cleaned = (
    df_bronze_sa
    .select(col("id_assessment"), col("id_student"), *int_measures)
    .where(has_both_ids)
)

# 3. Overwrite the Silver Delta table.
sa_writer = df_silver_sa_cleaned.write.format("delta").mode("overwrite")
sa_writer.saveAsTable("silver.student_assessment_cleaned")

# 4. Confirm and preview a few rows.
print("Table 'silver.student_assessment_cleaned' created.")
display(df_silver_sa_cleaned.limit(5))

Table 'silver.student_assessment_cleaned' created.


id_assessment,id_student,date_submitted,is_banked,score
1752,11391,18,0,78
1752,28400,22,0,70
1752,31604,17,0,72
1752,32885,26,0,69
1752,38053,19,0,79


**Clean student_vle**


In [0]:

# 1. Read the raw student/VLE interactions from the Bronze zone.
df_bronze_sv = spark.table("default.student_vle")

# 2. Clean: normalise the two code columns, cast date to an integer, and treat a
#    missing click count as 0 before casting it to an integer.
raw_clicks = col("sum_click")
click_count = (
    when(raw_clicks.isNull(), 0).otherwise(raw_clicks).cast("int").alias("sum_click")
)

df_sv_selected = df_bronze_sv.select(
    trim(upper(col("code_module"))).alias("code_module"),
    trim(upper(col("code_presentation"))).alias("code_presentation"),
    col("id_student"),
    col("id_site"),
    col("date").cast("int"),
    click_count,
)

# Combined key "<code_module>_<code_presentation>", built from the normalised columns.
sv_module_key = concat(col("code_module"), lit("_"), col("code_presentation"))

# Rows missing either id are dropped.
has_student_and_site = col("id_student").isNotNull() & col("id_site").isNotNull()

df_silver_sv_cleaned = (
    df_sv_selected
    .withColumn("module_key", sv_module_key)
    .filter(has_student_and_site)
)

# 3. Overwrite the Silver Delta table.
sv_writer = df_silver_sv_cleaned.write.format("delta").mode("overwrite")
sv_writer.saveAsTable("silver.student_vle_cleaned")

# 4. Confirm and preview a few rows.
print("Table 'silver.student_vle_cleaned' with module kkey created")
display(df_silver_sv_cleaned.limit(5))

Table 'silver.student_vle_cleaned' with module kkey created


code_module,code_presentation,id_student,id_site,date,sum_click,module_key
FFF,2013J,605571,716327,105,53,FFF_2013J
FFF,2013J,605571,716643,105,1,FFF_2013J
FFF,2013J,605571,716253,105,4,FFF_2013J
FFF,2013J,605571,716654,105,1,FFF_2013J
FFF,2013J,605571,716433,105,1,FFF_2013J


**Clean student_registration**

In [0]:

# 1. Read the raw registrations and the student info from the Bronze zone.
df_bronze_reg = spark.table("default.student_registration")
df_bronze_info = spark.table("default.student_info")

# 2. Attach the student info to each registration. The left join keeps every
#    registration row even when no student_info row matches.
# NOTE(review): if student_info holds several rows for the same key triple, the
# join repeats the registration row — confirm the key is unique in Bronze.
registration_keys = ["code_module", "code_presentation", "id_student"]
df_reg_joined = df_bronze_reg.join(df_bronze_info, on=registration_keys, how="left")

# A missing final_result (including registrations with no match) becomes "UNKNOWN".
raw_final_result = col("final_result")
clean_final_result = upper(
    trim(when(raw_final_result.isNull(), "UNKNOWN").otherwise(raw_final_result))
).alias("final_result")

df_reg_selected = df_reg_joined.select(
    trim(upper(df_bronze_reg["code_module"])).alias("code_module"),
    trim(upper(df_bronze_reg["code_presentation"])).alias("code_presentation"),
    df_bronze_reg["id_student"],
    col("date_registration").cast("int"),
    col("date_unregistration").cast("int"),
    clean_final_result,
)

# Combined key "<code_module>_<code_presentation>", built from the normalised columns.
reg_module_key = concat(col("code_module"), lit("_"), col("code_presentation"))
df_silver_reg_cleaned = df_reg_selected.withColumn("module_key", reg_module_key)

# 3. Overwrite the Silver Delta table.
reg_writer = df_silver_reg_cleaned.write.format("delta").mode("overwrite")
reg_writer.saveAsTable("silver.student_registration_cleaned")

# 4. Confirm and preview a few rows.
print("Table 'silver.student_registration_cleaned' (avec module_key) créée.")
display(df_silver_reg_cleaned.limit(5))

Table 'silver.student_registration_cleaned' (avec module_key) créée.


code_module,code_presentation,id_student,date_registration,date_unregistration,final_result,module_key
AAA,2013J,11391,-159,,PASS,AAA_2013J
AAA,2013J,28400,-53,,PASS,AAA_2013J
AAA,2013J,30268,-92,12.0,WITHDRAWN,AAA_2013J
AAA,2013J,31604,-52,,PASS,AAA_2013J
AAA,2013J,32885,-176,,PASS,AAA_2013J
