# Air Pollutant Dataset Combiner

Python Version: 3.13.5

In [69]:
import csv

def read_csv_rows(absFilePath: str) -> list[list[str]]:
    """Read a CSV file and return every record as a list of strings.

    Args:
        absFilePath: Path of the CSV file to read.

    Returns:
        One list per CSV record, in file order. An empty file yields ``[]``.

    Raises:
        OSError: If the file cannot be opened.
    """
    # newline="" lets the csv module do its own line-ending handling, so
    # line breaks embedded in quoted fields are preserved unchanged.
    # The ``with`` block closes the file; no explicit close() is needed.
    with open(absFilePath, mode="r", newline="") as f:
        return list(csv.reader(f))

In [70]:
import os

# Input CSV files, resolved against the current working directory.
# NOTE: despite the "REL_" prefix these are absolute paths, because they are
# anchored at os.getcwd().
REL_FILE_PATHS = [
    os.path.join(os.getcwd(), "rawData", file_name)
    for file_name in (
        "air_daily_2010_2016.csv",
        "air_daily_2017_2024.csv",
    )
]

In [71]:
# Maps a 0-indexed CSV column number to the location reported in that column.
# Location columns start at index 2 and run through index 19.
COL_TO_LOCATION_MAP = dict(
    enumerate(
        (
            "SHATIN",
            "TSUEN WAN",
            "CENTRAL",
            "EASTERN",
            "KWUN TONG",
            "TUEN MUN",
            "TUNG CHUNG",
            "SHAM SHUI PO",
            "SOUTHERN",
            "YUEN LONG",
            "CENTRAL/WESTERN",
            "NORTH",
            "KWAI CHUNG",
            "TAP MUN",
            "TSEUNG KWAN O",
            "TAI PO",
            "MONG KOK",
            "CAUSEWAY BAY",
        ),
        start=2,
    )
)

In [72]:
# Full pollutant name -> short acronym.
POLLUTANT_ACRONYM = {
    "Carbon Monoxide": "CO",
    "Fine Suspended Particulates": "FSP",
    "Nitrogen Dioxide": "NO2",
    "Nitrogen Oxides": "NOX",
    "Ozone": "O3",
    "Respirable Suspended Particulates": "RSP",
    "Sulphur Dioxide": "SO2",
}

# Full pollutant names, in the same order as the keys of POLLUTANT_ACRONYM.
POLLUTANTS = list(POLLUTANT_ACRONYM)

In [73]:
from dataclasses import dataclass

# Simple year/month/day record; the fields are stored as given, with no
# calendar validation.
@dataclass
class CustomDate:
    year: int
    month: int
    day: int

def dateSplitter(dashFormatDate: str) -> CustomDate:
    """
    Split a fixed-width ``dd-mm-yyyy`` date string into a CustomDate.

    The parts are taken by character position (day = chars 0-1,
    month = chars 3-4, year = char 6 onwards), so day and month must be
    zero-padded to two digits. ``int()`` raises ValueError if any part is
    not numeric.
    """
    year_text = dashFormatDate[6:]
    month_text = dashFormatDate[3:5]
    day_text = dashFormatDate[0:2]

    year_number = int(year_text)
    month_number = int(month_text)
    day_number = int(day_text)
    return CustomDate(year=year_number, month=month_number, day=day_number)

In [74]:
from typing import Literal, Union, Self

from pydantic import BaseModel, model_validator

