In [0]:
# Import required functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_timestamp, date_format, year, month, dayofmonth, lit

# Initialize Spark session.
# getOrCreate() hands back the already-running session when there is one,
# otherwise it builds a new session with the application name below.
spark = SparkSession.builder \
    .appName("Myanmar Air Quality Data Processing") \
    .getOrCreate()

# Configure the session only after `spark` has been bound above, so this cell
# also runs on a fresh kernel where no `spark` variable is pre-defined.
# The LEGACY time parser policy applies to the date parsing done later in the notebook.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [0]:
# Read the source CSV into a DataFrame

input_path = "/FileStore/tables/myanmar-air-quality/FinalData.csv"

# First row holds the column names; column types are inferred from the data
csv_options = {"header": True, "inferSchema": True}
df = spark.read.csv(input_path, **csv_options)

# Preview the first five rows
df.show(5)


+------+-----------------+---------------+----+-------+------------+-----+-----+-----+-------------+----------+------+---------+----------------+----------+-----------------+
|  City|           Center|           Date|Year|  Month|      Season|PM1_0|PM2_5| PM10|Temperature_F|Humidity_%|   AQI|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+------+-----------------+---------------+----+-------+------------+-----+-----+-----+-------------+----------+------+---------+----------------+----------+-----------------+
|Yangon|7 Miles Mayangone|10/20/2019 0:00|2019|October|Rainy Season| 29.6|44.27|52.87|        95.67|     49.64|122.59|        0|               0|         0|                0|
|Yangon|7 Miles Mayangone|10/21/2019 0:00|2019|October|Rainy Season|25.22|37.49|45.21|        94.42|     51.67| 105.9|        0|               0|         0|                0|
|Yangon|7 Miles Mayangone|10/22/2019 0:00|2019|October|Rainy Season|24.46|35.84|42.32|         95.4|     49.26|101.84|       

In [0]:
def make_unique_column_names(names):
    """Return a copy of `names` in which every entry is distinct.

    The first occurrence of a name is kept as-is. Every later occurrence is
    renamed to `<name>_<k>` with k = 1, 2, ...; k keeps advancing until the
    candidate clashes neither with a name already emitted nor with any name
    present in the input. This prevents a generated name from colliding with
    a real column (e.g. input ["a", "a_1", "a"] yields ["a", "a_1", "a_2"]).

    Args:
        names: sequence of column-name strings, possibly containing repeats.

    Returns:
        A new list of the same length, in the same order, with unique names.
    """
    reserved = set(names)   # names that exist in the input and must not be reused
    emitted = set()         # names already placed in the result (O(1) membership)
    last_suffix = {}        # per original name: last numeric suffix handed out
    result = []

    for name in names:
        candidate = name
        if candidate in emitted:
            suffix = last_suffix.get(name, 0)
            while True:
                suffix += 1
                candidate = f"{name}_{suffix}"
                if candidate not in emitted and candidate not in reserved:
                    break
            last_suffix[name] = suffix
        emitted.add(candidate)
        result.append(candidate)

    return result


# Get a list of column names
columns = df.columns

# Build the de-duplicated list of column names
unique_columns = make_unique_column_names(columns)

# Rename the columns
df = df.toDF(*unique_columns)

# Show the DataFrame with unique column names
df.show(5)


+------+-----------------+---------------+----+-------+------------+-----+-----+-----+-------------+----------+------+---------+----------------+----------+-----------------+
|  City|           Center|           Date|Year|  Month|      Season|PM1_0|PM2_5| PM10|Temperature_F|Humidity_%|   AQI|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+------+-----------------+---------------+----+-------+------------+-----+-----+-----+-------------+----------+------+---------+----------------+----------+-----------------+
|Yangon|7 Miles Mayangone|10/20/2019 0:00|2019|October|Rainy Season| 29.6|44.27|52.87|        95.67|     49.64|122.59|        0|               0|         0|                0|
|Yangon|7 Miles Mayangone|10/21/2019 0:00|2019|October|Rainy Season|25.22|37.49|45.21|        94.42|     51.67| 105.9|        0|               0|         0|                0|
|Yangon|7 Miles Mayangone|10/22/2019 0:00|2019|October|Rainy Season|24.46|35.84|42.32|         95.4|     49.26|101.84|       

