In [1]:
import os
# Display the kernel's current working directory before any directory change is made.
%pwd

'e:\\project_ineuron\\Air_Quality_Index_Predictor\\research'

In [2]:
os.chdir("../")
%pwd

'e:\\project_ineuron\\Air_Quality_Index_Predictor'

In [3]:
import warnings
# NOTE(review): this silences every warning category for the rest of the
# session, which also hides pandas/NumPy deprecation warnings raised by the
# cells below - consider narrowing the filter to specific categories.
warnings.filterwarnings("ignore")

In [4]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data-transformation stage."""
    # Directory the stage writes its output CSV files into.
    # NOTE(review): filled straight from the loaded YAML config, so the runtime
    # value may be a plain string rather than a Path - confirm.
    root_dir: Path
    # Location of the input CSV that the stage reads with pandas.
    data_path: Path

In [6]:
from Air_Quality_Index_Predictor.constants import *
from Air_Quality_Index_Predictor.utils.common import read_yaml, create_directories

In [7]:
class ConfigurationManager:
    """Reads the project's YAML files and hands out per-stage config objects."""

    def __init__(self, config_filepath = CONFIG_FILE_PATH, params_filepath = PARAMS_FILE_PATH):
        """Load both YAML files and make sure the artifacts root directory exists.

        Args:
            config_filepath: Path of the main configuration YAML.
            params_filepath: Path of the parameters YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # The loaded config exposes its keys as attributes.
        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build the config object for the data-transformation stage.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            DataTransformationConfig populated from the ``data_transformation``
            section of the loaded configuration.
        """
        stage_section = self.config.data_transformation

        create_directories([stage_section.root_dir])

        return DataTransformationConfig(
            root_dir=stage_section.root_dir,
            data_path=stage_section.data_path,
        )

In [8]:
import os
from Air_Quality_Index_Predictor.utils.common import read_yaml, create_directories, breakpoints_dict, get_subindex
from Air_Quality_Index_Predictor.logging import logger
import pandas as pd
import numpy as np 
from datetime import datetime,timedelta

