In [0]:
spark

In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType, IntegerType, DateType

** Data Processing with PySpark in Databricks **

In [0]:
station_hour_df  = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://air-quality-india/station_hour.csv")

In [0]:
stations_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://air-quality-india/stations.csv")

In [0]:
station_day_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://air-quality-india/station_day.csv")

In [0]:
city_day_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://air-quality-india/city_day.csv")

In [0]:
city_day_df = city_day_df.withColumnRenamed("PM2.5", "PM2_5")
city_day_cleaned_df = city_day_df.dropna()

In [0]:
city_hour_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://air-quality-india/city_hour.csv")

**Data Cleaning**

In [0]:
station_hour_df = station_hour_df.withColumnRenamed("PM2.5", "PM2_5")
station_hour_cleaned_df = station_hour_df.dropna()

In [0]:
stations_cleaned_df = stations_df.dropna()
station_day_df = station_day_df.withColumnRenamed("PM2.5", "PM2_5")
station_day_cleaned_df = station_day_df.dropna()
city_hour_df = city_hour_df.withColumnRenamed("PM2.5", "PM2_5")
city_hour_cleaned_df = city_hour_df.dropna()



In [0]:
station_hour_cleaned_df.show(5)

+---------+-------------------+-----+------+----+-----+-----+----+---+-----+------+-------+-------+------+-----+----------+
|StationId|           Datetime|PM2_5|  PM10|  NO|  NO2|  NOx| NH3| CO|  SO2|    O3|Benzene|Toluene|Xylene|  AQI|AQI_Bucket|
+---------+-------------------+-----+------+----+-----+-----+----+---+-----+------+-------+-------+------+-----+----------+
|    AP001|2017-11-25 09:00:00|104.0| 148.5|1.93| 23.0|13.75| 9.8|0.1| 15.3|117.62|    0.3|   10.4|  0.23|155.0|  Moderate|
|    AP001|2017-11-25 10:00:00| 94.5| 142.0|1.33|16.25| 9.75|9.65|0.1| 17.0|136.23|   0.28|    7.1|  0.15|159.0|  Moderate|
|    AP001|2017-11-25 11:00:00|82.75| 126.5|1.47|14.83| 9.07| 9.7|0.1| 15.4|149.92|    0.2|   4.55|  0.08|173.0|  Moderate|
|    AP001|2017-11-25 14:00:00| 68.5| 117.0|1.35| 13.6| 8.35| 7.4|0.1| 21.8| 161.7|    0.1|    2.3|   0.0|191.0|  Moderate|
|    AP001|2017-11-25 15:00:00|69.25|112.25|1.52| 11.8| 7.55|9.25|0.1|21.38|161.68|    0.1|   2.35|   0.0|191.0|  Moderate|
+-------

In [0]:
from pyspark.sql.functions import to_timestamp
station_hour_cleaned_df = station_hour_cleaned_df.withColumn("Datetime", to_timestamp("Datetime", "dd-MM-yyyy HH:mm"))
city_hour_cleaned_df = city_hour_cleaned_df.withColumn("Datetime", to_timestamp("Datetime", "dd-MM-yyyy HH:mm"))

In [0]:
station_day_cleaned_df = station_day_cleaned_df.withColumn("Date", to_timestamp("Date", "dd-MM-yyyy"))
city_day_cleaned_df = city_day_cleaned_df.withColumn("Date", to_timestamp("Date", "dd-MM-yyyy"))

**Data Analysis**

In [0]:
from pyspark.sql.functions import col, year, month, dayofweek, avg, count, round


In [0]:
## Monthly Average PM2.5
monthly_pm25 = station_day_cleaned_df.groupBy(year("Date").alias("Year"), month("Date").alias("Month")).agg(avg("PM2_5").alias("Avg_PM2_5")).orderBy("Year", "Month")


In [0]:
monthly_pm25.show(5)

+----+-----+------------------+
|Year|Month|         Avg_PM2_5|
+----+-----+------------------+
|2015|    9|            20.917|
|2015|   10|53.541785714285716|
|2015|   11| 55.44034482758619|
|2015|   12| 48.19642857142857|
|2016|    1| 65.49833333333332|
+----+-----+------------------+
only showing top 5 rows



In [0]:
## Average PM2.5 Levels by City
city_pm25 = city_day_cleaned_df.groupBy("City").agg(avg("PM2_5").alias("Avg_PM2_5"))

In [0]:
city_pm25.show(5)

