In [0]:
# Imports
from pyspark.sql.functions import *
from pyspark.sql import Window

In [0]:
# Load the `aqi_silver` table; every later cell derives its columns from this DataFrame.
silver_df = spark.table("workspace.default.aqi_silver")

In [0]:
# AQI breakpoint tables, one list per pollutant.
# Every row describes one band as (aqi_low, aqi_high, bp_low, bp_high):
# concentrations in [bp_low, bp_high] map onto sub-indexes in [aqi_low, aqi_high].
# Rows run from the lowest band to the highest; the AQI ranges line up with the
# category cut-offs used later (<=50, <=100, <=200, <=300, <=400).
# NOTE(review): concentration units are assumed to match the silver table columns - confirm.
PM10_BREAKPOINTS = [
    (0, 50, 0, 50),
    (51, 100, 51, 100),
    (101, 200, 101, 250),
    (201, 300, 251, 350),
    (301, 400, 351, 430),
]

PM25_BREAKPOINTS = [
    (0, 50, 0, 30),
    (51, 100, 31, 60),
    (101, 200, 61, 90),
    (201, 300, 91, 120),
    (301, 400, 121, 250),
]

NO2_BREAKPOINTS = [
    (0, 50, 0, 40),
    (51, 100, 41, 80),
    (101, 200, 81, 180),
    (201, 300, 181, 280),
    (301, 400, 281, 400),
]

OZONE_BREAKPOINTS = [
    (0, 50, 0, 50),
    (51, 100, 51, 100),
    (101, 200, 101, 168),
    (201, 300, 169, 208),
    (301, 400, 209, 748),
]

# CO is the only table with fractional concentration bounds.
CO_BREAKPOINTS = [
    (0, 50, 0, 1.0),
    (51, 100, 1.1, 2.0),
    (101, 200, 2.1, 10),
    (201, 300, 10.1, 17),
    (301, 400, 17.1, 34),
]

SO2_BREAKPOINTS = [
    (0, 50, 0, 40),
    (51, 100, 41, 80),
    (101, 200, 81, 380),
    (201, 300, 381, 800),
    (301, 400, 801, 1600),
]

NH3_BREAKPOINTS = [
    (0, 50, 0, 200),
    (51, 100, 201, 400),
    (101, 200, 401, 800),
    (201, 300, 801, 1200),
    (301, 400, 1201, 1800),
]

# Function to get the sub-index value for a pollutant from its breakpoint table
def create_sub_index(pollutant_col_name: str, breakpoints: list, above_range_value=500):
    """Build a Spark Column that converts a pollutant concentration into an AQI sub-index.

    The concentration is matched to the first band whose ``bp_high`` is greater
    than or equal to it, and the sub-index is linearly interpolated inside that
    band, rounded to 2 decimals:

        (aqi_high - aqi_low) / (bp_high - bp_low) * (conc - bp_low) + aqi_low

    Args:
        pollutant_col_name: Name of the concentration column. Names containing a
            dot must be backtick-quoted (e.g. "`PM2.5`").
        breakpoints: Bands as (aqi_low, aqi_high, bp_low, bp_high) tuples. They
            are sorted by ``bp_high`` internally, so caller order does not matter.
        above_range_value: Sub-index assigned to concentrations above the highest
            ``bp_high``. Defaults to 500.

    Returns:
        A Column that evaluates to:
          * null when the concentration is null, NaN or negative (invalid reading);
          * the interpolated sub-index when the concentration lies inside a band;
          * the band's ``aqi_low`` when the concentration falls in the gap just
            below that band (e.g. between 1.0 and 1.1 for CO);
          * ``above_range_value`` when the concentration exceeds every band.

    Raises:
        ValueError: If ``breakpoints`` is empty or a band has ``bp_high <= bp_low``
            (which would make the interpolation divide by zero or invert).
    """
    # The when-chain below picks the first matching band, so bands must be
    # evaluated in ascending bp_high order regardless of how they were passed in.
    bands = sorted(breakpoints, key=lambda band: band[3])
    if not bands:
        raise ValueError("breakpoints must contain at least one band")
    for aqi_low, aqi_high, bp_low, bp_high in bands:
        if bp_high <= bp_low:
            raise ValueError(
                f"invalid band for {pollutant_col_name}: "
                f"bp_high ({bp_high}) must be greater than bp_low ({bp_low})"
            )

    concentration = col(pollutant_col_name)

    # Invalid readings produce no sub-index. NaN must be caught explicitly:
    # Spark orders NaN above every number, so it would fail every `<= bp_high`
    # test and be reported as the above-range value.
    expr = when(
        concentration.isNull() | isnan(concentration) | (concentration < 0),
        lit(None),
    )

    for aqi_low, aqi_high, bp_low, bp_high in bands:
        slope = (lit(aqi_high) - lit(aqi_low)) / (lit(bp_high) - lit(bp_low))
        # Clamp to bp_low so a value in the gap between two bands maps to this
        # band's aqi_low instead of being extrapolated below it, which would make
        # the sub-index dip as the concentration rises.
        in_band = greatest(concentration, lit(bp_low))
        # `round` here is pyspark.sql.functions.round (brought in by the star
        # import), not the Python builtin.
        formula = round(slope * (in_band - lit(bp_low)) + lit(aqi_low), 2)
        expr = expr.when(concentration <= bp_high, formula)

    # Anything that matched no band is above the highest breakpoint.
    return expr.otherwise(lit(above_range_value))

