# In [128]:

# Set up the configuration for accessing the storage account.
#
# The account key is a credential and must not live in source control, so it
# is read from the environment instead of being written out here.
# NOTE(review): the key that used to be hard-coded in this cell should be
# treated as leaked and rotated in the Azure portal.
# NOTE(review): if this notebook runs on a platform with a secret store
# (e.g. a Databricks secret scope or Key Vault), prefer that over an
# environment variable -- confirm which is available.
import os

storage_account_name = "team4storage"
container = "team4container"

storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    # Fail early with an actionable message rather than letting the first
    # read fail later with an opaque authentication error.
    raise RuntimeError(
        "AZURE_STORAGE_ACCOUNT_KEY is not set; provide the storage account "
        "key through the environment before running this notebook."
    )

# Register the key with the Spark session so wasbs:// paths can be resolved.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

# Root URI of the container; every dataset path below is built from it.
wasbs_path = f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"


# Locations of the silver-layer respiratory hospitalization datasets (one per city)
respiratory_Boston_path = f"{wasbs_path}Silver/RespiratoryHospitalizations/RespiratoryHospitalizationsFilteredBoston.parquet"
respiratory_Houston_path = f"{wasbs_path}Silver/RespiratoryHospitalizations/RespiratoryHospitalizationsFilteredHouston.parquet"

# Locations of the silver-layer historical air pollution datasets (one per city)
air_pollution_Boston_path = f"{wasbs_path}Silver/HistoricalAirPollution/HistoricalAirPollutionBoston.parquet"
air_pollution_Houston_path = f"{wasbs_path}Silver/HistoricalAirPollution/HistoricalAirPollutionHouston.parquet"

# Load the respiratory hospitalization data
respiratory_Boston_df = spark.read.parquet(respiratory_Boston_path)
respiratory_Houston_df = spark.read.parquet(respiratory_Houston_path)

# Load the air pollution data
air_pollution_Boston_df = spark.read.parquet(air_pollution_Boston_path)
air_pollution_Houston_df = spark.read.parquet(air_pollution_Houston_path)


from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, FloatType
from pyspark.sql.types import IntegerType

# Helper function to cast the pollutant concentration columns to DoubleType
def cast_air_pollution_df(df):
    """Return *df* with every pollutant concentration column cast to double.

    The columns ``co``, ``nh3``, ``no``, ``no2``, ``o3``, ``pm10``, ``pm2_5``
    and ``so2`` are replaced in place (same name, same position) by their
    ``DoubleType`` cast.  All other columns are left untouched.

    ``no`` was previously cast to ``IntegerType`` while every other pollutant
    was cast to ``DoubleType``; that truncated fractional concentrations, so
    it is now cast like the rest.

    Args:
        df: Spark DataFrame containing the eight pollutant columns.

    Returns:
        A new DataFrame with the pollutant columns typed as double.
    """
    pollutant_columns = ("co", "nh3", "no", "no2", "o3", "pm10", "pm2_5", "so2")

    for column_name in pollutant_columns:
        # withColumn on an existing name replaces that column in place.
        df = df.withColumn(column_name, col(column_name).cast(DoubleType()))

    return df

# Apply the pollutant column casts to both cities' air pollution data
air_pollution_Boston_df, air_pollution_Houston_df = (
    cast_air_pollution_df(air_pollution_Boston_df),
    cast_air_pollution_df(air_pollution_Houston_df),
)

from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window


# AQI breakpoint tables: pollutant -> [(C_low, C_high, I_low, I_high), ...].
# Built once at import time instead of on every call.
# NOTE(review): the values look like the US EPA tables (O3/SO2/NO2 in ppb,
# CO in ppm, PM in ug/m3) -- confirm the input columns use the same units.
_AQI_BREAKPOINTS = {
    'O3_8hr': [(0, 54, 0, 50), (55, 70, 51, 100), (71, 85, 101, 150), (86, 105, 151, 200), (106, 200, 201, 300)],
    'O3_1hr': [(125, 164, 101, 150), (165, 204, 151, 200), (205, 404, 201, 300), (405, 504, 301, 400), (505, 604, 401, 500)],
    'PM2.5_24hr': [(0, 12, 0, 50), (12.1, 35.4, 51, 100), (35.5, 55.4, 101, 150), (55.5, 150.4, 151, 200), (150.5, 250.4, 201, 300), (250.5, 350.4, 301, 400), (350.5, 500.4, 401, 500)],
    'PM10_24hr': [(0, 54, 0, 50), (55, 154, 51, 100), (155, 254, 101, 150), (255, 354, 151, 200), (355, 424, 201, 300), (425, 504, 301, 400), (505, 604, 401, 500)],
    'CO_8hr': [(0, 4.4, 0, 50), (4.5, 9.4, 51, 100), (9.5, 12.4, 101, 150), (12.5, 15.4, 151, 200), (15.5, 30.4, 201, 300), (30.5, 40.4, 301, 400), (40.5, 50.4, 401, 500)],
    'SO2_1hr': [(0, 35, 0, 50), (36, 75, 51, 100), (76, 185, 101, 150), (186, 304, 151, 200)],
    'SO2_24hr': [(305, 604, 201, 300), (605, 804, 301, 400), (805, 1004, 401, 500)],
    'NO2_1hr': [(0, 53, 0, 50), (54, 100, 51, 100), (101, 360, 101, 150), (361, 649, 151, 200), (650, 1249, 201, 300), (1250, 2049, 301, 400), (2050, 3049, 401, 500)],
}