+---------+------------------+
|     City|         Avg_PM2_5|
+---------+------------------+
|    Patna| 65.83125654450266|
| Gurugram| 49.54100840336136|
|  Kolkata|47.606294416243685|
|Amaravati|41.713219814241484|
| Amritsar| 56.42150943396224|
+---------+------------------+
only showing top 5 rows



In [0]:
## Correlation Matrix Preparation (to be calculated in Power BI)
pollutant_columns = ["PM2_5", "PM10", "NO", "NO2", "NH3", "CO", "SO2", "O3"]
pollutant_data = station_hour_df[pollutant_columns]

In [0]:
pollutant_data.show()

+-----+------+----+-----+-----+----+-----+------+
|PM2_5|  PM10|  NO|  NO2|  NH3|  CO|  SO2|    O3|
+-----+------+----+-----+-----+----+-----+------+
| 60.5|  98.0|2.35| 30.8|  8.5| 0.1|11.85| 126.4|
| 65.5|111.25| 2.7| 24.2| 9.77| 0.1|13.17|117.12|
| 80.0| 132.0| 2.1|25.18|12.02| 0.1|12.08| 98.98|
| 81.5|133.25|1.95|16.25|11.58| 0.1|10.47| 112.2|
|75.25| 116.0|1.43|17.48|12.03| 0.1| 9.12|106.35|
|69.25|108.25| 0.7|18.47| 13.8| 0.1| 9.25|  91.1|
| 67.5| 111.5|1.05|12.15|17.65| 0.1|  9.4| 112.7|
| 68.0| 111.0|1.25|14.12|20.28| 0.1|  8.9|116.12|
| 73.0| 102.0| 0.3| 14.3| 11.5| 0.3| 11.8| 121.5|
| 81.0| 123.0| 0.8|24.85|10.28| 0.1|11.62|  83.8|
| 82.5|122.75|1.25|21.82|10.18| 0.1| 12.7| 87.25|
| 77.5| 112.5| 0.5| 23.5|  8.9| 0.1|11.58| 76.45|
| 77.0| 112.5| 1.3|23.58| 8.75| 0.1| 11.1|101.93|
|80.75| 122.0|1.08| 19.2|  9.3| 0.1|12.58|107.77|
| 83.0| 137.0|2.12|26.03| 7.73| 0.1|12.75| 93.57|
| null|  null|null| null| null|null| null|  null|
|104.0| 148.5|1.93| 23.0|  9.8| 0.1| 15.3|117.62|


In [0]:
pollutant_data_cleaned = pollutant_data.dropna()

In [0]:
pollutant_data_cleaned.show()

+-----+------+----+-----+-----+---+-----+------+
|PM2_5|  PM10|  NO|  NO2|  NH3| CO|  SO2|    O3|
+-----+------+----+-----+-----+---+-----+------+
| 60.5|  98.0|2.35| 30.8|  8.5|0.1|11.85| 126.4|
| 65.5|111.25| 2.7| 24.2| 9.77|0.1|13.17|117.12|
| 80.0| 132.0| 2.1|25.18|12.02|0.1|12.08| 98.98|
| 81.5|133.25|1.95|16.25|11.58|0.1|10.47| 112.2|
|75.25| 116.0|1.43|17.48|12.03|0.1| 9.12|106.35|
|69.25|108.25| 0.7|18.47| 13.8|0.1| 9.25|  91.1|
| 67.5| 111.5|1.05|12.15|17.65|0.1|  9.4| 112.7|
| 68.0| 111.0|1.25|14.12|20.28|0.1|  8.9|116.12|
| 73.0| 102.0| 0.3| 14.3| 11.5|0.3| 11.8| 121.5|
| 81.0| 123.0| 0.8|24.85|10.28|0.1|11.62|  83.8|
| 82.5|122.75|1.25|21.82|10.18|0.1| 12.7| 87.25|
| 77.5| 112.5| 0.5| 23.5|  8.9|0.1|11.58| 76.45|
| 77.0| 112.5| 1.3|23.58| 8.75|0.1| 11.1|101.93|
|80.75| 122.0|1.08| 19.2|  9.3|0.1|12.58|107.77|
| 83.0| 137.0|2.12|26.03| 7.73|0.1|12.75| 93.57|
|104.0| 148.5|1.93| 23.0|  9.8|0.1| 15.3|117.62|
| 94.5| 142.0|1.33|16.25| 9.65|0.1| 17.0|136.23|
|82.75| 126.5|1.47|1

In [0]:
## AQI Distribution
aqi_distribution = station_day_cleaned_df.groupBy("AQI_Bucket").agg(count("*").alias("Count"))