In [0]:
# Compute one sub-index column per pollutant.
# Each entry is (output column, source column in silver_df, breakpoint table);
# "PM2.5" is backtick-quoted because the column name contains a dot.
sub_index_specs = [
    ("PM25_aqi", "`PM2.5`", PM25_BREAKPOINTS),
    ("PM10_aqi", "PM10", PM10_BREAKPOINTS),
    ("SO2_aqi", "SO2", SO2_BREAKPOINTS),
    ("CO_aqi", "CO", CO_BREAKPOINTS),
    ("NO2_aqi", "NO2", NO2_BREAKPOINTS),
    ("OZONE_aqi", "OZONE", OZONE_BREAKPOINTS),
    ("NH3_aqi", "NH3", NH3_BREAKPOINTS),
]

gold_df = silver_df
for sub_index_name, source_name, breakpoint_table in sub_index_specs:
    gold_df = gold_df.withColumn(
        sub_index_name, create_sub_index(source_name, breakpoint_table)
    )

# The overall AQI of a station is the highest of its pollutant sub-indexes.
# `greatest` skips nulls, so pollutants a station did not report are ignored.
sub_index_columns = [
    col(name)
    for name in (
        "CO_aqi",
        "NH3_aqi",
        "NO2_aqi",
        "OZONE_aqi",
        "PM10_aqi",
        "PM25_aqi",
        "SO2_aqi",
    )
]
gold_df = gold_df.withColumn(
    "overall_aqi", round(greatest(*sub_index_columns), 2)
)

In [0]:
# Defining the window
# Ascending order means rank 1 goes to the station with the lowest overall AQI.
# `station` is a secondary sort key: row_number() breaks ties arbitrarily, so
# without it stations sharing the same overall_aqi could swap ranks between runs.
# There is no partitionBy because the ranking is global across all stations.
windowSpec = Window.orderBy(col("overall_aqi").asc(), col("station").asc())

# Add a ranking to the stations and assign a category to them.
# Stations whose overall AQI is null (no sub-index could be computed) are
# dropped before ranking.
gold_df = (
    gold_df
    .where(col("overall_aqi").isNotNull())
    .withColumn("rank", row_number().over(windowSpec))
    .withColumn(
        "aqi_category",
        # Cut-offs are checked in ascending order; the first match wins.
        when(col("overall_aqi") <= 50, "Good")
        .when(col("overall_aqi") <= 100, "Satisfactory")
        .when(col("overall_aqi") <= 200, "Moderate")
        .when(col("overall_aqi") <= 300, "Poor")
        .when(col("overall_aqi") <= 400, "Very Poor")
        .otherwise("Severe"),
    )
)

# Label each station with the pollutant whose sub-index equals the overall AQI.
# The list order is the tie-break order: when several sub-indexes share the
# maximum, the first pollutant listed here is reported.
pollutant_labels = [
    ("PM25_aqi", "PM2.5"),
    ("PM10_aqi", "PM10"),
    ("SO2_aqi", "SO2"),
    ("CO_aqi", "CO"),
    ("NO2_aqi", "NO2"),
    ("OZONE_aqi", "OZONE"),
    ("NH3_aqi", "NH3"),
]

