<a href="https://colab.research.google.com/github/JanithRankelum/Air-Pollution-Analysis/blob/janith/Untitled9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Data Preprocessing with apache spark

In [2]:
# 1. Data Loading
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataPreprocessing") \
    .getOrCreate()

# Load the CSV file into a Spark DataFrame
df = spark.read.csv("/content/drive/MyDrive/openaq.csv", header=True, sep=";", inferSchema=True)

# Show the first few rows of the DataFrame
df.show(5)

+------------+--------------------+-------------------+--------------------+---------+-----------+-----+-----------+-------------------+-------------+
|Country Code|                City|           Location|         Coordinates|Pollutant|Source Name| Unit|      Value|       Last Updated|Country Label|
+------------+--------------------+-------------------+--------------------+---------+-----------+-----+-----------+-------------------+-------------+
|          RS|            Belgrade|Belgrade-Stari grad|44.82111999994846...|      NO2|        eea|µg/m³|   44.85432|2024-08-01 07:00:00|       Serbia|
|          RS|          Novi Pazar|         Novi Pazar|43.13970399996856...|      SO2|        eea|µg/m³|16.30967665|2024-07-31 07:00:00|       Serbia|
|          RS|DROBETA TURNU SEV...|               MH-1|44.62663300021146...|      NO2|        eea|µg/m³|       -1.0|2024-08-01 07:00:00|       Serbia|
|          RS|              Vranje|             Vranje|42.55112499985959...|       CO|        

In [3]:
# 2. Data Cleansing
# Check for missing values

from pyspark.sql.functions import col, count, when

# Check for missing values
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Drop rows with missing values in critical columns
df_cleaned = df.na.drop(subset=["Value", "Last Updated"])

# Remove duplicate rows
df_cleaned = df_cleaned.dropDuplicates()

# Show the cleaned DataFrame
df_cleaned.show(5)


+------------+-----+--------+-----------+---------+-----------+----+-----+------------+-------------+
|Country Code| City|Location|Coordinates|Pollutant|Source Name|Unit|Value|Last Updated|Country Label|
+------------+-----+--------+-----------+---------+-----------+----+-----+------------+-------------+
|           0|28538|       0|         70|        0|          0|   0|    0|           0|          127|
+------------+-----+--------+-----------+---------+-----------+----+-----+------------+-------------+

