In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Install Java (required for Spark)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Copy Spark archive from Drive or download if not present
import os

spark_tgz_path = "/content/drive/MyDrive/spark-3.5.0-bin-hadoop3.tgz"
if not os.path.exists(spark_tgz_path):
    !wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz -O "$spark_tgz_path"

# Copy to working directory and extract
!cp "$spark_tgz_path" .
!tar -xzf spark-3.5.0-bin-hadoop3.tgz

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Install findspark
!pip install -q findspark
import findspark
findspark.init()



In [3]:
from pyspark.sql import SparkSession
import os

# Initialize Spark session
spark = SparkSession.builder.appName("ConcatenateCSV").getOrCreate()

# Path to your directory containing the CSV files
csv_directory = "/content/drive/MyDrive/AQI_Weather_Analyser_Project"  # Replace this with your actual path

# List all CSV files in the directory
csv_files = [os.path.join(csv_directory, f) for f in os.listdir(csv_directory) if f.endswith(".csv")]

# Read all CSV files into a DataFrame
df = spark.read.option("header", "true").csv(csv_files)

# Show the schema to confirm
df.printSchema()

# Show the first few rows to inspect the data
df.show(5)


root
 |-- From Date: string (nullable = true)
 |-- To Date: string (nullable = true)
 |-- PM2.5 (ug/m3): string (nullable = true)
 |-- PM10 (ug/m3): string (nullable = true)
 |-- NO (ug/m3): string (nullable = true)
 |-- NO2 (ug/m3): string (nullable = true)
 |-- NOx (ppb): string (nullable = true)
 |-- NH3 (ug/m3): string (nullable = true)
 |-- SO2 (ug/m3): string (nullable = true)
 |-- CO (mg/m3): string (nullable = true)
 |-- Ozone (ug/m3): string (nullable = true)
 |-- Benzene (ug/m3): string (nullable = true)
 |-- Toluene (ug/m3): string (nullable = true)
 |-- Eth-Benzene (ug/m3): string (nullable = true)
 |-- MP-Xylene (ug/m3): string (nullable = true)
 |-- O Xylene (ug/m3): string (nullable = true)
 |-- Temp (degree C): string (nullable = true)
 |-- RH (%): string (nullable = true)
 |-- WS (m/s): string (nullable = true)
 |-- WD (deg): string (nullable = true)
 |-- SR (W/mt2): string (nullable = true)
 |-- BP (mmHg): string (nullable = true)
 |-- VWS (m/s): string (nullable = tr

In [4]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, to_timestamp, when

# Step 1: Rename columns to uppercase with underscores
renamed_cols = {
    "From Date": "FROM_DATE",
    "To Date": "TO_DATE",
    "PM2.5 (ug/m3)": "PM25",
    "PM10 (ug/m3)": "PM10",
    "NO (ug/m3)": "NO",
    "NO2 (ug/m3)": "NO2",
    "NOx (ppb)": "NOX",
    "NH3 (ug/m3)": "NH3",
    "SO2 (ug/m3)": "SO2",
    "CO (mg/m3)": "CO",
    "Ozone (ug/m3)": "OZONE",
    "Benzene (ug/m3)": "BENZENE",
    "Toluene (ug/m3)": "TOLUENE",
    "Eth-Benzene (ug/m3)": "ETH_BENZENE",
    "MP-Xylene (ug/m3)": "MP_XYLENE",
    "O Xylene (ug/m3)": "O_XYLENE",
    "Temp (degree C)": "TEMP_C",
    "RH (%)": "RH",
    "WS (m/s)": "WS",
    "WD (deg)": "WD",
    "SR (W/mt2)": "SR",
    "BP (mmHg)": "BP",
    "VWS (m/s)": "VWS",
    "CH4 (ug/m3)": "CH4",
    "NMHC ()": "NMHC",
    "THC (ug/m3)": "THC"
}

# Rename the columns
for old, new in renamed_cols.items():
    df = df.withColumnRenamed(old, new)

# Step 2: Convert date columns
df = df.withColumn("FROM_DATE", to_timestamp("FROM_DATE", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("TO_DATE", to_timestamp("TO_DATE", "yyyy-MM-dd HH:mm:ss"))

# Step 3: Clean and cast all other columns to float
def safe_cast_to_float(col_name):
    return when(
        (col(col_name).isNull()) |
        (col(col_name) == "") |
        (col(col_name) == "null") |
        (col(col_name) == "NULL") |
        (col(col_name) == "-"),
        None
    ).otherwise(col(col_name).cast(FloatType()))

# Apply the safe float casting to all non-date columns
for col_name in df.columns:
    if col_name not in ["FROM_DATE", "TO_DATE"]:
        df = df.withColumn(col_name, safe_cast_to_float(col_name))


In [None]:
from pyspark.sql.functions import col, count, when

float_cols = [c for c in df.columns if c not in ["FROM_DATE", "TO_DATE"]]

null_counts = df.select([
    count(when(col(c).isNull(), c)).alias(c + "_null_count")
    for c in float_cols
])

null_counts.show(truncate=False)


In [5]:
rows = df.count()
cols = len(df.columns)
print(f"DataFrame size: {rows} rows × {cols} columns")


DataFrame size: 1339113 rows × 26 columns


In [6]:
from pyspark.sql.functions import count, col, when

total_rows = df.count()

null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()

null_percent = {k: (v / total_rows) * 100 for k, v in null_counts.items()}

for col_name, pct in null_percent.items():
    print(f"{col_name}: {pct:.2f}% nulls")



FROM_DATE: 0.53% nulls
TO_DATE: 0.53% nulls
PM25: 34.26% nulls
PM10: 41.84% nulls
NO: 26.01% nulls
NO2: 23.68% nulls
NOX: 24.08% nulls
NH3: 35.76% nulls
SO2: 32.26% nulls
CO: 29.97% nulls
OZONE: 33.33% nulls
BENZENE: 52.48% nulls
TOLUENE: 51.59% nulls
ETH_BENZENE: 81.34% nulls
MP_XYLENE: 71.84% nulls
O_XYLENE: 66.02% nulls
TEMP_C: 57.52% nulls
RH: 40.33% nulls
WS: 40.58% nulls
WD: 57.41% nulls
SR: 67.59% nulls
BP: 70.64% nulls
VWS: 81.17% nulls
CH4: 99.32% nulls
NMHC: 100.00% nulls
THC: 97.25% nulls


In [7]:
threshold = 85.0

# Get columns to drop
cols_to_drop = [col_name for col_name, pct in null_percent.items() if pct >= threshold]

# Drop columns
df = df.drop(*cols_to_drop)

print(f"Dropped columns: {cols_to_drop}")



Dropped columns: ['CH4', 'NMHC', 'THC']


In [8]:
# Rename date columns
df = df.withColumnRenamed("From_Date", "Start_Date") \
       .withColumnRenamed("To_Date", "End_Date")

# Confirm changes
df.printSchema()
df.select("Start_Date", "End_Date").show(5)


root
 |-- Start_Date: timestamp (nullable = true)
 |-- End_Date: timestamp (nullable = true)
 |-- PM25: float (nullable = true)
 |-- PM10: float (nullable = true)
 |-- NO: float (nullable = true)
 |-- NO2: float (nullable = true)
 |-- NOX: float (nullable = true)
 |-- NH3: float (nullable = true)
 |-- SO2: float (nullable = true)
 |-- CO: float (nullable = true)
 |-- OZONE: float (nullable = true)
 |-- BENZENE: float (nullable = true)
 |-- TOLUENE: float (nullable = true)
 |-- ETH_BENZENE: float (nullable = true)
 |-- MP_XYLENE: float (nullable = true)
 |-- O_XYLENE: float (nullable = true)
 |-- TEMP_C: float (nullable = true)
 |-- RH: float (nullable = true)
 |-- WS: float (nullable = true)
 |-- WD: float (nullable = true)
 |-- SR: float (nullable = true)
 |-- BP: float (nullable = true)
 |-- VWS: float (nullable = true)

+-------------------+-------------------+
|         Start_Date|           End_Date|
+-------------------+-------------------+
|2010-01-03 14:00:00|2010-01-03 15:00:0

In [9]:
from pyspark.sql.functions import col, isnull

date_cols = ["START_DATE", "END_DATE"]
non_date_cols = [c for c in df.columns if c not in date_cols]

# Fill nulls only in non-date columns
df = df.fillna(0, subset=non_date_cols)

# Confirm no nulls remain in non-date columns and date columns unchanged
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()



+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+
|Start_Date|End_Date|PM25|PM10| NO|NO2|NOX|NH3|SO2| CO|OZONE|BENZENE|TOLUENE|ETH_BENZENE|MP_XYLENE|O_XYLENE|TEMP_C| RH| WS| WD| SR| BP|VWS|
+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+
|      7054|    7054|   0|   0|  0|  0|  0|  0|  0|  0|    0|      0|      0|          0|        0|       0|     0|  0|  0|  0|  0|  0|  0|
+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+



In [10]:
df = df.na.drop(subset=["START_DATE", "END_DATE"])


In [11]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+
|Start_Date|End_Date|PM25|PM10| NO|NO2|NOX|NH3|SO2| CO|OZONE|BENZENE|TOLUENE|ETH_BENZENE|MP_XYLENE|O_XYLENE|TEMP_C| RH| WS| WD| SR| BP|VWS|
+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+
|         0|       0|   0|   0|  0|  0|  0|  0|  0|  0|    0|      0|      0|          0|        0|       0|     0|  0|  0|  0|  0|  0|  0|
+----------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+



In [12]:

from pyspark.sql import functions as F

df.select("PM25", "PM10", "NOX", "NO2").describe().show()
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == 0), c)).alias(c + "_null_or_zero") for c in ["PM25", "PM10", "NOX", "NO2"]]).show()