# Number of decimal places each table's concentrations are expressed in.
# Pollutants not listed here use whole numbers.
_AQI_CONCENTRATION_DECIMALS = {
    'PM2.5_24hr': 1,
    'CO_8hr': 1,
}


def calculate_aqi(pollutant, concentration):
    """Convert a pollutant concentration into its AQI sub-index.

    The concentration is first truncated to the precision of the pollutant's
    breakpoint table, then linearly interpolated inside the matching
    breakpoint range.  Truncating first is what closes the gaps between
    adjacent ranges (e.g. 54 -> 55 or 12.0 -> 12.1): without it, an averaged
    value such as 54.5 matches no range and silently yields ``None``.

    Args:
        pollutant: Key of the breakpoint table to use, e.g. ``'O3_8hr'``.
        concentration: Numeric concentration for that pollutant.

    Returns:
        The AQI sub-index as an ``int``, or ``None`` when the concentration
        is negative, NaN/infinite, or outside every range of the table.

    Raises:
        ValueError: If *pollutant* has no breakpoint table.
    """
    import math

    if pollutant not in _AQI_BREAKPOINTS:
        raise ValueError(f"Unsupported pollutant: {pollutant}")

    # NaN/inf cannot be floored and negative readings are not valid input.
    if not math.isfinite(concentration) or concentration < 0:
        return None

    scale = 10 ** _AQI_CONCENTRATION_DECIMALS.get(pollutant, 0)
    # The small epsilon keeps values that are already at table precision
    # (e.g. 35.4) from being pushed one step down by float multiplication.
    truncated = math.floor(concentration * scale + 1e-9) / scale

    for c_low, c_high, i_low, i_high in _AQI_BREAKPOINTS[pollutant]:
        if c_low <= truncated <= c_high:
            # Plain Python round(): this runs on scalar values, not Columns.
            return round(((i_high - i_low) / (c_high - c_low)) * (truncated - c_low) + i_low)

    return None  # The concentration is outside every range of the table

def calculate_rolling_average(df, column, window):
    """Build a trailing moving-average column expression.

    The average covers the current row and the ``window - 1`` rows before it,
    within each ``Location`` partition ordered by ``date_time``.  The window
    is counted in rows, not in elapsed time.

    Args:
        df: Unused; kept so existing callers keep working.
        column: Name of the column to average.
        window: Number of rows (including the current one) in the average.

    Returns:
        A Column expression; the DataFrame it is applied to must have
        ``Location`` and ``date_time`` columns.
    """
    first_row_offset = 1 - window
    trailing_rows = (
        Window.partitionBy("Location")
        .orderBy("date_time")
        .rowsBetween(first_row_offset, 0)
    )
    return F.avg(column).over(trailing_rows)

# Row-level AQI: the overall index is the worst (highest) pollutant sub-index
def calculate_aqi_row(row):
    """Return the highest AQI sub-index found in *row*.

    Each of the eight averaged-concentration fields is looked up by name,
    converted with ``calculate_aqi`` and the largest result is returned.
    Fields that are ``None``, or whose concentration falls outside the
    breakpoint table, are skipped.

    Args:
        row: Mapping-like object (e.g. a Spark ``Row``) indexable by the
            field names listed below.

    Returns:
        The maximum sub-index, or ``None`` when no field produced one.
    """
    # (field name in the row, breakpoint table key)
    field_to_pollutant = (
        ('o3_8hr', 'O3_8hr'),
        ('o3_1hr', 'O3_1hr'),
        ('pm2_5_24hr', 'PM2.5_24hr'),
        ('pm10_24hr', 'PM10_24hr'),
        ('co_8hr', 'CO_8hr'),
        ('so2_1hr', 'SO2_1hr'),
        ('so2_24hr', 'SO2_24hr'),
        ('no2_1hr', 'NO2_1hr'),
    )

    sub_indices = []
    for field_name, pollutant in field_to_pollutant:
        reading = row[field_name]
        if reading is None:
            continue
        sub_index = calculate_aqi(pollutant, reading)
        if sub_index is None:
            continue
        sub_indices.append(sub_index)

    return max(sub_indices, default=None)