In [0]:
from pyspark.sql.types import IntegerType, StringType, FloatType, DateType
       
# Parse the raw 'Date' strings (e.g. "10/20/2019 0:00") into timestamps
parsed_date = to_timestamp(col("Date"), "MM/dd/yyyy H:mm")
df = df.withColumn("Date", parsed_date)

# Derive string keys at year / month / day granularity from the parsed timestamp
date_col = col("Date")
df = (
    df.withColumn("Year_Key", year(date_col).cast(StringType()))
      .withColumn("Month_Key", date_format(date_col, "yyyyMM"))
      .withColumn("Day_Key", date_format(date_col, "yyyyMMdd"))
)


In [0]:
# Classify each row into a PM2_5 band, then attach the matching health-impact
# label and description. Rows that fall in no band get "Unknown" in all three columns.
# Each entry: (lower bound, upper bound, range label, health impact, description)
pm25_bands = [
    (0, 50, "0-50", "Good", "minimal impact"),
    (50, 100, "51-100", "Satisfactory", "minor breathing discomfort for sensitive people"),
    (100, 200, "101-200", "Moderate", "breathing discomfort for people with asthma, heart disease, or lungs"),
    (200, 300, "201-300", "Poor", "breathing discomfort for most people on prolonged exposure"),
    (300, 400, "301-400", "Very poor", "respiratory illness on prolonged exposure"),
    (400, 500, "401-500", "Severe", "severe respiratory illness on prolonged exposure"),
]

pm25_value = col("PM2_5")
band_label = col("PM2_5_Range")

# The first band includes its lower bound (>= 0); every later band excludes it (> lower).
first_lo, first_hi, first_label, first_impact, first_description = pm25_bands[0]
range_expr = when((pm25_value >= first_lo) & (pm25_value <= first_hi), first_label)
impact_expr = when(band_label == first_label, first_impact)
description_expr = when(band_label == first_label, first_description)

for lower, upper, label, impact, description in pm25_bands[1:]:
    range_expr = range_expr.when((pm25_value > lower) & (pm25_value <= upper), label)
    impact_expr = impact_expr.when(band_label == label, impact)
    description_expr = description_expr.when(band_label == label, description)

df = (
    df.withColumn("PM2_5_Range", range_expr.otherwise("Unknown"))
      .withColumn("health_impact", impact_expr.otherwise("Unknown"))
      .withColumn("Description", description_expr.otherwise("Unknown"))
)

In [0]:
# Keep a single row for each distinct combination of the source input columns
dedup_keys = [
    "City", "Center", "Date", "Year", "Month", "Season",
    "PM1_0", "PM2_5", "PM10", "Temperature_F", "Humidity_%", "AQI",
]
df = df.dropDuplicates(dedup_keys)

In [0]:
# Select and reorder columns according to the assignment output fields.
# The health-impact column is created earlier under the name "health_impact";
# it is selected by that exact name and aliased to "Health_Impact" so the
# output schema is unchanged and the lookup does not depend on
# case-insensitive column resolution.
output_df = df.select(
    col("City"),
    col("Center"),
    col("Date"),
    col("Year_Key"),
    col("Month_Key"),
    col("Day_Key"),
    col("Season"),
    col("PM1_0"),
    col("PM2_5"),
    col("PM10"),
    col("Temperature_F"),
    col("Humidity_%"),
    col("AQI"),
    col("health_impact").alias("Health_Impact")
)

# Show the DataFrame with the correct output schema
output_df.show(10)

+------+--------------------+-------------------+--------+---------+--------+----------+-----+-----+----+-------------+----------+---+-------------+
|  City|              Center|               Date|Year_Key|Month_Key| Day_Key|    Season|PM1_0|PM2_5|PM10|Temperature_F|Humidity_%|AQI|Health_Impact|
+------+--------------------+-------------------+--------+---------+--------+----------+-----+-----+----+-------------+----------+---+-------------+
|Yangon|         WWF-Myanmar|2020-04-10 00:00:00|    2020|   202004|20200410|Hot Season|  0.0|  0.0|0.58|         74.0|      26.5|0.0|         Good|
|Yangon|Thin Gan Gyun Yan...|2020-04-25 00:00:00|    2020|   202004|20200425|Hot Season|  0.0|  0.0| 0.0|        99.15|     39.62|0.0|         Good|
|Yangon|Thin Gan Gyun Yan...|2020-04-24 00:00:00|    2020|   202004|20200424|Hot Season|  0.0|  0.0| 0.0|        98.73|     42.01|0.0|         Good|
|Yangon|Thin Gan Gyun Yan...|2020-04-22 00:00:00|    2020|   202004|20200422|Hot Season|  0.0|  0.0| 0.0| 

