**Connecting to the storage account and container to access files**

In [2]:
# Read data directly from ADLS Gen2
container_name = "harshcontainer"
air_pollution_folder_path = "Silver/AirPollutionSilverData"
historical_weather_folder_path = "Silver/WeatherSilverData"

# Set up the configuration for accessing the storage account
storage_account_name = "harshshahstorage"
storage_account_key = "key"

# Read all JSON files in the folder
air_pollution_df = spark.read.json(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{air_pollution_folder_path}")
historical_weather_df = spark.read.json(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{historical_weather_folder_path}")

# Set Spark config to access the storage account
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

# Define ABFSS path
abfss_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"


StatementMeta(openweather, 1, 2, Finished, Available, Finished)

**Load Data into Dataframe**

In [3]:
# Load the weather data from the silver layer
weather_df = spark.read.parquet(f"{abfss_path}Silver/WeatherSilverData")
weather_df.show()

StatementMeta(openweather, 1, 3, Finished, Available, Finished)

+----------------+--------+-------------------+------+----------+----------+------------+------+----------+----------+------------+------+----------+----------+------------+--------+----------+----------------+-------------------------+------------------+------------------+
|              id|location|          date_time|temp_K|temp_max_K|temp_min_K|feels_like_K|temp_c|temp_max_c|temp_min_C|feels_like_C|temp_F|temp_max_F|temp_min_F|feels_like_F|humidity|wind_speed|weather_id_value|weather_description_value|weather_icon_value|weather_main_value|
+----------------+--------+-------------------+------+----------+----------+------------+------+----------+----------+------------+------+----------+----------+------------+--------+----------+----------------+-------------------------+------------------+------------------+
|Boston1739401200|  Boston|2025-02-12 23:00:00|272.94|    273.79|    271.83|      266.88|   0.0|       1.0|      -1.0|        -6.0| 31.62|     33.15|     29.62|       20.71|  

In [4]:
# Load the air pollution data from silver layer
air_pollution_df = spark.read.parquet(f"{abfss_path}Silver/AirPollutionSilverData")
air_pollution_df.show()

StatementMeta(openweather, 1, 4, Finished, Available, Finished)

+----------------+--------+-------------------+---+------+----+----+-----+----+----+-----+----+
|              id|location|          date_time|aqi|    co| nh3|  no|  no2|  o3|pm10|pm2_5| so2|
+----------------+--------+-------------------+---+------+----+----+-----+----+----+-----+----+
|Boston1707170400|  Boston|2024-02-05 22:00:00|  2|280.38|0.77|0.18|11.65|3.00|1.89| 1.30|1.61|
|Boston1707174000|  Boston|2024-02-05 23:00:00|  2|293.73|0.90|0.00|13.54|3.00|2.23| 1.56|1.67|
|Boston1707177600|  Boston|2024-02-06 00:00:00|  2|303.75|1.06|0.00|14.74|3.00|2.83| 2.03|1.79|
|Boston1707181200|  Boston|2024-02-06 01:00:00|  2|300.41|1.08|0.00|13.54|3.00|2.90| 2.08|1.64|
|Boston1707184800|  Boston|2024-02-06 02:00:00|  2|290.39|1.00|0.00|11.31|3.00|2.46| 1.77|1.45|
|Boston1707188400|  Boston|2024-02-06 03:00:00|  2|283.72|0.92|0.00| 9.68|3.00|2.08| 1.49|1.31|
|Boston1707192000|  Boston|2024-02-06 04:00:00|  2|273.70|0.75|0.00| 7.63|3.00|1.51| 1.05|1.15|
|Boston1707195600|  Boston|2024-02-06 05

**Data Processing**

In [5]:
# Recalculate AQI for Air Pollution Data

from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

def calculate_aqi(pollutant, concentration):
    breakpoints = {
        'O3_8hr': [(0, 54, 0, 50), (55, 70, 51, 100), (71, 85, 101, 150), (86, 105, 151, 200), (106, 200, 201, 300)],
        'O3_1hr': [(125, 164, 101, 150), (165, 204, 151, 200), (205, 404, 201, 300), (405, 504, 301, 400), (505, 604, 401, 500)],
        'PM2.5_24hr': [(0, 12, 0, 50), (12.1, 35.4, 51, 100), (35.5, 55.4, 101, 150), (55.5, 150.4, 151, 200), (150.5, 250.4, 201, 300), (250.5, 350.4, 301, 400), (350.5, 500.4, 401, 500)],
        'PM10_24hr': [(0, 54, 0, 50), (55, 154, 51, 100), (155, 254, 101, 150), (255, 354, 151, 200), (355, 424, 201, 300), (425, 504, 301, 400), (505, 604, 401, 500)],
        'CO_8hr': [(0, 4.4, 0, 50), (4.5, 9.4, 51, 100), (9.5, 12.4, 101, 150), (12.5, 15.4, 151, 200), (15.5, 30.4, 201, 300), (30.5, 40.4, 301, 400), (40.5, 50.4, 401, 500)],
        'SO2_1hr': [(0, 35, 0, 50), (36, 75, 51, 100), (76, 185, 101, 150), (186, 304, 151, 200)],
        'SO2_24hr': [(305, 604, 201, 300), (605, 804, 301, 400), (805, 1004, 401, 500)],
        'NO2_1hr': [(0, 53, 0, 50), (54, 100, 51, 100), (101, 360, 101, 150), (361, 649, 151, 200), (650, 1249, 201, 300), (1250, 2049, 301, 400), (2050, 3049, 401, 500)],
    }

    if pollutant not in breakpoints:
        raise ValueError(f"Unsupported pollutant: {pollutant}")

    for (Clow, Chigh, Ilow, Ihigh) in breakpoints[pollutant]:
        if Clow <= concentration <= Chigh:
            return round(((Ihigh - Ilow) / (Chigh - Clow)) * (concentration - Clow) + Ilow)

    return None  # If the concentration is out of the given ranges

StatementMeta(openweather, 1, 5, Finished, Available, Finished)

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType

def calculate_rolling_average(df, column, window):
    windowSpec = Window.partitionBy("location").orderBy("date_time").rowsBetween(-window+1, 0)
    return F.avg(column).over(windowSpec)

def calculate_aqi_row(row):
    pollutant_map = {
        'o3_8hr': 'O3_8hr',
        'o3_1hr': 'O3_1hr',
        'pm2_5_24hr': 'PM2.5_24hr',
        'pm10_24hr': 'PM10_24hr',
        'co_8hr': 'CO_8hr',
        'so2_1hr': 'SO2_1hr',
        'so2_24hr': 'SO2_24hr',
        'no2_1hr': 'NO2_1hr'
    }
    aqi_values = []

    for col_name, pollutant in pollutant_map.items():
        concentration = row[col_name]
        if concentration is not None:
            try:
                concentration = float(concentration)  # Ensure it's a float
                aqi = calculate_aqi(pollutant, concentration)
                if aqi is not None:
                    aqi_values.append(aqi)
            except ValueError:
                print(f"Skipping invalid value: {concentration} for pollutant {col_name}")

    return max(aqi_values) if aqi_values else None

def calculate_us_aqi(df):
    df = df.withColumn("o3_8hr", calculate_rolling_average(df, "o3", 8).cast(DoubleType()))
    df = df.withColumn("o3_1hr", F.col("o3").cast(DoubleType()))
    df = df.withColumn("pm2_5_24hr", calculate_rolling_average(df, "pm2_5", 24).cast(DoubleType()))
    df = df.withColumn("pm10_24hr", calculate_rolling_average(df, "pm10", 24).cast(DoubleType()))
    df = df.withColumn("co_8hr", calculate_rolling_average(df, "co", 8).cast(DoubleType()))
    df = df.withColumn("so2_1hr", F.col("so2").cast(DoubleType()))
    df = df.withColumn("so2_24hr", calculate_rolling_average(df, "so2", 24).cast(DoubleType()))
    df = df.withColumn("no2_1hr", F.col("no2").cast(DoubleType()))

    calculate_aqi_udf = F.udf(lambda row: calculate_aqi_row(row), IntegerType())
    
    df = df.withColumn("us_aqi", calculate_aqi_udf(F.struct(
        F.col("o3_8hr"),
        F.col("o3_1hr"),
        F.col("pm2_5_24hr"),
        F.col("pm10_24hr"),
        F.col("co_8hr"),
        F.col("so2_1hr"),
        F.col("so2_24hr"),
        F.col("no2_1hr")
    )))

    return df

# Apply the function
air_pollution_df = calculate_us_aqi(air_pollution_df)

StatementMeta(openweather, 1, 12, Finished, Available, Finished)

In [13]:
# Display selected columns to verify recalculations
# Ensure all columns are properly cast before displaying
from pyspark.sql.functions import col

air_pollution_df = air_pollution_df.withColumn("o3_8hr", col("o3_8hr").cast("double")) \
                                   .withColumn("pm2_5_24hr", col("pm2_5_24hr").cast("double")) \
                                   .withColumn("pm10_24hr", col("pm10_24hr").cast("double")) \
                                   .withColumn("co_8hr", col("co_8hr").cast("double")) \
                                   .withColumn("so2_1hr", col("so2_1hr").cast("double")) \
                                   .withColumn("so2_24hr", col("so2_24hr").cast("double")) \
                                   .withColumn("no2_1hr", col("no2_1hr").cast("double")) \
                                   .withColumn("us_aqi", col("us_aqi").cast("double"))

# Display selected columns to verify recalculations
air_pollution_df.select(
    "date_time", "o3_8hr", "o3_1hr", "pm2_5_24hr", "pm10_24hr", "co_8hr", "so2_1hr", "so2_24hr", "no2_1hr", "us_aqi"
).show(10)


StatementMeta(openweather, 1, 13, Finished, Available, Finished)

+-------------------+------+------+------------------+------------------+------------------+-------+------------------+-------+------+
|          date_time|o3_8hr|o3_1hr|        pm2_5_24hr|         pm10_24hr|            co_8hr|so2_1hr|          so2_24hr|no2_1hr|us_aqi|
+-------------------+------+------+------------------+------------------+------------------+-------+------------------+-------+------+
|2024-02-05 22:00:00|   3.0|   3.0|               1.3|              1.89|            280.38|   1.61|              1.61|  11.65|  11.0|
|2024-02-05 23:00:00|   3.0|   3.0|1.4300000000000002|              2.06|           287.055|   1.67|1.6400000000000001|  13.54|  13.0|
|2024-02-06 00:00:00|   3.0|   3.0|1.6300000000000001| 2.316666666666667|            292.62|   1.79|1.6900000000000002|  14.74|  14.0|
|2024-02-06 01:00:00|   3.0|   3.0|1.7425000000000002|            2.4625|          294.5675|   1.64|            1.6775|  13.54|  13.0|
|2024-02-06 02:00:00|   3.0|   3.0|             1.748|2

**Data Aggregation**

In [16]:
# Weather data

# Aggregation by Date: Compute daily averages for temperature, humidity, wind speed, etc.
weather_agg_df = weather_df.groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")).agg(
    F.round(F.avg("temp_F")).alias("avg_temp_F"),
    F.round(F.avg("humidity"),2).alias("avg_humidity"),
    F.round(F.avg("wind_speed"),2).alias("avg_wind_speed"),
    F.round(F.max("temp_max_F")).alias("max_temp_F"),
    F.round(F.min("temp_min_F")).alias("min_temp_F"),
    F.count("id").alias("weather_records")
)

# Weather Condition Counts: Count the occurrences of different weather conditions for each day
weather_condition_counts_df = weather_df.groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date"), "weather_main_value").count()

# Temperature Extremes: Identify the maximum and minimum temperatures for each day
temp_extremes_df = weather_df.groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")).agg(
    F.max("temp_max_F").alias("max_temp_F"),
    F.min("temp_min_F").alias("min_temp_F")
)