# Derive the averaged pollutant columns and the overall US AQI for a DataFrame
def calculate_us_aqi(df):
    """Add the AQI input columns and a ``us_aqi`` column to *df*.

    Eight columns are appended, in this order: ``o3_8hr``, ``o3_1hr``,
    ``pm2_5_24hr``, ``pm10_24hr``, ``co_8hr``, ``so2_1hr``, ``so2_24hr`` and
    ``no2_1hr``.  The ``*_1hr`` columns are copies of the raw pollutant
    column; the others are trailing averages over that many rows.  Finally
    ``us_aqi`` is computed per row by a Python UDF wrapping
    ``calculate_aqi_row``.

    NOTE(review): the averaging windows are row counts, so "8hr"/"24hr" only
    hold if the data has exactly one row per hour per location -- confirm.

    Args:
        df: Spark DataFrame with ``o3``, ``pm2_5``, ``pm10``, ``co``, ``so2``,
            ``no2``, ``Location`` and ``date_time`` columns.

    Returns:
        The DataFrame with the nine extra columns.
    """
    # (new column, source column, trailing window in rows; None = plain copy)
    derived_columns = (
        ("o3_8hr", "o3", 8),
        ("o3_1hr", "o3", None),
        ("pm2_5_24hr", "pm2_5", 24),
        ("pm10_24hr", "pm10", 24),
        ("co_8hr", "co", 8),
        ("so2_1hr", "so2", None),
        ("so2_24hr", "so2", 24),
        ("no2_1hr", "no2", None),
    )

    for new_name, source_name, window_rows in derived_columns:
        if window_rows is None:
            values = df[source_name]
        else:
            values = calculate_rolling_average(df, source_name, window_rows)
        df = df.withColumn(new_name, values)

    # Bundle the derived columns into one struct so the UDF sees them as a Row.
    aqi_inputs = F.struct(*[col(new_name) for new_name, _, _ in derived_columns])
    calculate_aqi_udf = F.udf(calculate_aqi_row, IntegerType())

    return df.withColumn("us_aqi", calculate_aqi_udf(aqi_inputs))
# air_pollution_Boston_df.head(10)

# Compute the US AQI columns for both cities
air_pollution_Boston_df, air_pollution_Houston_df = (
    calculate_us_aqi(city_df)
    for city_df in (air_pollution_Boston_df, air_pollution_Houston_df)
)



# Daily Average AQI: one row per calendar day with the rounded mean of us_aqi.
# The grouping key and the aggregate are unresolved column expressions, so the
# same objects can be applied to both cities' DataFrames.
_aqi_day_key = F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")
_aqi_daily_mean = F.round(F.avg("us_aqi")).alias("avg_us_aqi")

aqi_agg_Boston_df = air_pollution_Boston_df.groupBy(_aqi_day_key).agg(_aqi_daily_mean)

aqi_agg_Houston_df = air_pollution_Houston_df.groupBy(_aqi_day_key).agg(_aqi_daily_mean)

# Pollutant Aggregation: daily mean of each pollutant, rounded to 2 decimals
_pollutant_day_key = F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")

# (source column, output column) in the order the output columns appear
_pollutant_outputs = (
    ("co", "avg_co"),
    ("no2", "avg_no2"),
    ("o3", "avg_o3"),
    ("so2", "avg_so2"),
    ("pm2_5", "avg_pm2_5"),
    ("pm10", "avg_pm10"),
)
_pollutant_daily_means = [
    F.round(F.avg(source_name), 2).alias(output_name)
    for source_name, output_name in _pollutant_outputs
]

pollutant_agg_Boston_df = (
    air_pollution_Boston_df
    .groupBy(_pollutant_day_key)
    .agg(*_pollutant_daily_means)
)

pollutant_agg_Houston_df = (
    air_pollution_Houston_df
    .groupBy(_pollutant_day_key)
    .agg(*_pollutant_daily_means)
)


# Identify High Pollution Events: per calendar day, count the rows whose
# us_aqi is above 100 (rows with a null us_aqi count as 0).
_event_day_key = F.date_format(col("date_time"), "yyyy-MM-dd").alias("date")
_high_pollution_flag = F.when(col("us_aqi") > 100, 1).otherwise(0)
_high_pollution_count = F.sum("high_pollution").alias("high_pollution_events")

high_pollution_events_Boston_df = (
    air_pollution_Boston_df
    .withColumn("high_pollution", _high_pollution_flag)
    .groupBy(_event_day_key)
    .agg(_high_pollution_count)
)