In [0]:
aqi_distribution.show()

+------------+-----+
|  AQI_Bucket|Count|
+------------+-----+
|        Good| 1073|
|      Severe|  100|
|   Very Poor|  399|
|Satisfactory| 3730|
|        Poor|  610|
|    Moderate| 4402|
+------------+-----+



In [0]:
## Daily Average AQI for Each City
daily_avg_aqi = city_day_cleaned_df.groupBy("City", "Date") \
    .agg(round(avg("AQI"), 2).alias("Avg_AQI")) \
    .orderBy("City", "Date")

In [0]:
daily_avg_aqi.show()

+---------+-------------------+-------+
|     City|               Date|Avg_AQI|
+---------+-------------------+-------+
|Amaravati|2017-11-25 00:00:00|  184.0|
|Amaravati|2017-11-26 00:00:00|  197.0|
|Amaravati|2017-11-27 00:00:00|  198.0|
|Amaravati|2017-11-28 00:00:00|  188.0|
|Amaravati|2017-11-29 00:00:00|  173.0|
|Amaravati|2017-11-30 00:00:00|  165.0|
|Amaravati|2017-12-01 00:00:00|  191.0|
|Amaravati|2017-12-02 00:00:00|  191.0|
|Amaravati|2017-12-03 00:00:00|  227.0|
|Amaravati|2017-12-04 00:00:00|  168.0|
|Amaravati|2017-12-05 00:00:00|  198.0|
|Amaravati|2017-12-06 00:00:00|  201.0|
|Amaravati|2017-12-07 00:00:00|  252.0|
|Amaravati|2017-12-08 00:00:00|  310.0|
|Amaravati|2017-12-09 00:00:00|  196.0|
|Amaravati|2017-12-10 00:00:00|  132.0|
|Amaravati|2017-12-11 00:00:00|  147.0|
|Amaravati|2017-12-12 00:00:00|  179.0|
|Amaravati|2017-12-13 00:00:00|  145.0|
|Amaravati|2017-12-14 00:00:00|  115.0|
+---------+-------------------+-------+
only showing top 20 rows



In [0]:
## Weekly AQI Trends
weekly_aqi = city_hour_cleaned_df.groupBy(year("Datetime").alias("Year"), month("Datetime").alias("Month"), dayofweek("Datetime").alias("DayOfWeek")) \
    .agg(avg("AQI").alias("Avg_AQI")) \
    .orderBy("Year", "Month", "DayOfWeek")

In [0]:
weekly_aqi.show()

+----+-----+---------+------------------+
|Year|Month|DayOfWeek|           Avg_AQI|
+----+-----+---------+------------------+
|2015|    1|        1|          350.3125|
|2015|    1|        2| 338.6145833333333|
|2015|    1|        3|         335.46875|
|2015|    1|        4|  334.752688172043|
|2015|    1|        5|355.45098039215685|
|2015|    1|        6|           356.825|
|2015|    1|        7|317.72477064220186|
|2015|    2|        1| 337.7604166666667|
|2015|    2|        2|          313.5625|
|2015|    2|        3|         333.46875|
|2015|    2|        4| 337.9166666666667|
|2015|    2|        5|319.63829787234044|
|2015|    2|        6| 318.8958333333333|
|2015|    2|        7| 334.1979166666667|
|2015|    3|        1|232.39166666666668|
|2015|    3|        2|             228.5|
|2015|    3|        3|           258.725|
|2015|    3|        4|             251.5|
|2015|    3|        5|         296.90625|
|2015|    3|        6| 286.2395833333333|
+----+-----+---------+------------

In [0]:
## Monthly Trends of Major Pollutants (PM2.5, PM10)
monthly_pollutants = station_day_cleaned_df.groupBy(year("Date").alias("Year"), month("Date").alias("Month")) \
    .agg(avg("PM2_5").alias("Avg_PM2_5"), avg("PM10").alias("Avg_PM10")) \
    .orderBy("Year", "Month")

In [0]:
monthly_pollutants.show()

