###Bronze Layer

In [0]:
df = spark.table("Learning_Outcome_Gaps")
print(df.columns)



['student_id', 'name', 'age', 'gender', 'caste_category', 'parent_education', 'family_income_monthly_thousands', 'distance_to_school_km', 'attendance_rate', 'school_type', 'state', 'math_score', 'reading_score', 'language_score', 'grade', 'dropout_risk']


In [0]:
# ⚙️ (Add)
from pyspark.sql.functions import input_file_name, current_timestamp



In [0]:
df.show(5)
df.printSchema()


+----------+--------------------+---+------+--------------+----------------+-------------------------------+---------------------+------------------+----------------+--------------+-----------------+------------------+-----------------+-----+------------+
|student_id|                name|age|gender|caste_category|parent_education|family_income_monthly_thousands|distance_to_school_km|   attendance_rate|     school_type|         state|       math_score|     reading_score|   language_score|grade|dropout_risk|
+----------+--------------------+---+------+--------------+----------------+-------------------------------+---------------------+------------------+----------------+--------------+-----------------+------------------+-----------------+-----+------------+
|         1|        शिवाली चोपरा|  7|Female|           OBC|         Primary|                           10.5|   0.5445508899810357| 44.76729003199387|Rural Government|        Odisha|47.53031372607806| 78.35436574191397|64.09309470713

###Silver Layer

In [0]:
df = df.fillna({
    "Teacher_Qualification": "Unknown",
    "Infrastructure_Score": 0.0,
    "Learning_Score": 0.0,
    "Attendance_Percentage": 0.0,
    "Dropout_Rate": 0.0
})


####Feature Engineering

In [0]:
from pyspark.sql.functions import avg, round, col

agg_df = df.groupBy("state", "school_type") \
    .agg(
        avg(col("language_score")).alias("Avg_Learning_Score"),
        avg(col("attendance_rate")).alias("Avg_Attendance"),
        avg(col("dropout_risk")).alias("Avg_Dropout_Rate")
    ) \
    .withColumn("Learning_Gap", round(col("Avg_Attendance") - col("Avg_Learning_Score"), 2))


In [0]:
display(agg_df)

agg_df.write.mode("overwrite").saveAsTable("Processed_Learning_Outcome_Gaps")


state,school_type,Avg_Learning_Score,Avg_Attendance,Avg_Dropout_Rate,Learning_Gap
Odisha,Rural Government,54.75134509800682,28.6809203778593,0.1466666666666666,-26.07
Madhya Pradesh,Rural Government,55.01314773511085,28.56697889342991,0.146584345747859,-26.45
Bihar,Rural Government,55.16891229173888,28.55898105916301,0.1469519781040059,-26.61
Uttar Pradesh,Rural Government,55.06770536217072,28.446766046103345,0.1528134017313241,-26.62
Rajasthan,Rural Government,55.16528225159257,28.758745691481405,0.1485260770975056,-26.41


In [0]:
from pyspark.sql.types import FloatType

df = df.withColumn("Infrastructure_Score", col("Infrastructure_Score").cast(FloatType())) \
       .withColumn("Learning_Score", col("Learning_Score").cast(FloatType())) \
       .withColumn("Attendance_Percentage", col("Attendance_Percentage").cast(FloatType())) \
       .withColumn("Dropout_Rate", col("Dropout_Rate").cast(FloatType()))


In [0]:
clean_df = df.select(
    col("state").alias("State"),
    col("school_type").alias("School_Type"),
    col("language_score").alias("Learning_Score"),
    col("attendance_rate").alias("Attendance_Percentage"),
    col("dropout_risk").alias("Dropout_Rate")
)
clean_df.createOrReplaceTempView("Learning_Outcome_Gaps_Clean")


In [0]:
from pyspark.sql.functions import round, when

df = df.withColumn("Learning_Gap_Index", 
                   round((col("Infrastructure_Score") + col("Attendance_Percentage")) / 2 - col("Learning_Score"), 2)) \
       .withColumn("Risk_Level", 
                   when(col("Dropout_Rate") > 20, "High")
                   .when(col("Dropout_Rate") > 10, "Medium")
                   .otherwise("Low"))


In [0]:
from pyspark.sql.functions import avg

agg_df = df.groupBy("State", "District") \
           .agg(
               avg("Learning_Score").alias("Avg_Learning_Score"),
               avg("Infrastructure_Score").alias("Avg_Infrastructure"),
               avg("Attendance_Percentage").alias("Avg_Attendance"),
               avg("Dropout_Rate").alias("Avg_Dropout")
           ) \
           .withColumn("Learning_Gap", round(col("Avg_Infrastructure") - col("Avg_Learning_Score"), 2))


In [0]:
agg_df = df.groupBy("State", "District") \
           .agg(
               avg("Learning_Score").alias("Avg_Learning_Score"),
               avg("Infrastructure_Score").alias("Avg_Infrastructure"),
               avg("Attendance_Percentage").alias("Avg_Attendance"),
               avg("Dropout_Rate").alias("Avg_Dropout")
           ) \
           .withColumn("Learning_Gap", round(col("Avg_Infrastructure") - col("Avg_Learning_Score"), 2))


