In [0]:
df_bio = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("/Volumes/workspace/default/clean_dataset/district_max_biometric/")

In [0]:
df_bio.printSchema()


root
 |-- district: string (nullable = true)
 |-- date: date (nullable = true)
 |-- bio_age_5_17: integer (nullable = true)
 |-- bio_age_17_: integer (nullable = true)
 |-- pincode: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- state: string (nullable = true)



In [0]:
from pyspark.sql import functions as F

df_bio_zone = (
    df_bio
    # 1. normalize state
    .withColumn(
        "state_norm",
        F.initcap(
            F.trim(
                F.regexp_replace(F.col("state"), r"\s+", " ")
            )
        )
    )

    # 2. fix special UT names
    .withColumn(
        "state_norm",
        F.when(
            F.col("state_norm").like("Dadra And Nagar%"),
            "Dadra And Nagar Haveli And Daman And Diu"
        )
        .when(
            F.col("state_norm").like("Andaman And Nicobar%"),
            "Andaman And Nicobar Islands"
        )
        .otherwise(F.col("state_norm"))
    )

    # 3. add zone column
    .withColumn(
        "zone",
        F.when(F.col("state_norm").isin(
            "Jammu And Kashmir", "Himachal Pradesh", "Punjab",
            "Haryana", "Delhi", "Uttarakhand", "Uttar Pradesh",
            "Chandigarh", "Ladakh"
        ), "North")

        .when(F.col("state_norm").isin(
            "Rajasthan", "Gujarat", "Goa", "Maharashtra",
            "Dadra And Nagar Haveli And Daman And Diu"
        ), "West")

        .when(F.col("state_norm").isin(
            "Tamil Nadu", "Kerala", "Karnataka", "Telangana",
            "Andhra Pradesh", "Puducherry", "Lakshadweep",
            "Andaman And Nicobar Islands"
        ), "South")

        .when(F.col("state_norm").isin(
            "West Bengal", "Odisha", "Bihar", "Jharkhand",
            "Assam", "Tripura", "Meghalaya", "Manipur",
            "Nagaland", "Mizoram", "Arunachal Pradesh",
            "Sikkim"
        ), "East")

        .when(F.col("state_norm").isin(
            "Madhya Pradesh", "Chhattisgarh"
        ), "Central")

        .otherwise("Unknown")
    )

    # 4. drop helper column
    .drop("state_norm")
)


In [0]:
df_bio_zone.printSchema()
df_bio_zone.groupBy("zone").count().show()


root
 |-- district: string (nullable = true)
 |-- date: date (nullable = true)
 |-- bio_age_5_17: integer (nullable = true)
 |-- bio_age_17_: integer (nullable = true)
 |-- pincode: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zone: string (nullable = false)

+-------+------+
|   zone| count|
+-------+------+
|   West|314374|
|   East|409039|
|  North|300757|
|  South|650584|
|Central| 97575|
+-------+------+



In [0]:
df_zone_agg = (
    df_bio_zone
    .groupBy("zone")
    .agg(
        F.sum("bio_age_5_17").alias("age_5_17"),
        F.sum("bio_age_17_").alias("age_17_plus")
    )
)


In [0]:
df_zone_agg.show()
(
    df_zone_agg
    .coalesce(1)                      # single CSV file
    .write
    .mode("overwrite")
    .option("header", "true")         # include column names
    .csv("/Volumes/workspace/default/clean_dataset/max_zone_bio")
)



+-------+--------+-----------+
|   zone|age_5_17|age_17_plus|
+-------+--------+-----------+
|   West| 6995465|    9293198|
|   East| 6296924|    7248128|
|  North| 9110621|    6976943|
|  South| 7178142|    7007036|
|Central| 3989239|    4375431|
+-------+--------+-----------+



Enroll

In [0]:
df_enorol = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("/Volumes/workspace/default/clean_dataset/district_max_enrollement/")


In [0]:
df_enorol.select("state").distinct().show(1000)

+--------------------+
|               state|
+--------------------+
|              Kerala|
|              Orissa|
|                NULL|
|      Andhra Pradesh|
|              Sikkim|
|                 Goa|
|    Himachal Pradesh|
|             Haryana|
|The Dadra And Nag...|
|         Lakshadweep|
|           Rajasthan|
|          Chandigarh|
|              Odisha|
|               Delhi|
|          Puducherry|
|         Maharashtra|
|         West Bengal|
|              Ladakh|
|           Telangana|
|               Bihar|
|      Madhya Pradesh|
|         Uttarakhand|
|   Arunachal Pradesh|
|               Assam|
|             Gujarat|
|              Punjab|
|           Karnataka|
|             Manipur|
|       Uttar Pradesh|
|            Nagaland|
|          Tamil Nadu|
|Andaman And Nicob...|
|        Chhattisgarh|
|             Mizoram|
|           Jharkhand|
|   Jammu And Kashmir|
|             Tripura|
|           Meghalaya|
|         Pondicherry|
+--------------------+



