In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark= SparkSession \
       .builder \
       .appName("KCC_Test_Notebook") \
       .config('spark.ui.port', '4040') \
       .getOrCreate()

# file path for all raw data parquet files
file_path = "/app/storage/raw_data/kcc_data_*/data.parquet"

# read all parquet files as a single dataframe
df = spark.read.option("header", True).option("inferSchema", True).parquet(file_path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 15:59:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
df.printSchema()

root
 |-- state_name: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- block_name: string (nullable = true)
 |-- season: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- category: string (nullable = true)
 |-- crop: string (nullable = true)
 |-- query_type: string (nullable = true)
 |-- query_text: string (nullable = true)
 |-- kcc_ans: string (nullable = true)
 |-- created_on: timestamp (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- _dlt_load_id: string (nullable = true)
 |-- _dlt_id: string (nullable = true)



# 1. Data Exploration

In [3]:
df.show(n=5)

+-------------+--------------+-----------------+------+------------+----------+--------+-------------------+--------------------+--------------------+--------------------+----+-----+-----------------+--------------+
|   state_name| district_name|       block_name|season|      sector|  category|    crop|         query_type|          query_text|             kcc_ans|          created_on|year|month|     _dlt_load_id|       _dlt_id|
+-------------+--------------+-----------------+------+------------+----------+--------+-------------------+--------------------+--------------------+--------------------+----+-----+-----------------+--------------+
|       PUNJAB|     GURDASPUR|FATEHGARH CHURIAN|    NA| AGRICULTURE|    Others|  Others|            Weather|Farmer asked quer...|\t\t\t\n ਜਿਲ੍ਹੇ ਦ...|2021-03-11 11:17:...|2021|    3|1747834131.999308|j7aHEcr8qDusAw|
|    TELANGANA|WARANGAL RURAL|        SHAYAMPET|    NA|HORTICULTURE|Vegetables|Chillies|Nutrient Management|CHILLI NUTRIENT M...|ఒక ఎకరా

In [4]:
df.count()

                                                                                

9455891

*The dataset contains more than 9 million records*

In [5]:
state_count = df.groupBy("state_name").agg(F.count("state_name").alias("row_count"))\
                        .orderBy(F.col("row_count").desc())

state_count.show(n=100)



+--------------------+---------+
|          state_name|row_count|
+--------------------+---------+
|       UTTAR PRADESH|  1665263|
|           RAJASTHAN|  1142087|
|         MAHARASHTRA|   941201|
|      MADHYA PRADESH|   823181|
|             HARYANA|   608851|
|              PUNJAB|   540432|
|             GUJARAT|   510669|
|           TAMILNADU|   444472|
|           KARNATAKA|   427137|
|               BIHAR|   400259|
|         WEST BENGAL|   330873|
|              ODISHA|   307744|
|      ANDHRA PRADESH|   245313|
|           TELANGANA|   211017|
|    HIMACHAL PRADESH|   189299|
|   JAMMU AND KASHMIR|   147530|
|        CHHATTISGARH|   119418|
|         UTTARAKHAND|    97310|
|            JHARKAND|    71791|
|              KERALA|    69213|
|               ASSAM|    59249|
|               DELHI|    47386|
|                  NA|    29645|
|             TRIPURA|     8598|
|          PUDUCHERRY|     3516|
|           MEGHALAYA|     3378|
|             MANIPUR|     2765|
|         

                                                                                

*Incorrect values - "0", "NA"*

In [6]:
district_count = df.groupBy("district_name").agg(F.count("district_name").alias("row_count"))\
                        .orderBy(F.col("row_count").desc())

district_count.show(n=100)



+--------------+---------+
| district_name|row_count|
+--------------+---------+
|          9999|    88064|
|   HANUMANGARH|    85093|
|      JUNAGADH|    80621|
|        HISSAR|    76499|
|        JAIPUR|    75317|
|        NAGAUR|    74335|
|        BARMER|    73553|
|    AHMADNAGAR|    73281|
|    AURANGABAD|    67757|
|         CHURU|    65801|
|       BIKANER|    65599|
|       BHIWANI|    65589|
|      BHATINDA|    64754|
|       JODHPUR|    64163|
|       BULDANA|    62391|
|         SIRSA|    60918|
|       KURNOOL|    60372|
|         ALWAR|    60078|
|        NANDED|    60035|
|       SOLAPUR|    58359|
|         NASIK|    57758|
|  BANAS KANTHA|    54656|
|    GANGANAGAR|    52106|
|       SITAPUR|    48239|
|          JIND|    47975|
|      YEVATMAL|    46402|
|          BEED|    46197|
|     FATEHABAD|    46053|
|   VILLUPPURAM|    46039|
|      BAREILLY|    45897|
|         JALNA|    45471|
|       SANGRUR|    44451|
|      JAMNAGAR|    43848|
|         SAGAR|    43788|
|

                                                                                

*Incorrect values - "9999", "NA"*

In [7]:
block_count = df.groupBy("block_name").agg(F.count("block_name").alias("row_count"))\
                        .orderBy(F.col("row_count").desc())

block_count.show(n=100)



+----------------+---------+
|      block_name|row_count|
+----------------+---------+
|            0   |  1866467|
|              NA|    29645|
|         RAJGARH|    29572|
|     HANUMANGARH|    27036|
|           NOHAR|    22354|
|       JAISALMER|    21191|
|          BHADRA|    18873|
|           DELHI|    18208|
|          BARMER|    17495|
|   TALWANDI SABO|    15397|
|        BATHINDA|    14275|
|         KOLAYAT|    13655|
|         HANSI-I|    13352|
|         ADAMPUR|    13183|
|       FATEHABAD|    12977|
|         HISAR-I|    12922|
|           OSIAN|    12814|
|        MALEGAON|    12494|
|    MAHENDRAGARH|    12354|
|    SARDARSHAHAR|    12098|
|          NAGAUR|    11901|
|          THARAD|    11758|
|      BHOPALGARH|    11634|
|       KALYANPUR|    11573|
|         BIKANER|    11309|
|          LOHARU|    11279|
|           SIRSA|    11252|
|       TARANAGAR|    11122|
|         DABWALI|    11101|
|          TOSHAM|    11036|
|       NAWABGANJ|    10928|
|  PIRAWA (SUN

                                                                                

*More than 1.8 Million records with block name "0" and "NA" this indicates that this field is only present in the recent records.*

In [8]:
season_count = df.groupBy("season").agg(F.count("season").alias("row_count"))\
                        .orderBy(F.col("row_count").desc())

season_count.show(n=100)



+------+---------+
|season|row_count|
+------+---------+
|    NA|  5483285|
|KHARIF|  1601838|
| JAYAD|  1345796|
|  RABI|  1024972|
+------+---------+



                                                                                

*5.4 Million records with season as NA*

In [9]:
sector_count = df.groupBy("sector").agg(F.count("sector").alias("row_count"))\
                          .orderBy(F.col("row_count").desc())

sector_count.show(n=100)



+----------------+---------+
|          sector|row_count|
+----------------+---------+
|     AGRICULTURE|  6785736|
|    HORTICULTURE|  2228498|
|            9999|   283606|
|ANIMAL HUSBANDRY|   120058|
|       FISHERIES|    19541|
|             256|    11048|
|             825|     7404|
+----------------+---------+



                                                                                

*Numeric values should be invalid Values for sector*

In [20]:
category_count = df.groupBy("category").agg(F.count("category").alias("row_count"))\
                        .orderBy(F.col("row_count").desc())

category_count.show(n=100)



+--------------------+---------+
|            category|row_count|
+--------------------+---------+
|              Others|  3330001|
|             Cereals|  1337681|
|                   0|  1329728|
|          Vegetables|  1097450|
|            Oilseeds|   426311|
|              Fruits|   425933|
|              Pulses|   377622|
|         Fiber Crops|   304916|
|             Millets|   165861|
|Condiments and Sp...|   162459|
|Sugar and Starch ...|   144145|
|        Fodder Crops|    84935|
|              Animal|    82015|
|    Plantation Crops|    53950|
|             Flowers|    42832|
|Medicinal and Aro...|    37121|
|              Inland|    15285|
|                 418|    11048|
|  Drug and Narcotics|     8722|
|               Avian|     6358|
|          Beekeeping|     4705|
|        Green Manure|     4068|
|                 417|     1691|
|              Marine|     1051|
|                  -1|        3|
+--------------------+---------+



                                                                                

In [10]:
crop_count = df.groupBy("crop").agg(F.count("crop").alias("row_count"))\
                          .orderBy(F.col("row_count").desc())

crop_count.show(n=100)



+--------------------+---------+
|                crop|row_count|
+--------------------+---------+
|              Others|  3399458|
|               Wheat|   666864|
|        Paddy (Dhan)|   622351|
|      Cotton (Kapas)|   347134|
|                9999|   283606|
|                1137|   178162|
|               Onion|   154904|
|            Chillies|   146719|
|Groundnut (pea nu...|   135287|
|      Soybean (bhat)|   117797|
|              Tomato|   116943|
|Sugarcane (Noble ...|   106460|
|Bengal Gram (Gram...|   100000|
|             Brinjal|    98465|
|              Potato|    96029|
|             Mustard|    93828|
|Green Gram (Moong...|    86863|
|       Maize (Makka)|    86234|
|               Mango|    76117|
|                1280|    68410|
|Black Gram (urd b...|    67423|
|                1279|    66462|
|               Apple|    65480|
| Bovine(Cow,Buffalo)|    64418|
|Bhindi(Okra/Ladys...|    60677|
|              Banana|    57241|
|    Sunnhemp (Patua)|    57050|
|         

                                                                                

*Here again numeric values should not be accepted*

In [11]:
query_type_count = df.groupBy("query_type").agg(F.count("query_type").alias("row_count"))\
                          .orderBy(F.col("row_count").desc())

query_type_count.show(n=100)



+--------------------+---------+
|          query_type|row_count|
+--------------------+---------+
|             Weather|  2540566|
|\tPlant Protection\t|  1525944|
|  Government Schemes|   823303|
|                   0|   417556|
|Fertilizer Use an...|   360265|
|  Cultural Practices|   308966|
| Nutrient Management|   285326|
|                9999|   283606|
|                   3|   260031|
|  Market Information|   239784|
|                  76|   195290|
|                  75|   173688|
|                   2|   168044|
|                   5|   167165|
|           Varieties|   156727|
|                  29|   140246|
|     Weed Management|   115403|
|               Seeds|    82112|
|\tWater Management\t|    77532|
|Seeds and Plantin...|    72414|
|Sowing Time and W...|    58034|
|                  22|    57606|
|                  78|    57346|
|\tField Preparati...|    55587|
|                   1|    54140|
|Agriculture Mecha...|    49645|
|                 187|    43074|
|Bio-Pesti

                                                                                

*Query type is a mix of text and numeric values - supposedly the numeric values were IDs that were used initially. However since we do not have it's mapping we will replace all numeric with a placeholder*

In [12]:
df.select("query_text").show(10, truncate=False)

+------------------------------------------------------------------------------+
|query_text                                                                    |
+------------------------------------------------------------------------------+
|Farmer asked query on Weather                                                 |
|CHILLI NUTRIENT MANAGEMENT\n                                                  |
|FARMER ASKING ABOUT GOVERNMENT SCHEME ?\n                                     |
|Information about Weather forecast of Block Kalan in District Shahjahanpur…?\n|
|ASKED ABOUT CHILLI TEMP.?                                                     |
|PM Kisan status query                                                         |
|Information regarding for control of Wilt, root rot and collar rot in pea?\n  |
|Information about control of black Mango blossoms problem.......?             |
|Farmer asked query on Weather                                                 |
|ASKED ABOUT ATTACK OF Wilt 

In [14]:
kcc_ans_list = df.select("kcc_ans").limit(60).rdd.flatMap(lambda x: x).collect()

kcc_ans_list

['\t\t\t\n ਜਿਲ੍ਹੇ ਦਾ ਮੌਸਮ-  ਅੱਜ ਦੇ ਦਿਨ ਬੱਦਲ ਵਾਈ ਰਹੇਗੀ ਪਰ ਮੀਂਹ ਦੀ ਕੋਈ ਸੰਭਾਵਨਾ ਨਹੀਂ ਹੈ Iਪਰ ਕੱਲ   ਨੂੰ ਹਲਕੀ ਤੋਂ ਦਰਮਿਆਨੀ ਮੀਂਹ ਦੀ ਸੰਭਾਵਨਾ ਹੈ ਵੱਧ ਤੋਂ ਵੱਧ ਤਾਪਮਾਨ 30 ਡਿਗਰੀ ਸੈਲਸੀਅਸ ਅਤੇ ਘੱਟ ਤੋਂ ਘੱਟ 16 ਡਿਗਰੀ ਸੈਲਸੀਅਸ ਰਹੇਗਾ ਅਤੇ ਹਵਾ ਦੀ ਗਤੀ 20-25 ਕਿਲੋਮੀਟਰ ਪ੍ਰਤੀ ਘੰਟਾ ਹੈ I\t\t\t\n\t\t\t',
 'ఒక ఎకరాకు అగ్రోమిన్ మాక్స్ ఒక KG + ఒక ఎకరాకు 13-0-45 ఎకరాకు 1 కిలో 200 లీటర్ల నీటికి చొప్పున కలిపి ఒక ఎకరాకు పిచికారి చేయాలి\n',
 'शासकीय योजनांबद्दल अधिक माहितीसाठी तालुका कृषी अधिकारी याना भेट द्या .\n',
 'श्रीमान जी आपके यहां 12,13 मार्च के दिन मध्यम बारिश होने की संभावना है तथा 11,14,15,16 मार्च के दिन बारिश की संभावना नहीं है आंशिक रूप से बादल छाए रहेंगे तापमान 21 से 34 सेंटीग्रेड तथा हवा की गति 07 से 11 किलोमीटर प्रति घंटा चलने की संभावना है तथा कृषि कार्य मौसम के अनुसार करें |\n',
 'उष्\u200dण आणि दमट हवामानात मिरची पिकाची वाढ चांगली होऊन उत्\u200dपादन चांगले मिळते. मिरची पिकाची लागवड पावसाळा, उन्\u200dहाळा आणि हिवाळा या तीनही हंगामात करता येते. पावसाळात जास्\u200dत पाऊस व ढगाळ वातावरण असल्\u200dयास फूलांची गळ

*`kcc_ans` may sometimes contain PII such as phone numbers, emails, or even bank/account numbers and names. The challenge is to detect and mask these patterns reliably.*

# 2. Data Cleaning

In [22]:
# 1. Define invalid values for categorical columns
invalid_values_dict = {
    "state_name": ["NA", "0"],
    "district_name": ["NA", "9999"],
    "block_name": ["NA", "0   "],
    "category" : ["0"],
    "season": ["NA"]
}

# 2. Define columns where numeric values are invalid
regex_invalid_cols = ["sector", "crop", "query_type", "category"]

# 3. Start with the original df
df_cleaned = df

# 4. Replace invalids for categorical columns
for col_name, invalids in invalid_values_dict.items():
    df_cleaned = df_cleaned.withColumn(
        col_name,
        F.when(
            F.col(col_name).isin(invalids) | F.col(col_name).isNull(),
            "Not Available"
        ).otherwise(F.col(col_name))
    )

# 5. Replace invalids for regex columns (numeric or NA/0/null)
for col_name in regex_invalid_cols:
    df_cleaned = df_cleaned.withColumn(
        col_name,
        F.when(
            F.col(col_name).rlike("^[0-9]+$") | 
            F.col(col_name).isin("NA", "0") | 
            F.col(col_name).isNull(),
            "Not Available"
        ).otherwise(F.col(col_name))
    )

# 6. Show results for verification
for col in list(invalid_values_dict.keys()) + regex_invalid_cols:
    df_cleaned.groupBy(col).count().orderBy(F.desc("count")).show(10)


                                                                                

+--------------+-------+
|    state_name|  count|
+--------------+-------+
| UTTAR PRADESH|1665263|
|     RAJASTHAN|1142087|
|   MAHARASHTRA| 941201|
|MADHYA PRADESH| 823181|
|       HARYANA| 608851|
|        PUNJAB| 540432|
|       GUJARAT| 510669|
|     TAMILNADU| 444472|
|     KARNATAKA| 427137|
|         BIHAR| 400259|
+--------------+-------+
only showing top 10 rows



                                                                                

+-------------+------+
|district_name| count|
+-------------+------+
|Not Available|117709|
|  HANUMANGARH| 85093|
|     JUNAGADH| 80621|
|       HISSAR| 76499|
|       JAIPUR| 75317|
|       NAGAUR| 74335|
|       BARMER| 73553|
|   AHMADNAGAR| 73281|
|   AURANGABAD| 67757|
|        CHURU| 65801|
+-------------+------+
only showing top 10 rows



                                                                                

+-------------+-------+
|   block_name|  count|
+-------------+-------+
|Not Available|1896112|
|      RAJGARH|  29572|
|  HANUMANGARH|  27036|
|        NOHAR|  22354|
|    JAISALMER|  21191|
|       BHADRA|  18873|
|        DELHI|  18208|
|       BARMER|  17495|
|TALWANDI SABO|  15397|
|     BATHINDA|  14275|
+-------------+-------+
only showing top 10 rows



                                                                                

+--------------------+-------+
|            category|  count|
+--------------------+-------+
|              Others|3330001|
|       Not Available|1342467|
|             Cereals|1337681|
|          Vegetables|1097450|
|            Oilseeds| 426311|
|              Fruits| 425933|
|              Pulses| 377622|
|         Fiber Crops| 304916|
|             Millets| 165861|
|Condiments and Sp...| 162459|
+--------------------+-------+
only showing top 10 rows



                                                                                

+-------------+-------+
|       season|  count|
+-------------+-------+
|Not Available|5483285|
|       KHARIF|1601838|
|        JAYAD|1345796|
|         RABI|1024972|
+-------------+-------+



                                                                                

+----------------+-------+
|          sector|  count|
+----------------+-------+
|     AGRICULTURE|6785736|
|    HORTICULTURE|2228498|
|   Not Available| 302058|
|ANIMAL HUSBANDRY| 120058|
|       FISHERIES|  19541|
+----------------+-------+



                                                                                

+--------------------+-------+
|                crop|  count|
+--------------------+-------+
|              Others|3399458|
|       Not Available|1168202|
|               Wheat| 666864|
|        Paddy (Dhan)| 622351|
|      Cotton (Kapas)| 347134|
|               Onion| 154904|
|            Chillies| 146719|
|Groundnut (pea nu...| 135287|
|      Soybean (bhat)| 117797|
|              Tomato| 116943|
+--------------------+-------+
only showing top 10 rows



                                                                                

+--------------------+-------+
|          query_type|  count|
+--------------------+-------+
|             Weather|2540566|
|       Not Available|2364951|
|\tPlant Protection\t|1525944|
|  Government Schemes| 823303|
|Fertilizer Use an...| 360265|
|  Cultural Practices| 308966|
| Nutrient Management| 285326|
|  Market Information| 239784|
|           Varieties| 156727|
|     Weed Management| 115403|
+--------------------+-------+
only showing top 10 rows





+--------------------+-------+
|            category|  count|
+--------------------+-------+
|              Others|3330001|
|       Not Available|1342467|
|             Cereals|1337681|
|          Vegetables|1097450|
|            Oilseeds| 426311|
|              Fruits| 425933|
|              Pulses| 377622|
|         Fiber Crops| 304916|
|             Millets| 165861|
|Condiments and Sp...| 162459|
+--------------------+-------+
only showing top 10 rows



                                                                                

In [23]:
# query_type has some unwanted characters like \t, we will remove that
df_cleaned = df_cleaned.withColumn(
    "query_type",
    F.regexp_replace("query_type", r"\t", "")
)

## PII Masking

In [24]:
# Mask +91 followed by 10 digits, or any standalone 10-digit number
phone_pattern = r"(\+91[\-\s]?\d{10})|(\b\d{10}\b)"
# standard email regex pattern
email_pattern = r"[a-zA-Z0-9.\-_]+@[a-zA-Z0-9\-_]+\.[a-zA-Z.]+"
# assuming account numbers are 9 to 18 consecutive digits
account_pattern = r"\b\d{9,18}\b"

df_cleaned = df_cleaned.withColumn("kcc_ans", F.regexp_replace(F.col("kcc_ans"), phone_pattern, "[PHONE]"))\
                      .withColumn("kcc_ans", F.regexp_replace(F.col("kcc_ans"), email_pattern, "[EMAIL]"))\
                      .withColumn("kcc_ans", F.regexp_replace(F.col("kcc_ans"), account_pattern, "[ACCOUNT]"))

df_cleaned.show()

+-----------------+--------------+-----------------+-------------+------------+----------+---------------+--------------------+--------------------+--------------------+--------------------+----+-----+-----------------+--------------+
|       state_name| district_name|       block_name|       season|      sector|  category|           crop|          query_type|          query_text|             kcc_ans|          created_on|year|month|     _dlt_load_id|       _dlt_id|
+-----------------+--------------+-----------------+-------------+------------+----------+---------------+--------------------+--------------------+--------------------+--------------------+----+-----+-----------------+--------------+
|           PUNJAB|     GURDASPUR|FATEHGARH CHURIAN|Not Available| AGRICULTURE|    Others|         Others|             Weather|Farmer asked quer...|\t\t\t\n ਜਿਲ੍ਹੇ ਦ...|2021-03-11 11:17:...|2021|    3|1747834131.999308|j7aHEcr8qDusAw|
|        TELANGANA|WARANGAL RURAL|        SHAYAMPET|Not Avai

# Data Transformation

fct_queries
- query_id (surrogate key for uniquely identifying each query)
- created_on
- state_id (surrogate key for state_name)
- category_id (surrogate key for category)
- sector_id (surrogate key for sector)
- crop
- query_type
- query_text
- kcc_ans

dim_demography
- state_id
- state_name
- district_names (an array of districts within that state)
- block_names (an array of block_names within that state)

dim_category
- category_id
- category

dim_sector
- sector_id
- sector

### Surrogate Keys
For surrogate key generation we can use row_number and partition by the unique identifier fields
- For query_id we can use the existing _dlt_id: this is a row key generated dlt on load, (https://dlthub.com/docs/general-usage/destination-tables#naming-convention-tables-and-columns:~:text=Each%20row%20in%20all%20(root%20and%20nested)%20data%20tables%20created%20by%20dlt%20contains%20a%20unique%20column%20named%20_dlt_id%20(row%20key).)
- state_id: partition by state_name 
- category_id: partition by category
- sector_id: partition by sector

### DDLs

```
CREATE TABLE IF NOT EXISTS fct_queries (
    query_id INTEGER PRIMARY KEY,
    created_on TIMESTAMP,
    state_id INTEGER,
    category_id INTEGER,
    sector_id INTEGER,
    crop TEXT,
    query_type TEXT,
    query_text TEXT,
    kcc_ans TEXT
)

CREATE TABLE IF NOT EXISTS dim_demography (
    state_id INTEGER PRIMARY KEY,
    state_name TEXT,
    district_names ARRAY,
    block_names ARRAY
)

CREATE TABLE IF NOT EXISTS dim_category (
    category_id INTEGER PRIMARY KEY,
    category TEXT
)

CREATE TABLE IF NOT EXISTS dim_sector (
    sector_id INTEGER PRIMARY KEY,
    category TEXT
)

```

### Plan of Execution
Now I think there should be separate job for the first ever processing - this is when there's no schema extract and a separate job for the monthly ELT

The **First Processing Job should**
- Run DDL commands
- Create surrogate keys for dimensions
- Populate dimensional and fact tables

**The subsequent processing jobs should**
- append new kcc queries to fct_queries
- generate surrogate keys within the raw queries table
- check if they are any new entries for the dimensional tables
- we possible cannot implement SCD 2 as that requires a unique identifier from the source data itself
    - for instance: state_name: ABC on 23/05/2025
    - on 29/07/2048: state_name: ABC changed to ABCD, this would generate a new surrogate key - the code cannot identify whether this is a new dim or updated