<a href="https://colab.research.google.com/github/NamanVerma27/India_AQI_Analysis-Project/blob/main/aqi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, round, expr

In [None]:
def create_spark_session():
    """
    Build (or reuse) the Spark session for this analysis.

    Returns:
        SparkSession: The session obtained from ``getOrCreate()`` on a builder
        configured with the application name "IndianAirQualityAnalysis".
    """
    session_builder = SparkSession.builder.appName("IndianAirQualityAnalysis")
    return session_builder.getOrCreate()

In [None]:
def load_air_quality_data(spark, file_path):
    """
    Read the air quality CSV and convert its ``Date`` column.

    Args:
        spark: Spark session used to read the file.
        file_path: Location of the CSV file to load.

    Returns:
        DataFrame: The CSV contents (first row used as header, column types
        inferred) with ``Date`` replaced by the result of ``to_date(Date)``.
    """
    # Header row supplies the column names; types are inferred from the data.
    raw_df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Overwrite Date in place with its to_date() conversion.
    date_conversion = expr("to_date(Date)")
    return raw_df.withColumn("Date", date_conversion)

In [None]:
def initial_data_exploration(df):
    """
    Print a quick overview of the air quality dataset.

    Reports the row count, the number of distinct values in ``City``, the
    earliest and latest ``Date``, and mean / standard deviation / min / max
    for the PM2.5, PM10, NO2 and AQI columns.

    Args:
        df (DataFrame): Air quality DataFrame to summarise.

    Returns:
        DataFrame: The same DataFrame that was passed in, unmodified.
    """
    # Dataset size and city coverage
    record_total = df.count()
    print("Total number of records:", record_total)
    city_total = df.select("City").distinct().count()
    print("\nUnique Cities:", city_total)

    # Earliest and latest observation dates
    print("\nDate Range:")
    date_bounds = [
        expr("min(Date) as min_date"),
        expr("max(Date) as max_date"),
    ]
    df.select(*date_bounds).show()

    # Per-pollutant summary statistics, one table per column
    print("\nColumn Statistics:")
    for pollutant in ("PM2.5", "PM10", "NO2", "AQI"):
        print(f"\n{pollutant} Column Statistics:")
        # Backticks keep names containing a dot (e.g. PM2.5) as one identifier.
        summary_exprs = [
            expr(f"avg(`{pollutant}`) as mean"),
            expr(f"stddev(`{pollutant}`) as std"),
            expr(f"min(`{pollutant}`) as min"),
            expr(f"max(`{pollutant}`) as max"),
        ]
        df.select(*summary_exprs).show()

    return df

In [None]:
def main(file_path="city_day.csv"):
    """
    Run the initial exploration step: create a session, load the CSV, and
    print the exploration summary.

    Args:
        file_path (str): CSV file to load. Defaults to "city_day.csv", the
            path that used to be hard-coded in this function, so calling
            ``main()`` with no arguments behaves as before.
    """
    # Create Spark Session
    spark = create_spark_session()

    # Load Data from the caller-supplied (or default) path
    air_quality_df = load_air_quality_data(spark, file_path)

    # Perform Initial Exploration
    initial_data_exploration(air_quality_df)

if __name__ == "__main__":
    main()

Total number of records: 29531

Unique Cities: 26

Date Range:
+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2015-01-01|2020-07-01|
+----------+----------+


Column Statistics:

PM2.5 Column Statistics:
+-----------------+-----------------+----+------+
|             mean|              std| min|   max|
+-----------------+-----------------+----+------+
|67.45057794890272|64.66144945715128|0.04|949.99|
+-----------------+-----------------+----+------+


PM10 Column Statistics:
+------------------+-----------------+----+------+
|              mean|              std| min|   max|
+------------------+-----------------+----+------+
|118.12710293078102|90.60510971779476|0.01|1000.0|
+------------------+-----------------+----+------+


NO2 Column Statistics:
+------------------+------------------+----+------+
|              mean|               std| min|   max|
+------------------+------------------+----+------+
|28.560659061126763|24.474745795589442|0.01|362.21|
+-----

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round, expr, avg, stddev, max as spark_max