+-------+------------------+-----------------+------------------+------------------+
|summary|              PM25|             PM10|               NOX|               NO2|
+-------+------------------+-----------------+------------------+------------------+
|  count|           1332059|          1332059|           1332059|           1332059|
|   mean|23.292261648788944| 34.3875124209823|16.039573404615517|15.564740330857443|
| stddev| 39.51423048109861|52.43245887574646| 23.05468558896255|19.283755714511386|
|    min|               0.0|              0.0|               0.0|               0.0|
|    max|            999.99|           999.99|            498.79|            499.99|
+-------+------------------+-----------------+------------------+------------------+

+-----------------+-----------------+----------------+----------------+
|PM25_null_or_zero|PM10_null_or_zero|NOX_null_or_zero|NO2_null_or_zero|
+-----------------+-----------------+----------------+----------------+
|           458718

In [13]:
for c in df.columns:
    print(f"Unique values for column '{c}':")
    df.select(c).distinct().show(65, truncate=False)  # Show up to 65 unique values
    print("\n" + "-"*50)  # Separator for better readability

Unique values for column 'Start_Date':
+-------------------+
|Start_Date         |
+-------------------+
|2010-01-08 01:00:00|
|2010-01-16 10:00:00|
|2010-01-21 09:00:00|
|2010-02-03 15:00:00|
|2010-02-09 12:00:00|
|2010-02-15 10:00:00|
|2010-02-19 06:00:00|
|2010-03-19 01:00:00|
|2010-03-19 21:00:00|
|2010-03-25 00:00:00|
|2010-04-19 00:00:00|
|2010-05-03 00:00:00|
|2010-05-11 23:00:00|
|2010-05-21 10:00:00|
|2010-06-08 09:00:00|
|2010-06-20 17:00:00|
|2010-07-20 12:00:00|
|2010-07-22 21:00:00|
|2010-08-13 10:00:00|
|2010-08-15 17:00:00|
|2010-08-21 00:00:00|
|2010-09-04 10:00:00|
|2010-09-23 10:00:00|
|2010-09-24 02:00:00|
|2010-09-24 18:00:00|
|2010-09-25 19:00:00|
|2010-09-27 09:00:00|
|2010-10-01 17:00:00|
|2010-10-01 23:00:00|
|2010-10-05 12:00:00|
|2010-10-22 00:00:00|
|2010-11-02 00:00:00|
|2010-11-12 07:00:00|
|2010-11-18 22:00:00|
|2010-11-19 12:00:00|
|2010-11-22 14:00:00|
|2010-11-25 11:00:00|
|2010-11-25 17:00:00|
|2010-12-09 10:00:00|
|2010-12-19 15:00:00|
|2010-12-27 03:

In [14]:
from pyspark.sql.functions import mean, expr

numeric_cols = [c for c in df.columns if c not in ["Start_Date", "End_Date"]]

# Mean for numeric columns
mean_stats = df.select([mean(c).alias(c) for c in numeric_cols])
mean_stats.show()

# Median requires approxQuantile since Spark has no built-in median
median_stats = {}
for c in numeric_cols:
    median = df.approxQuantile(c, [0.5], 0.01)[0]
    median_stats[c] = median

print("Median values:")
for col_name, median_val in median_stats.items():
    print(f"{col_name}: {median_val}")


+------------------+----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+----------------+------------------+------------------+------------------+------------------+-----------------+
|              PM25|            PM10|               NO|               NO2|               NOX|              NH3|              SO2|                CO|             OZONE|          BENZENE|          TOLUENE|       ETH_BENZENE|        MP_XYLENE|          O_XYLENE|            TEMP_C|              RH|                WS|                WD|                SR|                BP|              VWS|
+------------------+----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------

In [15]:
from pyspark.sql.functions import col, when

# 1. PM Ratio with zero-division check
df = df.withColumn(
    "PM_RATIO",
    when(col("PM10") != 0, col("PM25") / col("PM10")).otherwise(None)
)

# 2. NOx to NO2 Ratio with zero-division check
df = df.withColumn(
    "NOX_TO_NO2",
    when(col("NO2") != 0, col("NOX") / col("NO2")).otherwise(None)
)

