In [1]:
import os
%pwd

'c:\\Users\\hp\\Documents\\DS\\Complete Project\\03-Air-Quality-Index-Predictor\\research'

In [3]:
# Step up one directory: the %pwd output before this cell ends in "research",
# the one after it shows the parent folder.
# NOTE: this relative chdir is not idempotent - re-running the cell climbs one
# more level each time, so run it exactly once per kernel session.
os.chdir("../")
%pwd

'c:\\Users\\hp\\Documents\\DS\\Complete Project\\03-Air-Quality-Index-Predictor'

In [1]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings bundle for the data-transformation stage.

    Instances are frozen, so attributes cannot be reassigned after creation.
    """

    # Directory belonging to this stage.
    root_dir: Path
    # Location of the input data file for this stage.
    data_path: Path

In [None]:
from Air_Quality_Predictor.constants import *
from Air_Quality_Predictor.utils.common import *

In [None]:
class ConfigurationManager:
    """Reads the project's YAML files and hands out per-stage config objects."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
    ):
        """Load both YAML files and create the artifacts root directory.

        Args:
            config_filepath: Location of the configuration YAML.
            params_filepath: Location of the parameters YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # The artifacts root is created up front, before any stage asks for
        # its own sub-directory.
        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build the config object for the data-transformation stage.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            A ``DataTransformationConfig`` populated from the
            ``data_transformation`` section of the loaded configuration.
        """
        stage_section = self.config.data_transformation

        create_directories([stage_section.root_dir])

        return DataTransformationConfig(
            root_dir=stage_section.root_dir,
            data_path=stage_section.data_path,
        )

In [3]:
import os
from Air_Quality_Predictor.utils.common import *
from Air_Quality_Predictor.logging import logger
import pandas as pd
import numpy as np