def clean_air_quality_data(df):
    """
    Clean and preprocess air quality data

    Fills missing PM2.5 / PM10 / NO2 / AQI values with 0, adds a
    ``PM2.5_Category`` label derived from PM2.5, and rounds the four
    pollutant columns to two decimal places in place.

    Args:
        df (DataFrame): Input air quality DataFrame

    Returns:
        DataFrame: Cleaned and processed DataFrame
    """
    # Handle missing values. The backtick-quoted keys are kept exactly as
    # they were.
    # NOTE(review): whether fillna needs backticks for dotted names depends on
    # the Spark version in use - confirm against the target runtime.
    # NOTE(review): readings filled with 0 are labelled "Good" by the
    # bucketing below and count as 0 in any later averages - confirm this is
    # intended.
    cleaned_df = df.fillna({
        "`PM2.5`": 0,
        "`PM10`": 0,
        "NO2": 0,
        "AQI": 0
    })

    # Create pollution level categories: <=30, (30, 60], (60, 90], otherwise.
    pm25 = col("`PM2.5`")
    cleaned_df = cleaned_df.withColumn("PM2.5_Category",
        when(pm25 <= 30, "Good")
        .when((pm25 > 30) & (pm25 <= 60), "Moderate")
        .when((pm25 > 60) & (pm25 <= 90), "Unhealthy for Sensitive Groups")
        .otherwise("Unhealthy")
    )

    # Round numeric columns.
    # The withColumn target must be the plain column name: it is compared
    # literally with existing names, so a backtick-quoted target would not
    # replace "PM2.5" but append a new column whose name contains the
    # backticks. Backticks are only needed when *referencing* the column via
    # col(), where a bare dot would be read as a nested-field access.
    numeric_columns = ["PM2.5", "PM10", "NO2", "AQI"]
    for column in numeric_columns:
        cleaned_df = cleaned_df.withColumn(column, round(col(f"`{column}`"), 2))

    return cleaned_df

In [None]:
def analyze_city_pollution(df):
    """
    Summarise pollution per city.

    Args:
        df (DataFrame): Cleaned air quality DataFrame

    Returns:
        DataFrame: One row per ``City`` with the mean, standard deviation and
        maximum of PM2.5, the mean AQI (each rounded to 2 decimals) and the
        row count, sorted by mean AQI in descending order.
    """
    # Backticks stop the dot in the column name being read as field access.
    pm25_ref = "`PM2.5`"

    city_aggregates = [
        round(avg(pm25_ref), 2).alias("Avg_PM2.5"),
        round(stddev(pm25_ref), 2).alias("Std_PM2.5"),
        round(spark_max(pm25_ref), 2).alias("Max_PM2.5"),
        round(avg("AQI"), 2).alias("Avg_AQI"),
        expr("count(*) as Total_Measurements"),
    ]

    per_city = df.groupBy("City").agg(*city_aggregates)

    # Highest average AQI first
    return per_city.orderBy("Avg_AQI", ascending=False)

In [None]:
def main(file_path="city_day.csv", output_path="city_pollution_analysis.csv"):
    """
    Run the city-level analysis: load the CSV, clean it, aggregate per city,
    show the top 10 rows and optionally save the result as CSV.

    Args:
        file_path (str): CSV file to load. Defaults to "city_day.csv", the
            previously hard-coded input path.
        output_path (str | None): Destination passed to the CSV writer.
            Defaults to "city_pollution_analysis.csv", the previously
            hard-coded output path. Pass ``None`` to skip saving.
    """
    # Create Spark Session
    spark = create_spark_session()

    # Load Data
    air_quality_df = load_air_quality_data(spark, file_path)

    # Clean Data
    cleaned_df = clean_air_quality_data(air_quality_df)

    # Analyze City-wise Pollution
    city_pollution = analyze_city_pollution(cleaned_df)

    # Show results
    city_pollution.show(10)

    # Optional: Save results (existing output at the path is overwritten)
    if output_path is not None:
        city_pollution.write.csv(output_path, header=True, mode="overwrite")

if __name__ == "__main__":
    main()

+---------+---------+---------+---------+-------+------------------+
|     City|Avg_PM2.5|Std_PM2.5|Max_PM2.5|Avg_AQI|Total_Measurements|
+---------+---------+---------+---------+-------+------------------+
|Ahmedabad|    46.64|    45.41|   381.69| 300.22|              2009|
|    Delhi|   117.08|    82.95|   685.36|  258.2|              2009|
|  Lucknow|   104.14|    81.35|   742.67| 205.39|              2009|
| Gurugram|   106.36|   100.27|   949.99| 194.82|              1679|
|    Patna|   102.16|    97.55|    645.5| 189.07|              1858|
| Guwahati|    63.56|    61.64|   916.67| 138.16|               502|
|   Jaipur|    53.91|     27.0|   311.35| 131.28|              1114|
|  Talcher|    49.19|    51.13|   354.44| 130.46|               925|
|  Kolkata|    60.01|    58.64|   304.74| 130.21|               814|
|   Bhopal|     48.5|    31.19|   136.42| 127.77|               289|
+---------+---------+---------+---------+-------+------------------+
only showing top 10 rows