In [0]:
# Use the legacy time parser policy for this session
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Destination directory for the Parquet output
output_path = "/FileStore/tables/myanmar-air-quality/output_files/myanmar_air_quality_parquet"

# Persist the selected columns as Parquet, replacing anything already at the destination
output_df.write.parquet(output_path, mode="overwrite")

print(f"Data successfully written to {output_path}")

Data successfully written to /FileStore/tables/myanmar-air-quality/output_files/myanmar_air_quality_parquet


In [0]:
# Average PM2_5 per range band, shown alongside its health-impact label and description
banded = df.groupBy("PM2_5_Range", "health_impact", "Description")
summary_df = banded.avg("PM2_5").withColumnRenamed("avg(PM2_5)", "Average_PM2_5")

# Display the summary without truncating the long description text
summary_df.show(truncate=False)

+-----------+-------------+--------------------------------------------------------------------+------------------+
|PM2_5_Range|health_impact|Description                                                         |Average_PM2_5     |
+-----------+-------------+--------------------------------------------------------------------+------------------+
|0-50       |Good         |minimal impact                                                      |19.306436068701842|
|101-200    |Moderate     |breathing discomfort for people with asthma, heart disease, or lungs|120.15159090909087|
|51-100     |Satisfactory |minor breathing discomfort for sensitive people                     |67.26554631828975 |
+-----------+-------------+--------------------------------------------------------------------+------------------+



In [0]:
%sql
-- Select air_quality as the current schema.
USE SCHEMA air_quality;

-- Create the target table, with no column definitions, unless it already exists.
CREATE TABLE IF NOT EXISTS myanmar_air_quality;

-- Convert the target table to Delta format.
-- NOTE(review): presumably redundant if the table is already created as Delta — confirm whether this step is needed.
CONVERT TO DELTA myanmar_air_quality;

In [0]:
%sql
-- Step 1: Read new or updated data from the Parquet files into a staging table
-- Remove any staging table left over from an earlier run.
DROP TABLE IF EXISTS myanmar_air_quality_staging;

-- Recreate the staging table with no column definitions.
-- NOTE(review): presumably the columns are taken from the Parquet files via the 'mergeSchema' copy option below — confirm.
CREATE TABLE myanmar_air_quality_staging;

-- Load the Parquet files from the directory the notebook's write step outputs to.
-- NOTE(review): 'inferSchema' and 'header' look like CSV reader options; confirm whether they have any effect with FILEFORMAT = PARQUET.
COPY INTO myanmar_air_quality_staging
  FROM '/FileStore/tables/myanmar-air-quality/output_files/myanmar_air_quality_parquet'
  FILEFORMAT = PARQUET
  FORMAT_OPTIONS ('inferSchema' = 'true', 'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
5122,5122,0


In [0]:
%sql

-- Step 2: Merge new/updated data into the target Delta table
-- A staging row matches a target row only when every column compared in the ON clause is equal
-- (City, Center, Date, Season and all measurement columns).
-- NOTE(review): plain `=` does not treat NULL as equal to NULL, so a row with a NULL in any of these
-- columns would presumably never match and be inserted again on each run — confirm whether NULLs can occur.
MERGE INTO myanmar_air_quality AS target
USING myanmar_air_quality_staging AS source
ON    target.City = source.City
AND   target.Center = source.Center
AND   target.Date = source.Date
AND   target.Season = source.Season
AND   target.PM1_0 = source.PM1_0
AND   target.PM2_5 = source.PM2_5
AND   target.PM10 = source.PM10
AND   target.Temperature_F = source.Temperature_F
AND   target.`Humidity_%` = source.`Humidity_%`
AND   target.AQI = source.AQI
-- Update matched rows: every target column is overwritten with the staging row's values
WHEN MATCHED THEN
  UPDATE SET *
-- Insert unmatched rows: staging rows with no match are added to the target
WHEN NOT MATCHED THEN
  INSERT *;

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
5122,5122,0,0