# 3. PM2.5 * Wind Speed Interaction (null-safe)
df = df.withColumn(
    "PM25_WIND_SCALED",
    when(col("PM25").isNotNull() & col("WS").isNotNull(),
         col("PM25") * col("WS")).otherwise(None)
)

# 4. Humidity * Temperature Interaction (null-safe)
df = df.withColumn(
    "HUMIDITY_TEMP_INDEX",
    when(col("RH").isNotNull() & col("TEMP_C").isNotNull(),
         col("RH") * col("TEMP_C")).otherwise(None)
)


In [16]:
from pyspark.sql.functions import col, when, lit

# Define your CPCB breakpoints using your DataFrame's column names
cpcb_breakpoints = {
    "PM25":   [(0,30,0,50), (31,60,51,100), (61,90,101,200), (91,120,201,300), (121,250,301,400), (251,500,401,500)],
    "PM10":   [(0,50,0,50), (51,100,51,100), (101,250,101,200), (251,350,201,300), (351,430,301,400), (431,500,401,500)],
    "NO2":    [(0,40,0,50), (41,80,51,100), (81,180,101,200), (181,280,201,300), (281,400,301,400), (401,1000,401,500)],
    "SO2":    [(0,40,0,50), (41,80,51,100), (81,380,101,200), (381,800,201,300), (801,1600,301,400), (1601,2100,401,500)],
    "CO":     [(0,1,0,50), (1.1,2,51,100), (2.1,10,101,200), (10.1,17,201,300), (17.1,34,301,400), (34.1,50,401,500)],
    "OZONE":  [(0,50,0,50), (51,100,51,100), (101,168,101,200), (169,208,201,300), (209,748,301,400), (749,1000,401,500)],
    "NH3":    [(0,200,0,50), (201,400,51,100), (401,800,101,200), (801,1200,201,300), (1201,1800,301,400), (1801,2400,401,500)]
}

# AQI expression builder
def compute_aqi_expr(pollutant, breakpoints):
    expr = None
    for (clow, chigh, aqi_low, aqi_high) in breakpoints:
        cond = (col(pollutant) >= clow) & (col(pollutant) <= chigh)
        formula = ((col(pollutant) - clow) / (chigh - clow)) * (aqi_high - aqi_low) + aqi_low
        expr = when(cond, formula) if expr is None else expr.when(cond, formula)
    return expr.otherwise(lit(None))

# Apply AQI calculations
for pollutant in cpcb_breakpoints:
    aqi_col = f"AQI_{pollutant}"
    df = df.withColumn(aqi_col, compute_aqi_expr(pollutant, cpcb_breakpoints[pollutant]))


In [17]:
from pyspark.sql.functions import greatest, round as spark_round