In [9]:
class DataTransformation:
    """Converts the raw long-format pollutant CSV into AQI-labelled train/test CSVs.

    The individual steps are exposed as methods; ``convert`` runs them in order
    and writes ``train_dataset.csv`` and ``test_dataset.csv`` into
    ``config.root_dir``.
    """

    # Sub-index columns that feed the AQI computation (one per pollutant).
    _SUBINDEX_COLUMNS = (
        "PM2.5_SubIndex", "PM10_SubIndex", "SO2_SubIndex",
        "NO2_SubIndex", "CO_SubIndex", "O3_SubIndex",
    )

    def __init__(self, config: "DataTransformationConfig"):
        """Store the stage configuration (``data_path`` to read, ``root_dir`` to write)."""
        self.config = config

    def clean_specie_column(self, df):
        """Lower-case the ``specie`` labels and spell ``pm25`` as ``pm2.5``.

        Modifies ``df`` in place and returns it.
        """
        # Assign the result back to the frame. Calling replace(inplace=True) on
        # the df['specie'] selection only updates a temporary object under
        # pandas Copy-on-Write, leaving the frame unchanged.
        df['specie'] = df['specie'].str.lower().replace(to_replace=dict(pm25="pm2.5"))
        return df

    def clean_city_names(self, df):
        """Replace the accented spellings of two city names with plain ASCII ones.

        Modifies ``df`` in place and returns it.
        """
        # Assigned back for the same Copy-on-Write reason as in clean_specie_column.
        df['city'] = df['city'].replace(to_replace=dict(Hāpur="Hapur", Ghāziābād="Ghaziabad"))
        return df

    def pivot_dataset(self, dataset):
        """Reshape long rows into one row per (date, city) with a column per specie.

        Cell values come from the ``median`` column. The returned frame has
        ``date`` and ``city`` as ordinary columns, no name on the column axis,
        and a row index named ``Index``.
        """
        return (dataset
            .pivot_table(index=['date', 'city'], columns='specie', values='median')
            .reset_index()
            .rename_axis(None, axis=1)
            .rename_axis('Index'))

    def rename_pollutants_columns(self, df):
        """Rename the lower-case pollutant columns to their upper-case names.

        Modifies ``df`` in place and returns it.
        """
        df.rename(columns={'co': 'CO', 'no2': 'NO2', 'o3': 'O3', 'pm10': 'PM10', 'pm2.5': 'PM2.5', 'so2': 'SO2'}, inplace=True)
        return df

    def convert_date_column(self, df, date_column, date_format):
        """Parse the string column ``date_column`` into datetimes.

        Slashes are first normalised to dashes so that a dash-separated
        ``date_format`` also matches slash-separated input. Modifies ``df`` in
        place and returns it.

        Args:
            df: Frame holding the column to convert.
            date_column: Name of the string column with the dates.
            date_format: ``strptime``-style format passed to ``pd.to_datetime``.
        """
        # Normalise the requested column; the previous code always rewrote a
        # column literally named 'date', ignoring ``date_column``.
        normalised = df[date_column].str.replace('/', '-', regex=False)
        df[date_column] = pd.to_datetime(normalised, format=date_format)
        return df

    def filter_year(self, df, year, date_column='date'):
        """Return a copy of ``df`` without the rows whose date falls in ``year``.

        Args:
            df: Frame with a datetime column.
            year: Calendar year to drop.
            date_column: Name of the datetime column (defaults to ``'date'``).
        """
        # .copy() makes the result an independent frame, so adding columns to it
        # later does not write into a slice of the caller's frame.
        return df[df[date_column].dt.year != year].copy()

    def calculate_subindices(self, df, breakpoints_dict):
        """Compute one sub-index Series per pollutant listed in ``breakpoints_dict``.

        Args:
            df: Frame containing a column for every key of ``breakpoints_dict``.
            breakpoints_dict: Mapping of column name to the breakpoints handed
                to the project helper ``get_subindex`` for each value.

        Returns:
            dict mapping each column name to its Series of sub-index values.
        """
        subindices = {}
        for column, breakpoints in breakpoints_dict.items():
            # The default argument pins this iteration's breakpoints to the lambda.
            subindices[column] = df[column].apply(
                lambda x, bp=breakpoints: get_subindex(x, bp))
        return subindices

    def calculate_AQI(self, new_df):
        """Add ``Checks`` and ``AQI_calculated`` columns to ``new_df``.

        ``Checks`` counts how many of the six sub-indices are positive.
        ``AQI_calculated`` is the rounded maximum sub-index, set to NaN when
        PM2.5 + PM10 sub-indices sum to zero or less, or when fewer than three
        sub-indices are positive. Modifies ``new_df`` in place and returns it.
        """
        subindex_cols = list(self._SUBINDEX_COLUMNS)

        # A NaN sub-index compares False against 0 and so is not counted.
        new_df["Checks"] = (new_df[subindex_cols] > 0).sum(axis=1)

        new_df["AQI_calculated"] = new_df[subindex_cols].max(axis=1).round()
        # np.nan rather than np.NaN: the latter alias was removed in NumPy 2.0.
        new_df.loc[new_df["PM2.5_SubIndex"] + new_df["PM10_SubIndex"] <= 0, "AQI_calculated"] = np.nan
        new_df.loc[new_df["Checks"] < 3, "AQI_calculated"] = np.nan
        return new_df

    def calculate_AQI_bucket(self, new_df):
        """Add ``AQI_bucket_calculated``: the category label for ``AQI_calculated``.

        Modifies ``new_df`` in place and returns it.
        """
        def get_AQI_bucket(x):
            # Upper bounds are inclusive. A NaN input fails every comparison
            # and falls through to the final NaN return.
            if x <= 50:
                return "Good"
            if x <= 100:
                return "Satisfactory"
            if x <= 200:
                return "Moderate"
            if x <= 300:
                return "Poor"
            if x <= 400:
                return "Very Poor"
            if x > 400:
                return "Severe"
            return np.nan

        new_df["AQI_bucket_calculated"] = new_df["AQI_calculated"].apply(get_AQI_bucket)
        return new_df

    def convert(self):
        """Run the full transformation and write the train/test CSV files.

        Reads ``config.data_path``, cleans and pivots it, derives sub-indices,
        AQI and AQI bucket, then splits the date-sorted rows: everything up to
        and including 2023-05-01 goes to ``train_dataset.csv``, and 2023-05-02
        through 2023-10-01 goes to ``test_dataset.csv``, both under
        ``config.root_dir``.
        """
        dataset = pd.read_csv(self.config.data_path)
        logger.info("Data read successfully")

        dataset = self.clean_specie_column(dataset)
        dataset = self.clean_city_names(dataset)
        logger.info("Data clean successfully")

        dataset = self.pivot_dataset(dataset)
        logger.info("Data pivoted successfully")

        dataset = self.rename_pollutants_columns(dataset)
        logger.info("Data renamed successfully")

        dataset = self.convert_date_column(dataset, 'date', "%d-%m-%Y")
        dataset = self.filter_year(dataset, 2014)
        logger.info("Data date adjusted successfully")

        subindices = self.calculate_subindices(dataset, breakpoints_dict)
        logger.info("Data subindices calculated successfully")

        for column, subindex_values in subindices.items():
            dataset[f"{column}_SubIndex"] = subindex_values
        logger.info("Data subindex added  successfully")

        dataset = self.calculate_AQI(dataset)
        dataset = self.calculate_AQI_bucket(dataset)
        logger.info("Data AQI calculated successfully")

        dataset = dataset.set_index('date').sort_index(ascending=True)

        # Label-based slicing on the sorted date index is inclusive at both
        # ends, so the test window starts one day after the training window.
        train_dataset_end = pd.Timestamp("2023-05-01")
        test_dataset_start = train_dataset_end + pd.Timedelta(days=1)
        test_dataset_end = pd.Timestamp("2023-10-01")

        train_data = dataset.loc[:train_dataset_end]
        test_data = dataset.loc[test_dataset_start:test_dataset_end]

        train_data.to_csv(os.path.join(self.config.root_dir, "train_dataset.csv"))
        test_data.to_csv(os.path.join(self.config.root_dir, "test_dataset.csv"))
        logger.info("Train and Test data made successfully")