+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+
|Country Code|           City|            Location|         Coordinates|Pollutant|   Source Name| Unit|   Value|       Last Updated|Country Label|
+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+
|          TR|           Uşak|         Uşak (Yen

In [4]:
# 3. Data Transformation

from pyspark.sql.functions import col, to_timestamp, year, mean, stddev

# Convert 'Last Updated' column to timestamp
df_transformed = df_cleaned.withColumn("Last Updated", to_timestamp(col("Last Updated")))

# Create a new feature: Year from 'Last Updated'
df_transformed = df_transformed.withColumn("Year", year(col("Last Updated")))

# Compute mean and standard deviation for normalization
stats = df_transformed.select(mean(col("Value")).alias("mean"), stddev(col("Value")).alias("stddev")).collect()[0]
mean_value, stddev_value = stats["mean"], stats["stddev"]

# Ensure stddev is not zero to avoid division errors
if stddev_value and stddev_value != 0:
    df_transformed = df_transformed.withColumn("Value_Normalized", (col("Value") - mean_value) / stddev_value)
else:
    df_transformed = df_transformed.withColumn("Value_Normalized", col("Value"))

# Show the transformed DataFrame
df_transformed.show(5)


+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+----+--------------------+
|Country Code|           City|            Location|         Coordinates|Pollutant|   Source Name| Unit|   Value|       Last Updated|Country Label|Year|    Value_Normalized|
+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+----+--------------------+
|          TR|           Uşak|         Uşak (Yeni)|38.6639258113343,...|       CO|       Turkiye|µg/m³|1373.504|2023-05-31 00:00:00|       Turkey|2023|0.029000433850905845|
|          ES|       Asturias|             ES0008R|43.4391699994639,...|       NO|     EEA Spain|µg/m³|    0.64|2024-01-16 00:00:00|        Spain|2024|-0.02078703539679573|
|          AT|3100 St. Pölten|St. Pölten Eybner...|48.21160000001331...|       O3|           eea|µg/m³|   5.887|2025-01-31 08:00:00|   

In [5]:
# 4. Feature Engineering
from pyspark.sql.functions import when

# Create a new feature: Pollutant Category based on 'Pollutant'
df_transformed = df_transformed.withColumn(
    "Pollutant_Category",
    when(col("Pollutant").isin(["NO2", "SO2"]), "Gas")
    .when(col("Pollutant").isin(["PM2.5", "PM10"]), "Particulate")
    .otherwise("Other")
)

# Show the DataFrame with new features
df_transformed.show(5)

+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+----+--------------------+------------------+
|Country Code|           City|            Location|         Coordinates|Pollutant|   Source Name| Unit|   Value|       Last Updated|Country Label|Year|    Value_Normalized|Pollutant_Category|
+------------+---------------+--------------------+--------------------+---------+--------------+-----+--------+-------------------+-------------+----+--------------------+------------------+
|          TR|           Uşak|         Uşak (Yeni)|38.6639258113343,...|       CO|       Turkiye|µg/m³|1373.504|2023-05-31 00:00:00|       Turkey|2023|0.029000433850905845|             Other|
|          ES|       Asturias|             ES0008R|43.4391699994639,...|       NO|     EEA Spain|µg/m³|    0.64|2024-01-16 00:00:00|        Spain|2024|-0.02078703539679573|             Other|
|          AT|3100 St. Pölten|St. Pölten

In [6]:
# 5. Comparison with pandas
import pandas as pd
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, year, mean, stddev, when, count, isnan

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataPreprocessing") \
    .getOrCreate()

# Load the CSV file into a Spark DataFrame
start_time = time.time()
df_spark = spark.read.csv("/content/drive/MyDrive/openaq.csv", header=True, sep=";", inferSchema=True)
print(f"Spark Loading Time: {time.time() - start_time:.4f} seconds")

# Data Cleansing
start_time = time.time()
df_spark_cleaned = df_spark.na.drop(subset=["Value", "Last Updated"]).dropDuplicates()
df_spark_cleaned.cache()  # Cache for better performance
print(f"Spark Cleansing Time: {time.time() - start_time:.4f} seconds")

# Data Transformation
start_time = time.time()
df_spark_cleaned = df_spark_cleaned.withColumn("Last Updated", to_timestamp(col("Last Updated")))
df_spark_cleaned = df_spark_cleaned.withColumn("Year", year(col("Last Updated")))

# Compute mean and stddev safely
stats = df_spark_cleaned.select(mean(col("Value")).alias("mean"), stddev(col("Value")).alias("stddev")).collect()[0]
mean_value, stddev_value = stats["mean"], stats["stddev"]

# Ensure stddev is not zero to prevent division errors
if stddev_value and stddev_value != 0:
    df_spark_cleaned = df_spark_cleaned.withColumn("Value_Normalized", (col("Value") - mean_value) / stddev_value)
else:
    df_spark_cleaned = df_spark_cleaned.withColumn("Value_Normalized", col("Value"))

print(f"Spark Transformation Time: {time.time() - start_time:.4f} seconds")

# Feature Engineering
start_time = time.time()
df_spark_cleaned = df_spark_cleaned.withColumn(
    "Pollutant_Category",
    when(col("Pollutant").isin(["NO2", "SO2"]), "Gas")
    .when(col("Pollutant").isin(["PM2.5", "PM10"]), "Particulate")
    .otherwise("Other")
)
print(f"Spark Feature Engineering Time: {time.time() - start_time:.4f} seconds")

# Stop Spark session
spark.stop()

# Pandas Processing
# Load the CSV file into a Pandas DataFrame
start_time = time.time()
df_pandas = pd.read_csv("/content/drive/MyDrive/openaq.csv", sep=";")
print(f"Pandas Loading Time: {time.time() - start_time:.4f} seconds")

# Data Cleansing
start_time = time.time()
df_pandas_cleaned = df_pandas.dropna(subset=["Value", "Last Updated"]).drop_duplicates()
print(f"Pandas Cleansing Time: {time.time() - start_time:.4f} seconds")

# Data Transformation
start_time = time.time()
df_pandas_cleaned["Last Updated"] = pd.to_datetime(df_pandas_cleaned["Last Updated"])
df_pandas_cleaned["Year"] = df_pandas_cleaned["Last Updated"].dt.year

# Compute mean and stddev safely
mean_value_pandas = df_pandas_cleaned["Value"].mean()
stddev_value_pandas = df_pandas_cleaned["Value"].std()

# Handle stddev == 0 case
if stddev_value_pandas and stddev_value_pandas != 0:
    df_pandas_cleaned["Value_Normalized"] = (df_pandas_cleaned["Value"] - mean_value_pandas) / stddev_value_pandas
else:
    df_pandas_cleaned["Value_Normalized"] = df_pandas_cleaned["Value"]

print(f"Pandas Transformation Time: {time.time() - start_time:.4f} seconds")

# Feature Engineering
start_time = time.time()
df_pandas_cleaned["Pollutant_Category"] = df_pandas_cleaned["Pollutant"].apply(
    lambda x: "Gas" if x in ["NO2", "SO2"] else ("Particulate" if x in ["PM2.5", "PM10"] else "Other")
)
print(f"Pandas Feature Engineering Time: {time.time() - start_time:.4f} seconds")


Spark Loading Time: 1.6907 seconds
Spark Cleansing Time: 0.1111 seconds
Spark Transformation Time: 13.2037 seconds
Spark Feature Engineering Time: 0.0419 seconds
Pandas Loading Time: 0.2302 seconds
Pandas Cleansing Time: 0.0851 seconds
Pandas Transformation Time: 0.0545 seconds
Pandas Feature Engineering Time: 0.0136 seconds


In [None]:
import time
import threading
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, mean, max, min, to_timestamp

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RealTimeAnalytics") \
    .getOrCreate()

# Define API URL
API_URL = "https://public.opendatasoft.com//api/explore/v2.1/catalog/datasets/openaq/records?select=*&where=%22sri%20lanka%22&limit=20&lang=en"

# Define Schema for the DataFrame
schema = StructType([
    StructField("country", StringType(), True),
    StructField("city", StringType(), True),
    StructField("location", StringType(), True),
    StructField("lon", DoubleType(), True),
    StructField("lat", DoubleType(), True),
    StructField("measurements_parameter", StringType(), True),
    StructField("measurements_sourcename", StringType(), True),
    StructField("measurements_unit", StringType(), True),
    StructField("measurements_value", DoubleType(), True),
    StructField("measurements_lastupdated", StringType(), True),
    StructField("country_name_en", StringType(), True)
])

# Create an empty DataFrame with the defined schema
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# Function to fetch API data
def fetch_api_data():
    response = requests.get(API_URL)
    if response.status_code == 200:
        raw_data = response.json().get("results", [])

        # Extract and format the required fields
        formatted_data = [
            {
                "country": item.get("country"),
                "city": item.get("city", "Unknown"),  # Handle missing city
                "location": item.get("location"),
                "lon": item.get("coordinates", {}).get("lon"),
                "lat": item.get("coordinates", {}).get("lat"),
                "measurements_parameter": item.get("measurements_parameter"),
                "measurements_sourcename": item.get("measurements_sourcename"),
                "measurements_unit": item.get("measurements_unit"),
                "measurements_value": item.get("measurements_value"),
                "measurements_lastupdated": item.get("measurements_lastupdated"),
                "country_name_en": item.get("country_name_en")
            }
            for item in raw_data
        ]

        return formatted_data
    else:
        return []

# Function to update DataFrame with new API data every 10 seconds
def update_dataframe():
    global df
    while True:
        # Fetch new data from API
        new_data = fetch_api_data()

        if new_data:
            # Convert new API data to a Spark DataFrame
            new_df = spark.createDataFrame(new_data, schema=schema)

            # Convert 'measurements_lastupdated' to TimestampType
            new_df = new_df.withColumn("measurements_lastupdated", to_timestamp(col("measurements_lastupdated")))

            # Append new data to the existing DataFrame
            df = df.union(new_df)

            # Show the latest data
            print("\n🔄 Latest Incoming Data:")
            df.show(truncate=False)

            # Compute summary statistics
            summary = df.select(
                mean("measurements_value").alias("mean_value"),
                max("measurements_value").alias("max_value"),
                min("measurements_value").alias("min_value")
            ).collect()[0]

            print("\n📊 Summary Statistics:")
            print(f"Mean Value: {summary['mean_value']}")
            print(f"Max Value: {summary['max_value']}")
            print(f"Min Value: {summary['min_value']}")

            # Detect anomalies (e.g., values above a threshold)
            threshold = 100  # Example threshold
            anomalies = df.filter(col("measurements_value") > threshold)

            if anomalies.count() > 0:
                print("\n🚨 Anomalies Detected:")
                anomalies.show(truncate=False)

        time.sleep(10)  # Fetch new data every 10 seconds

# Run the update function in a separate thread
thread = threading.Thread(target=update_dataframe, daemon=True)
thread.start()

# Keep the Spark application running
while True:
    time.sleep(60)  # Keep alive




🔄 Latest Incoming Data:
+-------+-------+---------------------------+---------+--------+----------------------+-----------------------+-----------------+------------------+------------------------+---------------+
|country|city   |location                   |lon      |lat     |measurements_parameter|measurements_sourcename|measurements_unit|measurements_value|measurements_lastupdated|country_name_en|
+-------+-------+---------------------------+---------+--------+----------------------+-----------------------+-----------------+------------------+------------------------+---------------+
|LK     |Colombo|US Diplomatic Post: Colombo|79.848684|6.913253|PM2.5                 |StateAir_Colombo       |µg/m³            |-999.0            |2025-01-31 08:30:00     |Sri Lanka      |
|LK     |N/A    |Colombo                    |79.875341|6.909445|PM2.5                 |AirNow                 |µg/m³            |10.0              |2025-01-31 08:30:00     |Sri Lanka      |
+-------+-------+--------