df = df.withColumn("AQI_CPCB", greatest(
    col("AQI_PM25"),
    col("AQI_PM10"),
    col("AQI_NO2"),
    col("AQI_SO2"),
    col("AQI_CO"),
    col("AQI_OZONE"),
    col("AQI_NH3")
))

# Optional rounding
df = df.withColumn("AQI_CPCB", spark_round("AQI_CPCB"))


In [18]:
from pyspark.sql.functions import when

df = df.withColumn("AQI_Category", when(col("AQI_CPCB") <= 50, "Good")
    .when(col("AQI_CPCB") <= 100, "Satisfactory")
    .when(col("AQI_CPCB") <= 200, "Moderate")
    .when(col("AQI_CPCB") <= 300, "Poor")
    .when(col("AQI_CPCB") <= 400, "Very Poor")
    .otherwise("Severe"))


In [19]:
# Rename Start_Date to Start_TS if necessary
df = df.withColumnRenamed("Start_Date", "Start_TS")

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, lag, max, col, unix_timestamp

# Ensure Start_TS is in timestamp format
df = df.withColumn("Start_TS_unix", unix_timestamp("Start_TS"))

# Rolling and lag window (row-based)
row_window = Window.orderBy("Start_TS").rowsBetween(-2, 0)
lag_window = Window.orderBy("Start_TS")

# Time-based window (past 7 days in seconds)
time_window_7d = Window.orderBy("Start_TS_unix").rangeBetween(-7 * 86400, 0)

# PM2.5 features
df = df.withColumn("Rolling_PM25_mean", avg("PM25").over(row_window))
df = df.withColumn("Lag_PM25_1h", lag("PM25", 1).over(lag_window))

# Temperature max over 7 days
df = df.withColumn("Rolling_Temperature_max", max("TEMP_C").over(time_window_7d))

# Add same features for PM10, NO2, CO, SO2
for pollutant in ["PM10", "NO2", "CO", "SO2"]:
    df = df.withColumn(f"Rolling_{pollutant}_mean", avg(col(pollutant)).over(row_window))
    df = df.withColumn(f"Lag_{pollutant}_1h", lag(col(pollutant), 1).over(lag_window))
    df = df.withColumn(f"Rolling_{pollutant}_max_7d", max(col(pollutant)).over(time_window_7d))


In [20]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------+--------+----+----+---+---+---+---+---+---+-----+-------+-------+-----------+---------+--------+------+---+---+---+---+---+---+--------+----------+----------------+-------------------+--------+--------+-------+-------+------+---------+-------+--------+------------+-------------+-----------------+-----------+-----------------------+-----------------+-----------+-------------------+----------------+----------+------------------+---------------+---------+-----------------+----------------+----------+------------------+
|Start_TS|End_Date|PM25|PM10| NO|NO2|NOX|NH3|SO2| CO|OZONE|BENZENE|TOLUENE|ETH_BENZENE|MP_XYLENE|O_XYLENE|TEMP_C| RH| WS| WD| SR| BP|VWS|PM_RATIO|NOX_TO_NO2|PM25_WIND_SCALED|HUMIDITY_TEMP_INDEX|AQI_PM25|AQI_PM10|AQI_NO2|AQI_SO2|AQI_CO|AQI_OZONE|AQI_NH3|AQI_CPCB|AQI_Category|Start_TS_unix|Rolling_PM25_mean|Lag_PM25_1h|Rolling_Temperature_max|Rolling_PM10_mean|Lag_PM10_1h|Rolling_PM10_max_7d|Rolling_NO2_mean|Lag_NO2_1h|Rolling_NO2_max_7d|Rolling_CO_mean|Lag_CO_1h|Ro

In [21]:

from pyspark.sql import functions as F

df.select("PM25", "PM10", "NOX", "NO2").describe().show()
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == 0), c)).alias(c + "_null_or_zero") for c in ["PM25", "PM10", "NOX", "NO2"]]).show()