In [0]:
from pyspark.sql import functions as F

df_clean = (
    df_enorol

    # 1. Handle NULL states
    .withColumn(
        "state",
        F.when(F.col("state").isNull(), "Unknown")
         .otherwise(F.col("state"))
    )

    # 2. Normalize formatting
    .withColumn(
        "state_norm",
        F.initcap(
            F.trim(
                F.regexp_replace(F.col("state"), r"\s+", " ")
            )
        )
    )

    # 3. Fix known name variants (CRITICAL)
    .withColumn(
        "state_norm",
        F.when(F.col("state_norm").isin("Orissa"), "Odisha")
         .when(F.col("state_norm").isin("Pondicherry"), "Puducherry")
         .when(F.col("state_norm").like("The Dadra And Nagar%"),
               "Dadra And Nagar Haveli And Daman And Diu")
         .when(F.col("state_norm").like("Dadra And Nagar%"),
               "Dadra And Nagar Haveli And Daman And Diu")
         .when(F.col("state_norm").like("Andaman And Nicobar%"),
               "Andaman And Nicobar Islands")
         .otherwise(F.col("state_norm"))
    )
)


In [0]:
from pyspark.sql import functions as F

df_clean.filter(F.col("state_norm") == "Unknown") \
        .select("state", "district", "pincode") \
        .count()


48

In [0]:
df_clean_valid = df_clean.filter(F.col("state_norm") != "Unknown")


In [0]:
df_with_zone = df_clean_valid.withColumn(
    "zone",
    F.when(F.col("state_norm").isin(
        "Jammu And Kashmir", "Himachal Pradesh", "Punjab",
        "Haryana", "Delhi", "Uttarakhand", "Uttar Pradesh",
        "Chandigarh", "Ladakh"
    ), "North")

    .when(F.col("state_norm").isin(
        "Rajasthan", "Gujarat", "Goa", "Maharashtra",
        "Dadra And Nagar Haveli And Daman And Diu"
    ), "West")

    .when(F.col("state_norm").isin(
        "Tamil Nadu", "Kerala", "Karnataka", "Telangana",
        "Andhra Pradesh", "Puducherry", "Lakshadweep",
        "Andaman And Nicobar Islands"
    ), "South")

    .when(F.col("state_norm").isin(
        "West Bengal", "Odisha", "Bihar", "Jharkhand",
        "Assam", "Tripura", "Meghalaya", "Manipur",
        "Nagaland", "Mizoram", "Arunachal Pradesh",
        "Sikkim"
    ), "East")

    .when(F.col("state_norm").isin(
        "Madhya Pradesh", "Chhattisgarh"
    ), "Central")

    .otherwise("Unknown")
)


In [0]:
df_with_zone.filter(F.col("zone") == "Unknown") \
            .select("state", "state_norm") \
            .distinct() \
            .show(truncate=False)


+-----+----------+
|state|state_norm|
+-----+----------+
+-----+----------+



In [0]:
df_final = df_with_zone.drop("state_norm")


In [0]:
df_zone_agg = (
    df_final
    .groupBy("zone")
    .agg(
        F.sum("age_5_17").alias("age_5_17"),
        F.sum("age_18_greater").alias("age_18_greater"),
         F.sum("age_0_5").alias("age_0_5"),
    )
)


In [0]:
df_zone_agg.show()
(
    df_zone_agg
    .coalesce(1)                      # single CSV file
    .write
    .mode("overwrite")
    .option("header", "true")         # include column names
    .csv("/Volumes/workspace/default/clean_dataset/max_zone_enroll"))

+-------+--------+--------------+-------+
|   zone|age_5_17|age_18_greater|age_0_5|
+-------+--------+--------------+-------+
|   West|  261729|         29852| 689082|
|   East|  638496|         82406| 896972|
|  North|  530206|         26002| 813348|
|  South|  125018|         16531| 609453|
|Central|  133235|         11437| 441945|
+-------+--------+--------------+-------+



Demo

In [0]:
df_demo = spark.read.csv(
    "/Volumes/workspace/default/clean_dataset/district_max_demographic/",
    header=True,
    inferSchema=True
)