+----+-----+------------------+------------------+
|Year|Month|         Avg_PM2_5|          Avg_PM10|
+----+-----+------------------+------------------+
|2015|    9|            20.917|            38.035|
|2015|   10|53.541785714285716| 97.28535714285714|
|2015|   11| 55.44034482758619|102.93241379310346|
|2015|   12| 48.19642857142857| 94.66071428571429|
|2016|    1| 65.49833333333332| 103.0938888888889|
|2016|    2| 48.34941176470588| 97.90235294117647|
|2016|    3|           54.6275| 97.06571428571428|
|2016|    4| 45.76499999999999| 95.58615384615388|
|2016|    5| 30.46428571428572| 72.23464285714286|
|2016|    7| 30.78500000000001| 83.02454545454545|
|2016|    8|26.998387096774195| 80.37870967741935|
|2016|    9| 31.06882352941176| 65.40352941176471|
|2016|   10|49.190740740740736| 88.09111111111112|
|2016|   11| 61.58033333333334| 98.68366666666667|
|2016|   12| 59.06803278688524| 97.80983606557379|
|2017|    1| 70.74951612903226|120.76354838709678|
|2017|    2| 72.30403846153845|

**JOIN DATAFRSMES**

In [0]:
df_hour_combined = station_hour_cleaned_df.join(stations_cleaned_df, on="StationId", how="inner")
df_day_combined = station_day_cleaned_df.join(stations_cleaned_df, on="StationId", how="inner")
df_city_hour_combined = city_hour_cleaned_df.join(stations_cleaned_df, city_hour_cleaned_df.City == stations_cleaned_df.City, how="inner")

**Average AQI Per City**

In [0]:
# Group by City and calculate the average AQI (hourly data)
avg_aqi_city_hour = df_hour_combined.groupBy("City").agg({"AQI": "avg"}).alias("average_aqi_hour")

# Group by City and calculate the average AQI (daily data)
avg_aqi_city_day = df_day_combined.groupBy("City").agg({"AQI": "avg"}).alias("average_aqi_day")

In [0]:
avg_aqi_city_hour.show(5)

+-------------+------------------+
|         City|          avg(AQI)|
+-------------+------------------+
|        Patna| 165.7777997234841|
|    Amaravati|111.37185251798562|
|        Delhi| 210.3611625909687|
|   Chandigarh| 90.80251183108847|
|Visakhapatnam|117.86265488338192|
+-------------+------------------+
only showing top 5 rows



In [0]:
avg_aqi_city_day.show(5)

+-------------+------------------+
|         City|          avg(AQI)|
+-------------+------------------+
|        Patna|167.42023346303503|
|    Amaravati|             101.5|
|        Delhi| 212.9804618117229|
|   Chandigarh|  93.2418772563177|
|Visakhapatnam|116.95402298850574|
+-------------+------------------+
only showing top 5 rows



**Time Series Analysis**

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth

# Extract year, month, day from Datetime
df_hour_combined = df_hour_combined.withColumn("Year", year(col("Datetime"))) \
    .withColumn("Month", month(col("Datetime"))) \
  .withColumn("Day", dayofmonth(col("Datetime")))

# Average AQI over time
avg_aqi_time = df_hour_combined.groupBy("Year", "Month", "Day").agg({"AQI": "avg"}).orderBy("Year", "Month", "Day")
avg_aqi_time.show()


+----+-----+---+------------------+
|Year|Month|Day|          avg(AQI)|
+----+-----+---+------------------+
|2015|    9|  8|              61.0|
|2015|    9|  9| 69.91666666666667|
|2015|    9| 10| 72.08333333333333|
|2015|    9| 11| 64.95833333333333|
|2015|    9| 12|63.583333333333336|
|2015|    9| 13|             64.25|
|2015|    9| 14| 66.29166666666667|
|2015|    9| 15|55.916666666666664|
|2015|    9| 16|58.708333333333336|
|2015|    9| 17|48.285714285714285|
|2015|   10|  2|              96.0|
|2015|   10|  3|            82.875|
|2015|   10|  4|             100.0|
|2015|   10|  5| 89.33333333333333|
|2015|   10|  6|103.20833333333333|
|2015|   10|  7|             106.5|
|2015|   10|  8|            135.25|
|2015|   10| 10|104.55555555555556|
|2015|   10| 11|107.58333333333333|
|2015|   10| 12|113.79166666666667|
+----+-----+---+------------------+
only showing top 20 rows



In [0]:
display(monthly_pm25)

Year,Month,Avg_PM2_5
2015,9,20.917
2015,10,53.54178571428572
2015,11,55.44034482758619
2015,12,48.19642857142857
2016,1,65.49833333333332
2016,2,48.34941176470588
2016,3,54.6275
2016,4,45.76499999999999
2016,5,30.46428571428572
2016,7,30.78500000000001


In [0]:
display(city_pm25)

