In [0]:
# List the contents of the Air_Quality_Target mount; the returned listing is this cell's displayed output.
dbutils.fs.ls('/mnt/anuaqstorageacc/Air_Quality_Target/')

In [0]:
# Listing all the files and picking the one with the latest modification timestamp

TARGET_DIR = '/mnt/anuaqstorageacc/Air_Quality_Target/'
files = dbutils.fs.ls(TARGET_DIR)
# Keep only files. NOTE(review): assumes dbutils lists sub-directories with a trailing '/' in `name` — confirm.
data_files = [f for f in files if not f.name.endswith('/')]
if not data_files:
  # Fail with a clear message instead of a bare IndexError from indexing an empty list.
  raise FileNotFoundError(f"No files found in {TARGET_DIR}")
# A single max() pass is enough; ties resolve to the first listed entry, as the previous stable reverse sort did.
latest_file = max(data_files, key=lambda f: f.modificationTime)

print(f"Latest File: {latest_file.path}")

In [0]:
# Load the most recent landing file (JSON) into a DataFrame and show it.
df = spark.read.format("json").load(latest_file.path)
display(df)

In [0]:
# Reference list of accepted state names for the state-validation check.
# Entries must match the data exactly (no stray whitespace), since they are compared with `isin`.
states_all = ["Andhra Pradesh","Arunachal Pradesh","Assam","Bihar","Chhattisgarh","Delhi","Goa","Gujarat","Haryana","Himachal Pradesh","Jharkhand","Karnataka","Kerala","Madhya Pradesh","Maharashtra","Manipur","Meghalaya","Mizoram","Nagaland","Odisha","Punjab","Rajasthan","Sikkim","Tamil Nadu","Telangana","Tripura","Uttar Pradesh","Uttarakhand","West Bengal"]

In [0]:
# State value should be from all valid states of India.
# isin() yields NULL for a NULL state and filter() drops NULL predicates, so NULL states are flagged explicitly.
alarm_states = df.filter((~df.state.isin(states_all)) | df.state.isNull())

# count() launches a Spark job; run it once and reuse the result.
bad_state_count = alarm_states.count()
if bad_state_count > 0:
  print(f"{bad_state_count} rows have incorrect state values")
else:
  print("All States are Correct")

In [0]:
# Values of latitude and longitude should be within valid maximum and minimum i.e. -90 to 90 and -180 to 180.
lat = df.latitude.cast("Double")
lon = df.longitude.cast("Double")
# Missing or non-numeric coordinates become NULL after the cast; a NULL comparison would make filter()
# drop the row silently, so they are counted as incorrect explicitly. between() is inclusive on both ends.
df_check = df.filter(
  lat.isNull() | lon.isNull() | ~lat.between(-90, 90) | ~lon.between(-180, 180)
)

# Call count() (previously the bound method itself was interpolated) and only once.
bad_coord_count = df_check.count()
if bad_coord_count > 0:
  print(f"{bad_coord_count} rows have incorrect latitude or longitude values")
else:
  print("All latitude and longitude values are correct")

In [0]:
from pyspark.sql import functions as F
# OZONE value should be less than 100
# CO value should be less than 7
# SO2 value should be less than 40
# Exclusive upper bounds on pollutant_avg, keyed by pollutant_id.
POLLUTANT_AVG_LIMITS = {"OZONE": 100, "CO": 7, "SO2": 40}

# Cleaning Pollutant_Avg: the 'NA' placeholder becomes 0 so it is never reported as out of range.
df_pollutant = df.withColumn("pollutant_avg", F.when(F.col('pollutant_avg') == 'NA', 0).otherwise(F.col('pollutant_avg')))

def _out_of_range(pollutant_id):
  """Return the rows of df_pollutant for `pollutant_id` whose pollutant_avg is not below its limit."""
  limit = POLLUTANT_AVG_LIMITS[pollutant_id]
  # ">= limit" is the violation of "should be less than limit"; a value equal to the limit is out of range.
  return df_pollutant.filter((F.col("pollutant_id") == pollutant_id) & (F.col("pollutant_avg").cast("Double") >= limit))

ozone_range = _out_of_range("OZONE")
co_range = _out_of_range("CO")
so2_range = _out_of_range("SO2")

# Report each check; building the filtered frames alone produces no output.
for pollutant_id, violations in (("OZONE", ozone_range), ("CO", co_range), ("SO2", so2_range)):
  bad_rows = violations.count()
  if bad_rows > 0:
    print(f"{bad_rows} {pollutant_id} rows have pollutant_avg >= {POLLUTANT_AVG_LIMITS[pollutant_id]}")
  else:
    print(f"All {pollutant_id} values are within range")
 

In [0]:
# Bronze layer: persist the raw ingested frame as a Delta table, replacing any previous contents.
df.write.saveAsTable("faampn6.anuradha.Air_Quality_Bronze", format="delta", mode="overwrite")

In [0]:
# Re-read the Bronze table from the catalog; `df` now refers to the persisted data.
df = spark.table("faampn6.anuradha.Air_Quality_Bronze")
display(df)