In [0]:
df_demo.display()

date,pincode,demo_age_5_17,demo_age_17_,district,state,latitude,longitude,district_key
2025-03-01,273213,49,529,Gorakhpur,Uttar Pradesh,26.499422,83.420327,uttar_pradesh_gorakhpur
2025-03-01,517132,22,375,Chittoor,Andhra Pradesh,13.0503,79.1707,andhra_pradesh_chittoor
2025-03-01,360006,65,765,Rajkot,Gujarat,22.3569496,70.7377804,gujarat_rajkot
2025-03-01,532484,24,314,Srikakulam,Andhra Pradesh,18.3523785,83.8586956,andhra_pradesh_srikakulam
2025-03-01,313801,45,785,Udaipur,Rajasthan,24.25,73.7,rajasthan_udaipur
2025-03-01,332028,28,285,Sikar,Rajasthan,27.5726158,74.8682569,rajasthan_sikar
2025-03-01,572201,88,332,Tumakuru,Karnataka,12.9829466,76.8628964,karnataka_tumakuru
2025-03-01,273211,61,836,Gorakhpur,Uttar Pradesh,26.443438,83.471631,uttar_pradesh_gorakhpur
2025-03-01,518313,83,986,Kurnool,Andhra Pradesh,0,0,andhra_pradesh_kurnool
2025-03-01,721148,13,281,Medinipur West,West Bengal,24.0497,87.5756,west_bengal_medinipur_west


In [0]:
df_demo.select("state").distinct().show(40)

+--------------------+
|               state|
+--------------------+
|              Kerala|
|              Orissa|
|                NULL|
|      Andhra Pradesh|
|              Sikkim|
|                 Goa|
|         Pondicherry|
|    Himachal Pradesh|
|             Haryana|
|The Dadra And Nag...|
|         Lakshadweep|
|           Rajasthan|
|          Chandigarh|
|              Odisha|
|               Delhi|
|          Puducherry|
|         Maharashtra|
|         West Bengal|
|              Ladakh|
|           Telangana|
|               Bihar|
|      Madhya Pradesh|
|         Uttarakhand|
|   Arunachal Pradesh|
|               Assam|
|             Gujarat|
|              Punjab|
|           Karnataka|
|             Manipur|
|       Uttar Pradesh|
|            Nagaland|
|          Tamil Nadu|
|Andaman And Nicob...|
|        Chhattisgarh|
|             Mizoram|
|           Jharkhand|
|   Jammu And Kashmir|
|             Tripura|
|           Meghalaya|
+--------------------+



In [0]:
from pyspark.sql import functions as F

df_clean_demo = (
    df_demo
    # 1. Fill NULL with placeholder
    .withColumn(
        "state",
        F.when(F.col("state").isNull(), "Unknown")
         .otherwise(F.col("state"))
    )
    # 2. Normalize formatting
    .withColumn(
        "state_norm",
        F.initcap(F.trim(F.regexp_replace(F.col("state"), r"\s+", " ")))
    )
    # 3. Fix known name variants
    .withColumn(
        "state_norm",
        F.when(F.col("state_norm").isin("Orissa"), "Odisha")
         .when(F.col("state_norm").isin("Pondicherry"), "Puducherry")
         .when(F.col("state_norm").like("The Dadra And Nagar%"), "Dadra And Nagar Haveli And Daman And Diu")
         .when(F.col("state_norm").like("Dadra And Nagar%"), "Dadra And Nagar Haveli And Daman And Diu")
         .when(F.col("state_norm").like("Andaman And Nicobar%"), "Andaman And Nicobar Islands")
         .otherwise(F.col("state_norm"))
    )
)


In [0]:
df_clean_demo.select("state", "state_norm").distinct().orderBy("state_norm").show(truncate=False)


+--------------------------------------------+----------------------------------------+
|state                                       |state_norm                              |
+--------------------------------------------+----------------------------------------+
|Andaman And Nicobar Islands                 |Andaman And Nicobar Islands             |
|Andhra Pradesh                              |Andhra Pradesh                          |
|Arunachal Pradesh                           |Arunachal Pradesh                       |
|Assam                                       |Assam                                   |
|Bihar                                       |Bihar                                   |
|Chandigarh                                  |Chandigarh                              |
|Chhattisgarh                                |Chhattisgarh                            |
|The Dadra And Nagar Haveli And Daman And Diu|Dadra And Nagar Haveli And Daman And Diu|
|Delhi                          