In [None]:
class DataTransformation:
    """Reshape long-format pollutant readings into a wide table with AQI columns.

    The individual steps are static methods, so they can be called on the class
    (``DataTransformation.pivot_dataset(df)``) as well as on an instance, which
    is how :meth:`convert` chains them.
    """

    # Breakpoint values handed to ``get_subindex`` for each pollutant column.
    # NOTE(review): presumably the concentration thresholds separating the AQI
    # bands - confirm against the ``get_subindex`` implementation.
    breakpoints_dict = {
        "PM2.5": [30, 60, 90, 120, 250],
        "PM10": [50, 100, 250, 350, 430],
        "O3": [50, 100, 168, 208, 748],
        "CO": [1, 2, 10, 17, 34],
        "NO2": [40, 80, 180, 280, 400],
        "SO2": [40, 80, 380, 800, 1600]
    }

    # The annotation is quoted so that defining this class does not require the
    # config dataclass cell to have been executed first.
    def __init__(self, config: "DataTransformationConfig"):
        """Store the stage configuration; ``convert`` reads ``config.data_path``."""
        self.config = config

    @staticmethod
    def clean_specie_column(df):
        """Lower-case the ``specie`` column and spell ``pm25`` as ``pm2.5``.

        The column is reassigned on ``df`` itself (the frame is modified) and
        ``df`` is returned.
        """
        # Assign the result back instead of a chained ``inplace=True`` replace:
        # under pandas Copy-on-Write the chained form never reaches ``df``.
        df['specie'] = df['specie'].str.lower().replace({"pm25": "pm2.5"})
        return df

    @staticmethod
    def clean_city_names(df):
        """Replace the two diacritic city spellings with plain ASCII ones.

        Modifies ``df`` and returns it.
        """
        df['city'] = df['city'].replace({"Hāpur": "Hapur", "Ghāziābād": "Ghaziabad"})
        return df

    @staticmethod
    def pivot_dataset(dataset):
        """Pivot to one row per (date, city) with one column per ``specie``.

        Cell values come from the ``median`` column, combined with
        ``pivot_table``'s default aggregation. The result has ``date`` and
        ``city`` as ordinary columns, no name on the column axis, and an index
        named ``Index``.
        """
        return (dataset
            .pivot_table(index=['date', 'city'], columns='specie', values='median')
            .reset_index()
            .rename_axis(None, axis=1)
            .rename_axis('Index'))

    @staticmethod
    def convert_date_column(df, date_column, date_format):
        """Parse ``df[date_column]`` to datetimes using ``date_format``.

        Modifies ``df`` and returns it.
        """
        df[date_column] = pd.to_datetime(df[date_column], format=date_format)
        return df

    @staticmethod
    def filter_year(df, year):
        """Return the rows whose ``date`` is NOT in ``year`` (the year is dropped).

        Expects ``df['date']`` to already hold datetimes.
        """
        # Explicit copy: later steps add and rename columns on the result, which
        # would otherwise raise SettingWithCopyWarning on a boolean-mask slice.
        return df[df['date'].dt.year != year].copy()

    @staticmethod
    def rename_pollutants_columns(df):
        """Rename the lower-case pollutant columns to the upper-case names used
        as keys of ``breakpoints_dict``.

        Renames in place on ``df`` and returns it.
        """
        df.rename(columns={'co': 'CO', 'no2': 'NO2', 'o3': 'O3', 'pm10': 'PM10', 'pm2.5': 'PM2.5', 'so2': 'SO2'}, inplace=True)
        return df

    @staticmethod
    def calculate_subindices(df, breakpoints_dict):
        """Apply ``get_subindex`` element-wise to every column named in
        ``breakpoints_dict``.

        Args:
            df: Frame containing one column per key of ``breakpoints_dict``.
            breakpoints_dict: Mapping of column name to its breakpoint list.

        Returns:
            dict mapping each column name to the Series of sub-index values.

        Raises:
            KeyError: If ``df`` lacks one of the named columns.
        """
        # NOTE(review): ``get_subindex`` is not defined in this part of the
        # notebook; presumably it arrives via one of the wildcard imports -
        # verify that it exists and takes (value, breakpoints).
        subindices = {}
        for column, breakpoints in breakpoints_dict.items():
            subindices[column] = df[column].apply(lambda x: get_subindex(x, breakpoints))
        return subindices

    @staticmethod
    def calculate_AQI(new_df):
        """Add ``Checks`` and ``AQI_calculated`` columns derived from the
        ``*_SubIndex`` columns.

        ``Checks`` counts how many of the six sub-indices are > 0.
        ``AQI_calculated`` is the rounded row-wise maximum sub-index, set to NaN
        when PM2.5 + PM10 sub-indices sum to <= 0 or when ``Checks`` < 3.

        Modifies ``new_df`` and returns it.
        """
        # ``convert`` names the nitrogen column after the "NO2" breakpoint key;
        # frames that still carry the older "NOx_SubIndex" name keep working.
        nitrogen_col = "NO2_SubIndex" if "NO2_SubIndex" in new_df.columns else "NOx_SubIndex"
        subindex_cols = ["PM2.5_SubIndex", "PM10_SubIndex", "SO2_SubIndex",
                         nitrogen_col, "CO_SubIndex", "O3_SubIndex"]

        new_df["Checks"] = new_df[subindex_cols].gt(0).sum(axis=1)

        # Force float so the NaN assignments below never have to upcast an
        # integer column.
        new_df["AQI_calculated"] = new_df[subindex_cols].max(axis=1).round().astype("float64")

        no_particulates = new_df["PM2.5_SubIndex"] + new_df["PM10_SubIndex"] <= 0
        new_df.loc[no_particulates, "AQI_calculated"] = np.nan
        new_df.loc[new_df["Checks"] < 3, "AQI_calculated"] = np.nan
        return new_df

    @staticmethod
    def calculate_AQI_bucket(new_df):
        """Add ``AQI_bucket_calculated``: the category label for each
        ``AQI_calculated`` value (NaN stays NaN).

        Modifies ``new_df`` and returns it.
        """
        def get_AQI_bucket(x):
            if x <= 50:
                return "Good"
            elif x <= 100:
                return "Satisfactory"
            elif x <= 200:
                return "Moderate"
            elif x <= 300:
                return "Poor"
            elif x <= 400:
                return "Very Poor"
            elif x > 400:
                return "Severe"
            else:
                # Only reached for NaN, which fails every comparison above.
                return np.nan

        new_df["AQI_bucket_calculated"] = new_df["AQI_calculated"].apply(get_AQI_bucket)
        return new_df

    def convert(self):
        """Run the full transformation on the CSV at ``self.config.data_path``.

        Steps: clean the ``specie`` and ``city`` columns, pivot to wide format,
        parse dates (``%d-%m-%Y``), drop rows from 2014, rename pollutant
        columns, compute sub-indices, then AQI and AQI bucket.

        Returns:
            The transformed DataFrame.
        """
        dataset = pd.read_csv(self.config.data_path)
        dataset = self.clean_specie_column(dataset)
        dataset = self.clean_city_names(dataset)
        dataset = self.pivot_dataset(dataset)
        dataset = self.convert_date_column(dataset, 'date', "%d-%m-%Y")
        dataset = self.filter_year(dataset, 2014)

        # Rename BEFORE computing sub-indices: the breakpoint keys are the
        # upper-case names, which only exist after this step.
        dataset = self.rename_pollutants_columns(dataset)
        subindices = self.calculate_subindices(dataset, self.breakpoints_dict)
        for column, subindex_values in subindices.items():
            dataset[f"{column}_SubIndex"] = subindex_values

        dataset = self.calculate_AQI(dataset)
        dataset = self.calculate_AQI_bucket(dataset)
        return dataset