# Display the Aggregated Data (use the show functions)

# Display weather_agg_df
print("Weather Aggregated Data:")
weather_agg_df.show()

# Display weather_condition_counts_df
print("\nWeather Condition Counts:")
weather_condition_counts_df.show()

# Display temp_extremes_df
print("\nTemperature Extremes:")
temp_extremes_df.show()

StatementMeta(openweather, 1, 16, Finished, Available, Finished)

Weather Aggregated Data:
+----------+----------+------------+--------------+----------+----------+---------------+
|      date|avg_temp_F|avg_humidity|avg_wind_speed|max_temp_F|min_temp_F|weather_records|
+----------+----------+------------+--------------+----------+----------+---------------+
|2024-02-21|      33.0|        67.0|          6.69|      35.0|      31.0|              1|
|2024-02-22|      33.0|       70.96|          2.96|      47.0|      21.0|             24|
|2024-02-23|      38.0|       86.21|          2.89|      44.0|      33.0|             24|
|2024-02-24|      37.0|       72.79|           4.1|      42.0|      26.0|             24|
|2024-02-25|      25.0|       41.21|          3.28|      41.0|      12.0|             24|
|2024-02-26|      38.0|       63.63|          3.85|      57.0|      25.0|             24|
|2024-02-27|      43.0|       75.13|          1.65|      64.0|      28.0|             24|
|2024-02-28|      54.0|       87.21|          5.79|      60.0|      45.0|  