In [0]:
df = spark.table("Learning_Outcome_Gaps_Clean")


In [0]:
from pyspark.sql.functions import round, expr

df = df.withColumn("Overall_Academic_Score", 
                   round((col("math_score") + col("reading_score") + col("language_score")) / 3, 2)) \
       .withColumn("Learning_Gap_Index", 
                   round(col("attendance_rate") - col("Overall_Academic_Score"), 2))


In [0]:
from pyspark.sql.functions import when

df = df.withColumn("Risk_Level", 
                   when(col("dropout_risk") > 0.7, "High")
                   .when(col("dropout_risk") > 0.4, "Medium")
                   .otherwise("Low"))


In [0]:
from pyspark.sql.functions import avg

agg_df = df.groupBy("state", "school_type") \
           .agg(
               avg("attendance_rate").alias("Avg_Attendance"),
               avg("Overall_Academic_Score").alias("Avg_Academic_Score"),
               avg("dropout_risk").alias("Avg_Dropout_Risk")
           ) \
           .withColumn("Learning_Gap", round(col("Avg_Attendance") - col("Avg_Academic_Score"), 2))


In [0]:
agg_df = df.groupBy(col("state"), col("school_type")) \
           .agg(
               avg(col("attendance_rate")).alias("Avg_Attendance"),
               avg(col("language_score")).alias("Avg_Learning_Score"),
               avg(col("dropout_risk")).alias("Avg_Dropout_Rate")
           ) \
           .withColumn("Learning_Gap", round(col("Avg_Attendance") - col("Avg_Learning_Score"), 2))




In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# 1. Cast Score Columns to Double (in case inferSchema missed precision)
# This creates a new DataFrame 'df_clean'
df_clean = df \
    .withColumn("math_score", col("math_score").cast(DoubleType())) \
    .withColumn("reading_score", col("reading_score").cast(DoubleType())) \
    .withColumn("language_score", col("language_score").cast(DoubleType()))


In [0]:
# 2. Handle Missing Values (Impute with the mean for numerical scores)
# NOTE: In a real-world scenario, you would calculate the mean first
# For simplicity here, we will fill null scores with 0
df_clean = df_clean.na.fill(0, subset=['math_score', 'reading_score', 'language_score', 'dropout_risk'])

In [0]:
from pyspark.sql.functions import when, lit, mean, round

# 1. Calculate the Average Score for each student
df_features = df_clean.withColumn(
    "average_score",
    round(
        (col("math_score") + col("reading_score") + col("language_score")) / 3,
        2
    )
)



In [0]:

# 2. Create a categorical feature for overall performance gap (e.g., threshold < 50)
df_features = df_features.withColumn(
    "overall_gap_status",
    when(col("average_score") < 50, lit("High Gap Risk"))
    .when((col("average_score") >= 50) & (col("average_score") < 70), lit("Moderate Gap Risk"))
    .otherwise(lit("Low Gap Risk"))
)


In [0]:
from pyspark.sql.functions import count, avg, sum

# Group by 'state' and 'caste_category' to calculate average performance and gap metrics
df_aggregated = df_features.groupBy("state", "caste_category").agg(
    count("student_id").alias("total_students"),
    round(avg("average_score"), 2).alias("avg_overall_score"),
    round(avg("math_gap_metric"), 2).alias("avg_math_gap"),
    round(sum(when(col("dropout_risk") == 1, 1).otherwise(0)) / count("student_id") * 100, 2).alias("dropout_rate_pct")
).orderBy("state", "avg_overall_score", ascending=False)

###Gold Layer

In [0]:
gold_df = spark.table("gold.learning_outcome_summary")


In [0]:
heatmap_df = spark.sql("""
SELECT
    state,
    school_type,
    ROUND(AVG(Learning_Gap), 2) AS avg_learning_gap
FROM workspace.default.processed_learning_outcome_gaps
GROUP BY state, school_type
ORDER BY avg_learning_gap DESC
""")

display(heatmap_df)

state,school_type,avg_learning_gap
Odisha,Rural Government,-26.07
Rajasthan,Rural Government,-26.41
Madhya Pradesh,Rural Government,-26.45
Bihar,Rural Government,-26.61
Uttar Pradesh,Rural Government,-26.62


Databricks visualization. Run in Databricks to view.

In [0]:
trend_df = spark.sql("""
SELECT
    state,
    school_type,
    ROUND(Avg_Learning_Score, 2) AS avg_learning_score,
    ROUND(Avg_Attendance, 2) AS avg_attendance,
    ROUND(Learning_Gap, 2) AS learning_gap
FROM workspace.default.processed_learning_outcome_gaps
ORDER BY state, school_type
""")

display(trend_df)

state,school_type,avg_learning_score,avg_attendance,learning_gap
Bihar,Rural Government,55.17,28.56,-26.61
Madhya Pradesh,Rural Government,55.01,28.57,-26.45
Odisha,Rural Government,54.75,28.68,-26.07
Rajasthan,Rural Government,55.17,28.76,-26.41
Uttar Pradesh,Rural Government,55.07,28.45,-26.62