# One pollutant reading for a single date and location.
# Every categorical field is restricted to a closed set of literal values;
# `value` is either a float or the literal marker "N.A.".
class PollutantDatapoint(BaseModel):
    year: int
    month: int
    day: int
    location: Literal[
        "SHATIN", "TSUEN WAN", "CENTRAL", "EASTERN", "KWUN TONG", "TUEN MUN",
        "TUNG CHUNG", "SHAM SHUI PO", "SOUTHERN", "YUEN LONG",
        "CENTRAL/WESTERN", "NORTH", "KWAI CHUNG", "TAP MUN", "TSEUNG KWAN O",
        "TAI PO", "MONG KOK", "CAUSEWAY BAY",
    ]
    pollutantType: Literal[
        "Carbon Monoxide",
        "Fine Suspended Particulates",
        "Nitrogen Dioxide",
        "Nitrogen Oxides",
        "Ozone",
        "Respirable Suspended Particulates",
        "Sulphur Dioxide",
    ]
    pollutantAcronym: Literal["CO", "FSP", "NO2", "NOX", "O3", "RSP", "SO2"]
    value: Union[float, Literal["N.A."]]
    unit: Literal["μg/m3", "10μg/m3"]

    @model_validator(mode="after")
    def check_unit_matches(self) -> Self:
        """Reject a unit that does not match the pollutant.

        "CO" must use "10μg/m3"; every other acronym must use "μg/m3".
        Raises ValueError on a mismatch, otherwise returns the model.
        """
        is_carbon_monoxide = self.pollutantAcronym == "CO"
        if is_carbon_monoxide and self.unit != "10μg/m3":
            raise ValueError("Pollutant(CO)+Unit Mismatch")
        if not is_carbon_monoxide and self.unit != "μg/m3":
            raise ValueError("Pollutant(non-CO)+Unit Mismatch")
        return self

In [75]:
# Load both input files and drop the first 14 rows of each.
# NOTE(review): the 14 skipped rows are presumably file preamble/header
# lines - confirm against the raw CSV files.
rows2010_2016 = read_csv_rows(REL_FILE_PATHS[0])[14:]
rows2017_2024 = read_csv_rows(REL_FILE_PATHS[1])[14:]

# All remaining rows, 2010-2016 file first, then the 2017-2024 file.
rows: list[list[str]] = [*rows2010_2016, *rows2017_2024]

In [76]:
def convert_row_to_pollutant_datapoints(row: list[str])-> list[PollutantDatapoint]:
    """
    Expand one CSV row into one datapoint per location column.

    row[0] is parsed as the date and row[1] (whitespace-stripped) is the
    pollutant name; columns 2 to 19 (inclusive) each hold the value for the
    location given by COL_TO_LOCATION_MAP. The unit is "10μg/m3" for CO and
    "μg/m3" for every other pollutant.

    Raises KeyError when the pollutant name is not in POLLUTANT_ACRONYM and
    IndexError when the row has fewer than 20 columns.
    """
    parsed_date = dateSplitter(row[0])
    pollutant_name = row[1].strip()
    acronym = POLLUTANT_ACRONYM[pollutant_name]

    if acronym == "CO":
        unit = "10μg/m3"
    else:
        unit = "μg/m3"

    return [
        PollutantDatapoint(
            year=parsed_date.year,
            month=parsed_date.month,
            day=parsed_date.day,
            location=COL_TO_LOCATION_MAP[column],
            pollutantType=pollutant_name,
            pollutantAcronym=acronym,
            value=row[column],
            unit=unit,
        )
        for column in range(2, 20)
    ]

In [77]:
# Flatten every CSV row into its per-location datapoints, preserving row order.
combined_datapoint_list: list[PollutantDatapoint] = []

for row in rows:
    combined_datapoint_list += convert_row_to_pollutant_datapoints(row)

In [78]:
def save_datapoint_list(datapoint_list: list[BaseModel], absFilePath: str):
    """Write the datapoints to a JSON Lines file, one JSON object per line.

    Args:
        datapoint_list: Objects exposing ``model_dump_json()``; each one is
            serialized to a single line.
        absFilePath: Destination file path. An existing file is overwritten.

    Raises:
        OSError: If the file cannot be opened for writing (for example when
            the parent directory does not exist).
    """
    # Explicit UTF-8: the serialized JSON can contain non-ASCII text such as
    # the "μ" in the unit strings, which a platform-default encoding may be
    # unable to encode. The ``with`` block closes the file on exit.
    with open(absFilePath, mode="w", encoding="utf-8") as f:
        f.writelines(
            f"{datapoint.model_dump_json()}\n" for datapoint in datapoint_list
        )

In [79]:
import os

# Write the combined datapoints under ./cleaned of the current working
# directory.
saveFilePath = os.path.join(
    os.getcwd(),
    "cleaned",
    "combinedParticles.jsonl",
)
save_datapoint_list(
    datapoint_list=combined_datapoint_list,
    absFilePath=saveFilePath,
)