dominant_expr = None
for sub_index_name, label in pollutant_labels:
    is_dominant = col(sub_index_name) == col("overall_aqi")
    if dominant_expr is None:
        dominant_expr = when(is_dominant, label)
    else:
        dominant_expr = dominant_expr.when(is_dominant, label)

# "NA" is the fallback when no sub-index matches the overall AQI.
gold_df = gold_df.withColumn("dominant_pollutant", dominant_expr.otherwise("NA"))



In [0]:
# Persist the station-level gold DataFrame as a Delta table, replacing any
# rows written by a previous run.
gold_writer = gold_df.write.format("delta").mode("overwrite")
gold_writer.saveAsTable("aqi_gold_station")

# Confirm the write by reading the table back and showing a small sample.
display(spark.table("aqi_gold_station").limit(10))



station,city,state,country,last_update_timestamp,ingestion_timestamp,latitude,longitude,SO2,CO,NO2,OZONE,PM2.5,PM10,NH3,PM25_aqi,PM10_aqi,SO2_aqi,CO_aqi,NO2_aqi,OZONE_aqi,NH3_aqi,overall_aqi,rank,aqi_category,dominant_pollutant
"Powai, Mumbai - MPCB",Mumbai,Maharashtra,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,19.1375,72.915056,,,6.0,,,,2.0,,,,,7.5,,0.5,7.5,1,Good,NO2
"Velachery Res. Area, Chennai - CPCB",Chennai,TamilNadu,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,13.0052189,80.2398125,6.0,,14.0,5.0,,,3.0,,,7.5,,17.5,5.0,0.75,17.5,2,Good,NO2
"Chandkheda, Ahmedabad - IITM",Ahmedabad,Gujarat,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,23.107969,72.574648,,,,35.0,,,,,,,,,35.0,,35.0,3,Good,OZONE
"Khunmoh, Srinagar - JKPCC",Srinagar,Jammu_and_Kashmir,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,34.06313,74.96017,3.0,,21.0,,41.0,47.0,,67.9,47.0,3.75,,26.25,,,67.9,4,Satisfactory,PM2.5
"Khrew, Pampore - JKPCC",Pampore,Jammu_and_Kashmir,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,34.031606,75.009712,2.0,,34.0,,58.0,62.0,,96.62,62.0,2.5,,42.5,,,96.62,5,Satisfactory,PM2.5
"Sikulpuikawn, Aizawl - Mizoram PCB",Aizawl,Mizoram,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,23.7176342,92.7192841,16.0,3.0,,4.0,57.0,55.0,,94.93,55.0,20.0,112.28,,4.0,,112.28,6,Moderate,CO
"Mahape, Navi Mumbai - MPCB",Navi Mumbai,Maharashtra,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,19.1135051,73.008978,20.0,,85.0,10.0,62.0,136.0,1.0,104.41,124.26,25.0,,105.0,10.0,0.25,124.26,7,Moderate,PM10
"PSG College of Arts and Science, Coimbatore - TNPCB",Coimbatore,TamilNadu,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,11.0328,77.0349,4.0,,11.0,5.0,74.0,56.0,1.0,145.38,56.0,5.0,,13.75,5.0,0.25,145.38,8,Moderate,PM2.5
"IGSC Planetarium Complex, Patna - BSPCB",Patna,Bihar,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,25.610369,85.132568,1.0,6.0,1.0,29.0,,,,,,1.25,149.87,1.25,29.0,,149.87,9,Moderate,CO
"Madan Mohan Malaviya University of Technology, Gorakhpur - UPPCB",Gorakhpur,Uttar_Pradesh,India,2026-02-14T12:00:00.000Z,2026-02-14T08:57:27.048Z,26.730136,83.433859,43.0,4.0,12.0,2.0,76.0,88.0,1.0,152.21,88.0,53.51,124.81,15.0,2.0,0.25,152.21,10,Moderate,PM2.5


In [0]:
%sql
-- City-level rollup of the station gold table: average, minimum and maximum
-- overall AQI for every (city, state) pair.
-- OR REPLACE keeps this cell re-runnable; a plain CREATE VIEW fails as soon as
-- the view already exists from an earlier run.
CREATE OR REPLACE VIEW aqi_gold_city AS
SELECT
  city,
  state,
  AVG(overall_aqi) AS avg_aqi,
  MIN(overall_aqi) AS min_aqi,
  MAX(overall_aqi) AS max_aqi
FROM aqi_gold_station
GROUP BY city, state