In [17]:
# Air Pollution Data

# Daily Average AQI: Compute the daily average Air Quality Index (AQI)
aqi_agg_df = air_pollution_df.groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")).agg(
    F.round(F.avg("us_aqi")).alias("avg_us_aqi")
)

# Pollutant Aggregation: Calculate daily averages for each pollutant
pollutant_agg_df = air_pollution_df.groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")).agg(
    F.round(F.avg("co"),2).alias("avg_co"),
    F.round(F.avg("no2"),2).alias("avg_no2"),
    F.round(F.avg("o3"),2).alias("avg_o3"),
    F.round(F.avg("so2"),2).alias("avg_so2"),
    F.round(F.avg("pm2_5"),2).alias("avg_pm2_5"),
    F.round(F.avg("pm10"),2).alias("avg_pm10")
)

# Identify High Pollution Events: Mark days with high pollution levels based on a specified threshold
high_pollution_events_df = air_pollution_df.withColumn("high_pollution", F.when(col("us_aqi") > 100, 1).otherwise(0)).groupBy(F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")).agg(
    F.sum("high_pollution").alias("high_pollution_events")
)

# Display the Aggregated Data (use the show functions)

# Display aqi_agg_df
print("AQI Aggregated Data:")
aqi_agg_df.show()