In [10]:
# Run the data-transformation stage end to end: load the configuration, build
# the stage object and execute it.
try:
    config = ConfigurationManager()
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config)
    data_transformation.convert()
except Exception as e:
    # Record the failure with its traceback in the project log, then let the
    # original exception propagate unchanged.
    logger.exception(e)
    raise

[2024-05-08 17:41:15,519 : INFO : common : yaml file: config\config.yaml loaded successfully]


[2024-05-08 17:41:15,572 : INFO : common : yaml file: params.yaml loaded successfully]
[2024-05-08 17:41:15,583 : INFO : common : Created directory at: artifacts]
[2024-05-08 17:41:15,585 : INFO : common : Created directory at: artifacts/data_transformation]
[2024-05-08 17:41:16,851 : INFO : 1551347022 : Data read successfully]
[2024-05-08 17:41:17,338 : INFO : 1551347022 : Data clean successfully]
[2024-05-08 17:41:18,440 : INFO : 1551347022 : Data pivoted successfully]
[2024-05-08 17:41:18,440 : INFO : 1551347022 : Data renamed successfully]
[2024-05-08 17:41:18,590 : INFO : 1551347022 : Data date adjusted successfully]
[2024-05-08 17:41:18,923 : INFO : 1551347022 : Data subindices calculated successfully]
[2024-05-08 17:41:18,938 : INFO : 1551347022 : Data subindex added  successfully]
[2024-05-08 17:41:19,032 : INFO : 1551347022 : Data AQI calculated successfully]
[2024-05-08 17:41:21,154 : INFO : 1551347022 : Train and Test data made successfully]
