In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, min, countDistinct, round, when, isnan


In [None]:
# 1️⃣ Create Spark session
# getOrCreate() hands back the already-active session when there is one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("Air Quality Analysis")
    .getOrCreate()
)

In [None]:

# 2️⃣ Load dataset
file_path = "air_quality_data.csv"  # point this at the location of your CSV file

# The first row holds the column names; let Spark infer every column's type.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Show the inferred schema first, then a handful of untruncated rows.
print("Schema:")
df.printSchema()

print("\nSample data:")
df.show(n=5, truncate=False)

In [None]:

# 3️⃣ Clean data: handle missing/NA values in pollutant columns
# Cleaning pipeline:
#   1. Turn the literal 'NA' placeholder into a real null.
#   2. Cast the measurement columns to double. Because of the 'NA'
#      placeholders, schema inference may have typed these columns as
#      strings; on strings, min()/max() compare lexicographically ("9" sorts
#      above "100") and threshold filters depend on implicit coercion.
#      Values that cannot be parsed as numbers become null.
#   3. Drop rows that have no usable average, since every analysis below
#      is based on pollutant_avg.
df_clean = (
    df.replace('NA', None)
    .withColumn("pollutant_min", col("pollutant_min").cast("double"))
    .withColumn("pollutant_max", col("pollutant_max").cast("double"))
    .withColumn("pollutant_avg", col("pollutant_avg").cast("double"))
    .na.drop(subset=["pollutant_avg"])
)


In [None]:

# 4️⃣ Count total unique monitoring stations
# A global aggregation yields exactly one row; its only field is the count.
unique_stations = df_clean.agg(countDistinct("station")).first()[0]
print(f"\nNumber of unique monitoring stations: {unique_stations}")


In [None]:

# 5️⃣ Average pollutant concentration by state
print("\nAverage pollutant levels by state:")
# Mean of pollutant_avg per state (2 decimals), highest ten states first.
(
    df_clean.groupBy("state")
    .agg(round(avg("pollutant_avg"), 2).alias("Avg_Pollutant_Level"))
    .orderBy("Avg_Pollutant_Level", ascending=False)
    .show(10)
)

In [None]:

# 6️⃣ Most common pollutants monitored
print("\nMost common pollutants monitored:")
# Number of cleaned records per pollutant, most frequent first.
(
    df_clean.groupBy("pollutant_id")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
# 7️⃣ Maximum and minimum pollution values by pollutant
print("\nMin/Max/Average values for each pollutant:")
# Note: min/max/round here are the Spark SQL functions imported at the top
# of the notebook, not the Python builtins of the same name.
(
    df_clean.groupBy("pollutant_id")
    .agg(
        round(min("pollutant_min"), 2).alias("Min_Value"),
        round(max("pollutant_max"), 2).alias("Max_Value"),
        round(avg("pollutant_avg"), 2).alias("Avg_Value"),
    )
    .orderBy("Avg_Value", ascending=False)
    .show(10)
)


In [None]:
# 8️⃣ Top polluted cities based on average pollutant value
print("\nTop polluted cities (based on average pollutant value):")
# Mean of pollutant_avg per city (2 decimals); show the ten highest.
(
    df_clean.groupBy("city")
    .agg(round(avg("pollutant_avg"), 2).alias("City_Avg_Pollution"))
    .orderBy("City_Avg_Pollution", ascending=False)
    .show(10)
)


In [None]:
# 9️⃣ Identify critical pollutants (where average > 50)
print("\nCritical pollutants (avg level > 50):")
# Row-level filter: keep individual records whose pollutant_avg exceeds 50
# and show the identifying columns for the first ten of them.
(
    df_clean.where(col("pollutant_avg") > 50)
    .select("state", "city", "station", "pollutant_id", "pollutant_avg")
    .show(10)
)

In [None]:

# 🔟 Calculate AQI indicator (simplified version)
# Branches are evaluated top to bottom and the first match wins, so each
# branch only needs its upper bound:
#   <= 30 -> Good, <= 60 -> Moderate, <= 90 -> Poor, anything else -> Severe.
aqi_category = (
    when(col("pollutant_avg") <= 30, "Good")
    .when(col("pollutant_avg") <= 60, "Moderate")
    .when(col("pollutant_avg") <= 90, "Poor")
    .otherwise("Severe")
)
df_aqi = df_clean.withColumn("AQI_Category", aqi_category)

In [None]:

print("\nAQI categories for each record:")
# Preview the category next to the value it was derived from.
(
    df_aqi
    .select(["state", "city", "pollutant_id", "pollutant_avg", "AQI_Category"])
    .show(n=10)
)


In [None]:

# 1️⃣1️⃣ Save results (optional)
output_path = "processed_air_quality_data.csv"

# Replace any previous output at this path and include a header row.
(
    df_aqi.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)
print(f"\n✅ Processed data with AQI categories saved at: {output_path}")

In [None]:

# Stop the Spark session now that all analysis and output steps have run.
spark.stop()