# Display pollutant_agg_df
print("\nPollution Counts:")
pollutant_agg_df.show()

# Display high_pollution_events_df
print("\n Pollution Extremes:")
high_pollution_events_df.show()

StatementMeta(openweather, 1, 17, Finished, Available, Finished)

AQI Aggregated Data:
+----------+----------+
|      date|avg_us_aqi|
+----------+----------+
|2024-07-14|      22.0|
|2024-08-20|      23.0|
|2024-09-15|      40.0|
|2024-10-24|      64.0|
|2024-08-06|      48.0|
|2024-10-22|      61.0|
|2025-02-25|      84.0|
|2024-04-03|      12.0|
|2024-12-12|      16.0|
|2024-02-08|      77.0|
|2024-02-28|      58.0|
|2024-07-12|      25.0|
|2025-01-03|       8.0|
|2024-10-10|      11.0|
|2024-12-09|      27.0|
|2024-09-21|       8.0|
|2024-11-20|      32.0|
|2024-12-03|      29.0|
|2024-02-20|      30.0|
|2024-11-08|      23.0|
+----------+----------+
only showing top 20 rows


Pollution Counts:
+----------+------+-------+------+-------+---------+--------+
|      date|avg_co|avg_no2|avg_o3|avg_so2|avg_pm2_5|avg_pm10|
+----------+------+-------+------+-------+---------+--------+
|2024-07-14|294.57|  14.11|   3.0|   2.94|     9.08|    9.75|
|2024-08-20|309.03|  11.25|   3.0|   2.27|     2.63|    3.11|
|2024-09-15|292.48|  13.85|   3.0|   2.24|     6

**Save Data to Gold Layer**

In [19]:
# Save aggregated data as single files

abfss_base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/Gold"

weather_agg_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_weather")
weather_condition_counts_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_weather_conditions")
temp_extremes_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_temp_extremes")
aqi_agg_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_aqi")
pollutant_agg_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_pollutants")
high_pollution_events_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/agg_high_pollution_events")

# Save processed weather data
weather_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/processed_weather")

# Save processed air pollution data
air_pollution_df.coalesce(1).write.mode("overwrite").parquet(f"{abfss_base_path}/processed_air_pollution")

StatementMeta(openweather, 1, 19, Finished, Available, Finished)

****