+-------+------------------+-----------------+------------------+------------------+
|summary|              PM25|             PM10|               NOX|               NO2|
+-------+------------------+-----------------+------------------+------------------+
|  count|           1332059|          1332059|           1332059|           1332059|
|   mean|23.292261648788944| 34.3875124209823|16.039573404615517|15.564740330857443|
| stddev| 39.51423048109861|52.43245887574646| 23.05468558896255|19.283755714511386|
|    min|               0.0|              0.0|               0.0|               0.0|
|    max|            999.99|           999.99|            498.79|            499.99|
+-------+------------------+-----------------+------------------+------------------+

+-----------------+-----------------+----------------+----------------+
|PM25_null_or_zero|PM10_null_or_zero|NOX_null_or_zero|NO2_null_or_zero|
+-----------------+-----------------+----------------+----------------+
|           458718

In [22]:
print(f"Total rows: {df.count()}")

Total rows: 1332059


In [23]:
df.select("AQI_Category").distinct().show()
df.groupBy("AQI_CPCB").count().orderBy("AQI_CPCB").show()


+------------+
|AQI_Category|
+------------+
|        Good|
|      Severe|
|   Very Poor|
|Satisfactory|
|        Poor|
|    Moderate|
+------------+

+--------+------+
|AQI_CPCB| count|
+--------+------+
|     0.0|253037|
|     1.0|   430|
|     2.0|   489|
|     3.0|   549|
|     4.0|   605|
|     5.0|   763|
|     6.0|  1982|
|     7.0|  1367|
|     8.0|  1778|
|     9.0|  2007|
|    10.0|  2233|
|    11.0|  2111|
|    12.0|  2630|
|    13.0|  2910|
|    14.0|  3546|
|    15.0|  3872|
|    16.0|  5127|
|    17.0|  6410|
|    18.0|  5808|
|    19.0|  8124|
+--------+------+
only showing top 20 rows



In [24]:
rolling_cols = [c for c in df.columns if "Rolling" in c or "Lag" in c]
df.select(rolling_cols).describe().show()


+-------+------------------+------------------+-----------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|summary| Rolling_PM25_mean|       Lag_PM25_1h|Rolling_Temperature_max|Rolling_PM10_mean|      Lag_PM10_1h|Rolling_PM10_max_7d|  Rolling_NO2_mean|        Lag_NO2_1h|Rolling_NO2_max_7d|  Rolling_CO_mean|        Lag_CO_1h| Rolling_CO_max_7d|  Rolling_SO2_mean|        Lag_SO2_1h|Rolling_SO2_max_7d|
+-------+------------------+------------------+-----------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|  count|           1332059|           1332058|                1332059|          1332059|          1332058|  

In [25]:
numeric_cols = ["PM25", "PM10", "NO2", "CO", "SO2", "TEMP_C", "RH", "WS"]
df.select(numeric_cols).toPandas().corr()


Unnamed: 0,PM25,PM10,NO2,CO,SO2,TEMP_C,RH,WS
PM25,1.0,0.39053,0.272747,0.107504,0.001786,-0.031724,0.043208,0.070844
PM10,0.39053,1.0,0.243427,-0.071627,0.16179,0.060474,0.133373,0.250817
NO2,0.272747,0.243427,1.0,0.070759,0.03038,0.066405,0.112573,0.178188
CO,0.107504,-0.071627,0.070759,1.0,-0.076133,-0.089688,-0.072748,-0.090049
SO2,0.001786,0.16179,0.03038,-0.076133,1.0,0.190164,-0.034892,0.03037
TEMP_C,-0.031724,0.060474,0.066405,-0.089688,0.190164,1.0,-0.251621,-0.025016
RH,0.043208,0.133373,0.112573,-0.072748,-0.034892,-0.251621,1.0,0.658938
WS,0.070844,0.250817,0.178188,-0.090049,0.03037,-0.025016,0.658938,1.0


In [26]:
# 2. SAVE AS SINGLE CSV FILE

output_path = "/content/final_output_csv"

# Repartition to 1 file and save as CSV
df.coalesce(1) \
  .write.option("header", "true") \
  .mode("overwrite") \
  .csv(output_path)

# 3. DOWNLOAD THE CSV FILE

import os
import shutil
import glob

# Find the part file
part_file = glob.glob(f"{output_path}/part-*.csv")[0]
shutil.move(part_file, "/content/final_AQI.csv")



# Download
from google.colab import files
files.download("/content/final_AQI.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>