City,Avg_PM2_5
Patna,65.83125654450266
Gurugram,49.54100840336136
Kolkata,47.606294416243685
Amaravati,41.71321981424149
Amritsar,56.42150943396224
Delhi,117.13475490196072
Chandigarh,39.48978339350181
Hyderabad,46.03892459826955
Visakhapatnam,47.37224580017685


In [0]:
display(pollutant_data_cleaned)

PM2_5,PM10,NO,NO2,NH3,CO,SO2,O3
60.5,98.0,2.35,30.8,8.5,0.1,11.85,126.4
65.5,111.25,2.7,24.2,9.77,0.1,13.17,117.12
80.0,132.0,2.1,25.18,12.02,0.1,12.08,98.98
81.5,133.25,1.95,16.25,11.58,0.1,10.47,112.2
75.25,116.0,1.43,17.48,12.03,0.1,9.12,106.35
69.25,108.25,0.7,18.47,13.8,0.1,9.25,91.1
67.5,111.5,1.05,12.15,17.65,0.1,9.4,112.7
68.0,111.0,1.25,14.12,20.28,0.1,8.9,116.12
73.0,102.0,0.3,14.3,11.5,0.3,11.8,121.5
81.0,123.0,0.8,24.85,10.28,0.1,11.62,83.8


In [0]:
display(aqi_distribution)

AQI_Bucket,Count
Good,1073
Severe,100
Very Poor,399
Satisfactory,3730
Poor,610
Moderate,4402


In [0]:
display(daily_avg_aqi)

City,Date,Avg_AQI
Amaravati,2017-11-25T00:00:00.000+0000,184.0
Amaravati,2017-11-26T00:00:00.000+0000,197.0
Amaravati,2017-11-27T00:00:00.000+0000,198.0
Amaravati,2017-11-28T00:00:00.000+0000,188.0
Amaravati,2017-11-29T00:00:00.000+0000,173.0
Amaravati,2017-11-30T00:00:00.000+0000,165.0
Amaravati,2017-12-01T00:00:00.000+0000,191.0
Amaravati,2017-12-02T00:00:00.000+0000,191.0
Amaravati,2017-12-03T00:00:00.000+0000,227.0
Amaravati,2017-12-04T00:00:00.000+0000,168.0


In [0]:
display(weekly_aqi)

Year,Month,DayOfWeek,Avg_AQI
2015,1,1,350.3125
2015,1,2,338.6145833333333
2015,1,3,335.46875
2015,1,4,334.752688172043
2015,1,5,355.45098039215685
2015,1,6,356.825
2015,1,7,317.72477064220186
2015,2,1,337.7604166666667
2015,2,2,313.5625
2015,2,3,333.46875


In [0]:
display(monthly_pollutants)

Year,Month,Avg_PM2_5,Avg_PM10
2015,9,20.917,38.035
2015,10,53.54178571428572,97.28535714285714
2015,11,55.44034482758619,102.93241379310346
2015,12,48.19642857142857,94.66071428571428
2016,1,65.49833333333332,103.0938888888889
2016,2,48.34941176470588,97.90235294117647
2016,3,54.6275,97.06571428571428
2016,4,45.76499999999999,95.58615384615388
2016,5,30.46428571428572,72.23464285714286
2016,7,30.78500000000001,83.02454545454545


In [0]:
avg_aqi_city_hour.show(50)

+-------------+------------------+
|         City|          avg(AQI)|
+-------------+------------------+
|        Patna| 165.7777997234841|
|    Amaravati|111.37185251798562|
|        Delhi| 210.3611625909687|
|   Chandigarh| 90.80251183108847|
|Visakhapatnam|117.86265488338192|
|     Gurugram|148.64478371501272|
|     Amritsar|129.85304601425793|
|    Hyderabad| 98.26471984805318|
|      Kolkata| 129.1410447761194|
+-------------+------------------+



In [0]:
display(avg_aqi_city_day)

City,avg(AQI)
Patna,167.42023346303503
Amaravati,101.5
Delhi,212.9804618117229
Chandigarh,93.2418772563177
Visakhapatnam,116.95402298850574
Gurugram,152.87387387387386
Kolkata,128.3654024051804
Amritsar,129.16981132075472
Hyderabad,97.38621509209744


In [0]:
display(avg_aqi_time)

Year,Month,Day,avg(AQI)
2015,9,8,61.0
2015,9,9,69.91666666666667
2015,9,10,72.08333333333333
2015,9,11,64.95833333333333
2015,9,12,63.583333333333336
2015,9,13,64.25
2015,9,14,66.29166666666667
2015,9,15,55.91666666666666
2015,9,16,58.708333333333336
2015,9,17,48.285714285714285
