# Create Silver Cleaned AirQuality Delta Table

### Loading Raw Data

In [None]:
# Read the raw weather and pollution CSV exports from the Lakehouse Files area.
# Both sources use the same reader settings: the first row is the header and
# Spark infers the column types from the data.
csv_options = {'header': True, 'inferSchema': True}

weather_df = (
    spark.read.format('csv')
    .options(**csv_options)
    .load('Files/airquality/weather/*.csv')
)

pollution_df = (
    spark.read.format('csv')
    .options(**csv_options)
    .load('Files/airquality/pollution/*.csv')
)


### Dropping unnecessary columns

In [None]:
from pyspark.sql.functions import col

# Columns removed from both raw frames before they are joined.
cols = ['Index', '_c0', 'sensor_id']

# Apply the same drop to each frame and rebind both names in one step.
weather_df, pollution_df = (
    frame.drop(*cols) for frame in (weather_df, pollution_df)
)

### Joining Weather and Pollution Data into one AirQuality dataset

In [None]:
# A weather row and a pollution row belong together when they share both the
# location and the timestamp.
same_reading = (
    (weather_df.location == pollution_df.location)
    & (weather_df.timestamp == pollution_df.timestamp)
)

# Inner join: only readings present in both sources are kept. The result keeps
# the weather measurements ("w") plus the P1/P2 pollution readings ("p").
airquality_df = (
    weather_df.alias("w")
    .join(pollution_df.alias("p"), same_reading, "inner")
    .select(
        "w.lat", "w.lon", "w.timestamp",
        "w.pressure", "w.temperature", "w.humidity",
        "p.P1", "p.P2",
    )
)

### Sum the pollution columns (P1 and P2) into one Pollution Total column

In [None]:
# Combine the two pollution readings into a single Pollution_total column,
# then remove the individual P1 and P2 columns.
airquality_df = (
    airquality_df
    .withColumn("Pollution_total", col('P1') + col('P2'))
    .drop(col('P1'))
    .drop(col('P2'))
)

### Filtering NULL Data

In [None]:
# Columns that must be populated for a row to be kept.
# humidity is not in this list, so rows with a null humidity are retained.
required_columns = ["lat", "lon", "timestamp", "pressure", "temperature", "Pollution_total"]

# Build one SQL predicate of the form "<col> is not null AND <col> is not null ...".
not_null_predicate = " AND ".join(f"{name} is not null" for name in required_columns)

airquality_df = airquality_df.filter(not_null_predicate)

### Create Date Derived Columns

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, hour

# Split the reading timestamp into separate year / month / day / hour columns.
reading_time = col('timestamp')

airquality_df = (
    airquality_df
    .withColumn("year", year(reading_time))
    .withColumn("month", month(reading_time))
    .withColumn("day", dayofmonth(reading_time))
    .withColumn("hour", hour(reading_time))
)

### Creating Cleaned Temperature Data

The raw temperature readings contain too many dirty values, so each reading is replaced with the average temperature for its month, day, and hour.

In [None]:
from pyspark.sql.functions import avg

# Average temperature for each (month, day, hour) slot across all rows in that
# slot. year is not a grouping key, so the same slot from different years is
# averaged together.
temperature_df = (
    airquality_df
    .groupBy("month", "day", "hour")
    .agg(avg("temperature").alias("average_temperature"))
)

Then join the averaged temperature data back to the AirQuality data on month, day, and hour.

In [None]:
# Replace each row's raw temperature with the average for its (month, day, hour) slot.
#
# temperature_df is derived from airquality_df (groupBy/agg), so a condition
# written as `airquality_df.month == temperature_df.month` can reference the
# same underlying column on both sides. It then relies on Spark's self-join
# ambiguity handling instead of stating which side each key comes from.
# Addressing the keys through the "a" / "t" aliases makes each side explicit.
same_time_slot = (
    (col("a.month") == col("t.month"))
    & (col("a.day") == col("t.day"))
    & (col("a.hour") == col("t.hour"))
)

airquality_df = (
    airquality_df.alias("a")
    .join(temperature_df.alias("t"), same_time_slot, "inner")
    # Keep every original column except the raw temperature, which is
    # replaced by the slot average from the "t" side.
    .select(
        "a.lat", "a.lon", "a.timestamp", "a.pressure",
        "t.average_temperature",
        "a.humidity", "a.Pollution_total",
        "a.year", "a.month", "a.day", "a.hour",
    )
)

### Writing AirQuality Cleaned Data to Delta Table in MS Fabric Lakehouse

In [None]:
# Save the cleaned AirQuality data in Delta format under Tables/airquality.
# Overwrite mode replaces whatever was previously stored at that location.
(
    airquality_df.write
    .format('delta')
    .mode('overwrite')
    .save('Tables/airquality')
)