In [0]:
# Lookup of each state's capital city. It is joined on (state, city) below so that only
# readings taken in a state capital are kept. Insertion order is preserved.
capitals = dict([
  ("Andhra Pradesh", "Amaravati"),
  ("Arunachal Pradesh", "Itanagar"),
  ("Assam", "Dispur"),
  ("Bihar", "Patna"),
  ("Chhattisgarh", "Raipur"),
  ("Delhi", "New Delhi"),
  ("Goa", "Panaji"),
  ("Gujarat", "Gandhinagar"),
  ("Haryana", "Chandigarh"),
  ("Himachal Pradesh", "Shimla"),
  ("Jharkhand", "Ranchi"),
  ("Karnataka", "Bengaluru"),
  ("Kerala", "Thiruvananthapuram"),
  ("Madhya Pradesh", "Bhopal"),
  ("Maharashtra", "Mumbai"),
  ("Manipur", "Imphal"),
  ("Meghalaya", "Shillong"),
  ("Mizoram", "Aizawl"),
  ("Nagaland", "Kohima"),
  ("Odisha", "Bhubaneswar"),
  ("Punjab", "Chandigarh"),
  ("Rajasthan", "Jaipur"),
  ("Sikkim", "Gangtok"),
  ("Tamil Nadu", "Chennai"),
  ("Telangana", "Hyderabad"),
  ("Tripura", "Agartala"),
  ("Uttar Pradesh", "Lucknow"),
  ("Uttarakhand", "Dehradun"),
  ("West Bengal", "Kolkata"),
])

In [0]:
# Turn the state -> capital mapping into a two-column (state, city) lookup DataFrame.
df_capital = spark.createDataFrame(list(capitals.items()), schema=["state", "city"])
display(df_capital)

In [0]:
# Inner join on both state and city keeps only rows whose city is that state's capital.
# A list of Column conditions is AND-combined by join(); both sides' state/city columns are retained.
df_2 = df.join(
  df_capital,
  on=[df.state == df_capital.state, df.city == df_capital.city],
  how="inner",
)
display(df_2)

In [0]:
# Remove the duplicate key columns contributed by the lookup side of the join.
df_2 = df_2.drop(df_capital.state).drop(df_capital.city)
display(df_2)

In [0]:
# Remove rows that are exact duplicates across every column.
df_2 = df_2.distinct()
display(df_2)

In [0]:
# Silver layer: persist the capital-city subset as a Delta table (replacing previous contents), then show it.
df_2.write.saveAsTable("faampn6.anuradha.air_quality_silver", format="delta", mode="overwrite")
display(df_2)

In [0]:
# Re-read the Silver table from the catalog as the input for the Gold aggregation.
df_silver = spark.table("faampn6.anuradha.air_quality_silver")
display(df_silver)

In [0]:
from pyspark.sql.functions import to_date

# Derive a calendar `date` column from Data_Fetch_time; the summary below groups by it.
fetch_time_col = df_silver["Data_Fetch_time"]
df_silver = df_silver.withColumn("date", to_date(fetch_time_col))
display(df_silver)

In [0]:
from pyspark.sql import functions as F

# Normalise the three pollutant readings to numbers.
# - 'NA' marks a missing reading, so it becomes NULL rather than 0: min/max/avg skip NULLs,
#   whereas a 0 would drag minima and averages toward zero.
# - Casting to double makes later min/max numeric; on strings they compare lexicographically ("9" > "10").
# NOTE(review): with ANSI mode enabled, any other non-numeric string would make the cast fail — confirm the data.
df_clean = df_silver.withColumns({
    col_name: F.when(F.col(col_name) == 'NA', F.lit(None)).otherwise(F.col(col_name)).cast("double")
    for col_name in ("pollutant_avg", "pollutant_min", "pollutant_max")
})
display(df_clean)

In [0]:
from pyspark.sql import functions as F

# Per (state, city, pollutant, day) summary of the pollutant readings.
# The columns are cast to double inside the aggregations so min/max compare numerically even if they
# arrive as strings (lexicographic order would rank "9" above "10"); the cast is a no-op for doubles.
# The Gold metric columns are therefore typed double.
df_summary = (
    df_clean.groupBy("state", "city", "pollutant_id", "date")
    .agg(
        F.min(F.col("pollutant_min").cast("double")).alias("Min_Pollutant"),
        F.max(F.col("pollutant_max").cast("double")).alias("Max_Pollutant"),
        F.avg(F.col("pollutant_avg").cast("double")).alias("Avg_Pollutant"),
    )
)

display(df_summary)

In [0]:
# Gold layer: persist the daily summary as a Delta table, replacing previous contents.
df_summary.write.saveAsTable("faampn6.anuradha.air_quality_gold", format="delta", mode="overwrite")

In [0]:
# Re-read the Gold table from the catalog for reporting.
df_gold = spark.table("faampn6.anuradha.air_quality_gold")
display(df_gold)

In [0]:
import plotly.express as px
from pyspark.sql import functions as F

# Collect only the plotted columns. The metrics are cast explicitly so the pandas columns are float64
# even if the Gold table stores them as strings.
df_pd = df_gold.select(
    F.col("Max_Pollutant").cast("double").alias("Max_Pollutant"),
    F.col("Avg_Pollutant").cast("double").alias("Avg_Pollutant"),
    "pollutant_id", "state", "city",
).toPandas()

# One point per (state, city, pollutant, day) Gold row, coloured by pollutant.
fig = px.scatter(
    df_pd,
    x="Avg_Pollutant",
    y="Max_Pollutant",
    color="pollutant_id",
    hover_data=["state", "city"],
    title="Maximum vs. average pollutant level (per state capital, pollutant and day)",
    labels={
        "Avg_Pollutant": "Average pollutant level",
        "Max_Pollutant": "Maximum pollutant level",
        "pollutant_id": "Pollutant",
    },
)
fig.show()