high_pollution_events_Houston_df = (
    air_pollution_Houston_df
    .withColumn("high_pollution", _high_pollution_flag)
    .groupBy(_event_day_key)
    .agg(_high_pollution_count)
)


# Save aggregated data as single files
# can modify this to go directly to your directories
# aqi_agg_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Boston_aqi")
# aqi_agg_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Houston_aqi")
# pollutant_agg_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Boston_pollutants")
# pollutant_agg_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Houston_pollutants")
# high_pollution_events_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Boston_high_pollution_events")
# high_pollution_events_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Houston_high_pollution_events")

# Save processed air pollution data
# air_pollution_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/Boston_processed_air_pollution")
# air_pollution_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/Houston_processed_air_pollution")

def cast_respiratory_df(df):
    """Return *df* with the visit-count columns cast to numeric types.

    The raw per-disease totals become ``IntegerType``; the smoothed totals
    become ``DoubleType``.  Each column is replaced in place, so column names
    and order do not change.

    Args:
        df: Spark DataFrame containing the seven ``total_visits_*`` columns
            listed below.

    Returns:
        A new DataFrame with those columns cast.
    """
    column_types = (
        ("total_visits_covid", IntegerType()),
        ("total_visits_influenza", IntegerType()),
        ("total_visits_rsv", IntegerType()),
        ("total_visits_smoothed_combined", DoubleType()),
        ("total_visits_smoothed_covid", DoubleType()),
        ("total_visits_smoothed_influenza", DoubleType()),
        ("total_visits_smoothed_rsv", DoubleType()),
    )

    for column_name, target_type in column_types:
        df = df.withColumn(column_name, col(column_name).cast(target_type))

    return df

# Apply the numeric casts to both cities' respiratory data
respiratory_Boston_df, respiratory_Houston_df = map(
    cast_respiratory_df,
    (respiratory_Boston_df, respiratory_Houston_df),
)

def _flag_high_visits(df, threshold_covid, threshold_flu, threshold_rsv):
    """Add 0/1 ``high_visits_*`` columns marking totals above their threshold.

    Shared by the Boston and Houston entry points, which differ only in their
    default thresholds.  A flag is 1 when the matching ``total_visits_*``
    value is strictly greater than the threshold and 0 otherwise (including
    when the total is null).
    """
    # (flag column to add, total column to test, threshold)
    rules = (
        ("high_visits_covid", "total_visits_covid", threshold_covid),
        ("high_visits_influenza", "total_visits_influenza", threshold_flu),
        ("high_visits_rsv", "total_visits_rsv", threshold_rsv),
    )
    for flag_name, total_name, threshold in rules:
        df = df.withColumn(flag_name, F.when(col(total_name) > threshold, 1).otherwise(0))
    return df


def identify_high_visit_events_Bos(df, threshold_covid = 27125, threshold_flu = 441755, threshold_rsv = 10462):
    """Flag high-visit rows using the Boston default thresholds.

    Args:
        df: Spark DataFrame with ``total_visits_covid``,
            ``total_visits_influenza`` and ``total_visits_rsv`` columns.
        threshold_covid: COVID visits above this value are flagged.
        threshold_flu: Influenza visits above this value are flagged.
        threshold_rsv: RSV visits above this value are flagged.

    Returns:
        *df* with ``high_visits_covid``, ``high_visits_influenza`` and
        ``high_visits_rsv`` columns added.
    """
    return _flag_high_visits(df, threshold_covid, threshold_flu, threshold_rsv)


def identify_high_visit_events_Hou(df, threshold_covid = 175325, threshold_flu = 285530, threshold_rsv = 67625):
    """Flag high-visit rows using the Houston default thresholds.

    Args:
        df: Spark DataFrame with ``total_visits_covid``,
            ``total_visits_influenza`` and ``total_visits_rsv`` columns.
        threshold_covid: COVID visits above this value are flagged.
        threshold_flu: Influenza visits above this value are flagged.
        threshold_rsv: RSV visits above this value are flagged.

    Returns:
        *df* with ``high_visits_covid``, ``high_visits_influenza`` and
        ``high_visits_rsv`` columns added.
    """
    return _flag_high_visits(df, threshold_covid, threshold_flu, threshold_rsv)
# Flag high visit events, each city with its own default thresholds
high_visit_events_Boston_df, high_visit_events_Houston_df = (
    identify_high_visit_events_Bos(respiratory_Boston_df),
    identify_high_visit_events_Hou(respiratory_Houston_df),
)


# high_visit_events_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Boston_high_visit_events")
# high_visit_events_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/agg_Houston_high_visit_events")
# respiratory_Boston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/Boston_processed_respiratory")
# respiratory_Houston_df.coalesce(1).write.mode("overwrite").parquet(f"{wasbs_path}Gold/Houston_processed_respiratory")