In [0]:
df_demo_with_zone = df_clean_demo.withColumn(
    "zone",
    F.when(F.col("state_norm").isin(
        "Jammu And Kashmir", "Himachal Pradesh", "Punjab",
        "Haryana", "Delhi", "Uttarakhand", "Uttar Pradesh",
        "Chandigarh", "Ladakh"
    ), "North")
    .when(F.col("state_norm").isin(
        "Rajasthan", "Gujarat", "Goa", "Maharashtra",
        "Dadra And Nagar Haveli And Daman And Diu"
    ), "West")
    .when(F.col("state_norm").isin(
        "Tamil Nadu", "Kerala", "Karnataka", "Telangana",
        "Andhra Pradesh", "Puducherry", "Lakshadweep",
        "Andaman And Nicobar Islands"
    ), "South")
    .when(F.col("state_norm").isin(
        "West Bengal", "Odisha", "Bihar", "Jharkhand",
        "Assam", "Tripura", "Meghalaya", "Manipur",
        "Nagaland", "Mizoram", "Arunachal Pradesh",
        "Sikkim"
    ), "East")
    .when(F.col("state_norm").isin(
        "Madhya Pradesh", "Chhattisgarh"
    ), "Central")
    .otherwise("Unknown")
)


In [0]:
df_clean_valid.filter(F.col("state_norm") == "Unknown").count()


In [0]:
df_final = df_demo_with_zone.filter(F.col("zone") != "Unknown")


In [0]:
df_final.groupBy("zone").count().show()
df_final.filter(F.col("zone") == "Unknown").show()


+-------+------+
|   zone| count|
+-------+------+
|   West|274289|
|   East|384703|
|  North|257710|
|  South|576689|
|Central| 87034|
+-------+------+

+----+-------+-------------+------------+--------+-----+--------+---------+------------+----------+----+
|date|pincode|demo_age_5_17|demo_age_17_|district|state|latitude|longitude|district_key|state_norm|zone|
+----+-------+-------------+------------+--------+-----+--------+---------+------------+----------+----+
+----+-------+-------------+------------+--------+-----+--------+---------+------------+----------+----+



In [0]:
df_final.display()

date,pincode,demo_age_5_17,demo_age_17_,district,state,latitude,longitude,district_key,state_norm,zone
2025-03-01,273213,49,529,Gorakhpur,Uttar Pradesh,26.499422,83.420327,uttar_pradesh_gorakhpur,Uttar Pradesh,North
2025-03-01,517132,22,375,Chittoor,Andhra Pradesh,13.0503,79.1707,andhra_pradesh_chittoor,Andhra Pradesh,South
2025-03-01,360006,65,765,Rajkot,Gujarat,22.3569496,70.7377804,gujarat_rajkot,Gujarat,West
2025-03-01,532484,24,314,Srikakulam,Andhra Pradesh,18.3523785,83.8586956,andhra_pradesh_srikakulam,Andhra Pradesh,South
2025-03-01,313801,45,785,Udaipur,Rajasthan,24.25,73.7,rajasthan_udaipur,Rajasthan,West
2025-03-01,332028,28,285,Sikar,Rajasthan,27.5726158,74.8682569,rajasthan_sikar,Rajasthan,West
2025-03-01,572201,88,332,Tumakuru,Karnataka,12.9829466,76.8628964,karnataka_tumakuru,Karnataka,South
2025-03-01,273211,61,836,Gorakhpur,Uttar Pradesh,26.443438,83.471631,uttar_pradesh_gorakhpur,Uttar Pradesh,North
2025-03-01,518313,83,986,Kurnool,Andhra Pradesh,0,0,andhra_pradesh_kurnool,Andhra Pradesh,South
2025-03-01,721148,13,281,Medinipur West,West Bengal,24.0497,87.5756,west_bengal_medinipur_west,West Bengal,East


In [0]:
df_zone_agg = (
    df_final
    .groupBy("zone")
    .agg(
        F.sum("demo_age_17_").alias("demo_age_17_"),
        F.sum("demo_age_5_17").alias("demo_age_5_17")
    )
)

In [0]:
df_zone_agg.show()
(
    df_zone_agg
    .coalesce(1)                      # single CSV file
    .write
    .mode("overwrite")
    .option("header", "true")         # include column names
    .csv("/Volumes/workspace/default/clean_dataset/max_zone_demo"))

+-------+------------+-------------+
|   zone|demo_age_17_|demo_age_5_17|
+-------+------------+-------------+
|   West|     6722654|       552601|
|   East|     8875062|       775319|
|  North|     8742898|       963146|
|  South|     5506335|       897425|
|Central|     3117661|       407455|
+-------+------------+-------------+

