# **Air Quality Prediction**
---
**Data Pipeline**

In [10]:
# Import the required libraries.
import os

# Need to be installed.
import yaml
import joblib
import numpy as np
import pandas as pd

from tqdm import tqdm
from sklearn.model_selection import train_test_split

## **1 - Configuration File**
---

- Create two functions: load_config() and update_config().

In [11]:
# Function to load configuration parameter.
def load_config(path_config):
    """
    Load the configuration file.

    Parameters:
    -----------
    config_path : str
        Configuration file location.

    Returns:
    --------
    params : dict
        Loaded configuration file.
    """

    # Try to load config.yaml file.
    try:
        with open(path_config, 'r') as file:
            params = yaml.safe_load(file)
    except FileNotFoundError as err:
        raise RuntimeError(f"Configuration file not found in {path_config}")

    return params

In [26]:
# Function to update configuration parameter.
def update_config(key, value, params, path_config):
    """
    Update the configuration parameter values.

    Parameters:
    ----------
    key : str
        The key to be updated.

    value : any type supported in Python
        The updated value.

    params :  dict
        Loaded configuration parameters.

    path_config : str
        Configuration file location.

    Returns:
    -------
    config : dict
        Updated configuration parameters.
    """
    
    # To maintain the raw config file imutable.
    params = params.copy()

    # Update the configuration.
    params[key] = value

    # Write the configuration file.
    with open(path_config, 'w') as file:
        yaml.dump(params, file)

    print(f"Params Updated! \nKey: {key} - \nValue: {value}\n")

    # Reload the updated configuration file.
    config = load_config(path_config)

    return config

In [27]:
# Load the config.yaml
PATH_CONFIG = "../config/config.yaml"
config = load_config(PATH_CONFIG)

In [21]:
# Check the configuration parameters.
config

dict

## **2 - Data Collection**
---

- Create **load_data()** function.
- It receives one argument: **data_path**
- This function load all csv raw data and return the joined dataframe.

In [28]:
# Function to load data.
def load_data(data_path):
    """
    Load csv files and join into one dataframe.

    Parameters:
    ----------
    path_data : str
        Raw dataset location.

    Returns:
    --------
    raw_dataset : pd.DataFrame
        Loaded and joined data.
    """
    # Create dataframe.
    raw_dataset = pd.DataFrame()

    for i in tqdm(os.listdir(data_path)):
        raw_dataset = pd.concat([
            pd.read_csv(data_path + i), raw_dataset
        ])

    return raw_dataset

In [29]:
# Load the raw dataset.
PATH_RAW_DATA = config["path_raw_data"]
raw_dataset = load_data(data_path = PATH_RAW_DATA)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 12/12 [00:00<00:00, 61.14it/s]


In [21]:
# Chech the raw dataset.
raw_dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-03-01,DKI1 (Bunderan HI),37,60,20,12,12,17,60,PM25,SEDANG
1,2021-03-02,DKI1 (Bunderan HI),38,55,20,15,14,28,55,PM25,SEDANG
2,2021-03-03,DKI1 (Bunderan HI),49,67,19,21,17,35,67,PM25,SEDANG
3,2021-03-04,DKI1 (Bunderan HI),51,75,22,18,19,31,75,PM25,SEDANG
4,2021-03-05,DKI1 (Bunderan HI),52,69,20,16,16,30,69,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
150,2021-12-27,DKI5 (Kebon Jeruk) Jakarta Barat,54,76,36,14,21,47,76,PM25,SEDANG
151,2021-12-28,DKI5 (Kebon Jeruk) Jakarta Barat,44,68,20,11,21,33,68,PM25,SEDANG
152,2021-12-29,DKI5 (Kebon Jeruk) Jakarta Barat,34,54,28,8,25,29,54,PM25,SEDANG
153,2021-12-30,DKI5 (Kebon Jeruk) Jakarta Barat,53,75,25,15,23,44,75,PM25,SEDANG


- We found that:
  1. Index only ranged from 0 to 149, while there are 1830 rows.
  2. Date only ranged from month 3 to 12, while there are 12 months.

In [30]:
# Try to reset the index to solve the first problem.
raw_dataset = raw_dataset.reset_index(drop=True)

# Check the updated index.
raw_dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-03-01,DKI1 (Bunderan HI),37,60,20,12,12,17,60,PM25,SEDANG
1,2021-03-02,DKI1 (Bunderan HI),38,55,20,15,14,28,55,PM25,SEDANG
2,2021-03-03,DKI1 (Bunderan HI),49,67,19,21,17,35,67,PM25,SEDANG
3,2021-03-04,DKI1 (Bunderan HI),51,75,22,18,19,31,75,PM25,SEDANG
4,2021-03-05,DKI1 (Bunderan HI),52,69,20,16,16,30,69,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
1825,2021-12-27,DKI5 (Kebon Jeruk) Jakarta Barat,54,76,36,14,21,47,76,PM25,SEDANG
1826,2021-12-28,DKI5 (Kebon Jeruk) Jakarta Barat,44,68,20,11,21,33,68,PM25,SEDANG
1827,2021-12-29,DKI5 (Kebon Jeruk) Jakarta Barat,34,54,28,8,25,29,54,PM25,SEDANG
1828,2021-12-30,DKI5 (Kebon Jeruk) Jakarta Barat,53,75,25,15,23,44,75,PM25,SEDANG


In [31]:
# Srialize the joined dataset.
PATH_JOINED_DATA = "../data/interim/joined_dataset.pkl"
joblib.dump(raw_dataset, PATH_JOINED_DATA)

['../data/interim/joined_dataset.pkl']

In [34]:
# Update the configuration parameter.
config = update_config(
    key = "path_joined_data",
    value = PATH_JOINED_DATA,
    params = config,
    path_config = PATH_CONFIG
)

Params Updated! Key: path_joined_data - Value: ../data/interim/joined_dataset.pkl


## **3 - Data Validation**
---

In [35]:
# Check the data type for each feature.
raw_dataset.dtypes

tanggal        str
stasiun        str
pm10        object
pm25        object
so2         object
co          object
o3          object
no2         object
max         object
critical       str
categori       str
dtype: object

- Several features don't have the same configuration data type.
- We need to handle those error columns.

### 3.1. Handling Column **tanggal**

In [36]:
# Try to cast the column to datetime type.
raw_dataset["tanggal"] = pd.to_datetime(raw_dataset["tanggal"])

### 3.2. Handling Column **pm10**

In [37]:
# Try to cast the column to int type.
raw_dataset["pm10"] = raw_dataset["pm10"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

- ValueError occurs, it tells us that there are data that isn't integer ("---").
- We will replace those "---" with value that don't exists in the column.
- Based on data definition, we know that we can use -1.

In [39]:
# Ensure no single data that is -1.
raw_dataset.eq("-1").any() | raw_dataset.eq(-1).any()

tanggal     False
stasiun     False
pm10        False
pm25        False
so2         False
co          False
o3          False
no2         False
max         False
critical    False
categori    False
dtype: bool

In [40]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["pm10"] = raw_dataset["pm10"].replace("---", -1).astype(int)

### 3.3. Handling Column **pm25**

In [41]:
# Try to cast the column to int type.
raw_dataset["pm25"] = raw_dataset["pm25"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [42]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["pm25"] = raw_dataset["pm25"].replace("---", -1).astype(int)

ValueError: cannot convert float NaN to integer

- There is a different **ValueError**.
- There are **NaN** values, thus we can't directly convert to int.
- We need to handle this problem first.

In [43]:
# Sanity check the missing values.
raw_dataset["pm25"].isna().sum()

np.int64(62)

- There are 62 **NaN** values. For now, we can replace it with **-1**.

In [45]:
# Replace the NaN values with -1.
raw_dataset["pm25"] = raw_dataset["pm25"].fillna(-1)

# Sanity check the missing value.
raw_dataset["pm25"].isna().sum()

np.int64(0)

In [46]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["pm25"] = raw_dataset["pm25"].replace("---", -1).astype(int)

### 3.4. Handling Column **so2**

In [47]:
# Try to cast the column to int type.
raw_dataset["so2"] = raw_dataset["so2"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [48]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["so2"] = raw_dataset["so2"].replace("---", -1).astype(int)

### 3.5. Handling Column **co**

In [49]:
# Try to cast the column to int type.
raw_dataset["co"] = raw_dataset["co"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [50]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["co"] = raw_dataset["co"].replace("---", -1).astype(int)

### 3.6. Handling Column **o3**

In [51]:
# Try to cast column to int type.
raw_dataset["o3"] = raw_dataset["o3"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [52]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["o3"] = raw_dataset["o3"].replace("---", -1).astype(int)

### 3.7. Handling Column **no2**

In [53]:
# Try to cast the columnn to int type.
raw_dataset["no2"] = raw_dataset["no2"].astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [54]:
# Replace the "---" with -1 and cast the column into int.
raw_dataset["no2"] = raw_dataset["no2"].replace("---", -1).astype(int)

### 3.8. Handling Column **max**

In [55]:
# Try to cast the column to int type.
raw_dataset["max"] = raw_dataset["max"].astype(int)

ValueError: invalid literal for int() with base 10: 'PM25'

- Seems like the error is different.
- There is data with value **"PM25"** in the **max** column.
- We need to investigate the data.

In [56]:
# Check which data that cause error.
raw_dataset[raw_dataset["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
1677,2021-12-03,DKI1 (Bunderan HI),49,31,9,19,7,49,PM25,BAIK,


- Looks like there are typos on row index 1677.
    - The **"BAIK"** value must be on **categori** column.
    - We need to investigate what do **max** and **critical** column represents.
- Let's randomly sample 5 data.

In [57]:
raw_dataset.sample(5)

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
601,2021-11-17,DKI5 (Kebon Jeruk) Jakarta Barat,52,77,19,10,14,35,77,PM25,SEDANG
1668,2021-07-25,DKI5 (Kebon Jeruk) Jakarta Barat,77,131,28,24,27,42,131,PM25,TIDAK SEHAT
1585,2021-07-04,DKI3 (Jagakarsa),86,132,44,9,28,14,132,PM25,TIDAK SEHAT
388,2021-01-17,DKI3 (Jagakarsa),58,86,22,11,43,9,86,PM25,SEDANG
215,2021-05-30,DKI2 (Kelapa Gading),53,65,55,17,51,28,65,PM25,SEDANG


- Looks like the **max** column represents the maximum value between any other int columns.
- And looks like the **critical** column represents the column name of **max** value.


- Thus, let's fix the error on the row index 1677:
  - Replace the **max** column with **pm10** or **no2** value, let's take from **pm10**
  - Replace the **critical** column with **"PM10"**
  - Replace the **categori** column with **"BAIK"**

In [58]:
# Fix the error.
raw_dataset.loc[1677, "max"] = raw_dataset.loc[1677, "pm10"]
raw_dataset.loc[1677, "critical"] = "PM10"
raw_dataset.loc[1677, "categori"] = "BAIK"

In [59]:
# Sanity check the result.
raw_dataset.loc[1677]

tanggal     2021-12-03 00:00:00
stasiun      DKI1 (Bunderan HI)
pm10                         49
pm25                         31
so2                           9
co                           19
o3                            7
no2                          49
max                          49
critical                   PM10
categori                   BAIK
Name: 1677, dtype: object

In [60]:
# Cast the column to int.
raw_dataset["max"] = raw_dataset["max"].astype(int)

### 3.9. Handlind Column **critical**

In [62]:
# Check the unique value.
raw_dataset["critical"].value_counts()

critical
PM25    1631
PM10      65
O3        57
CO        34
SO2       26
Name: count, dtype: int64

- Seems like no action needed.

### 3.10. Handling Column **categori**

In [63]:
# Check the unique value.
raw_dataset["categori"].value_counts()

categori
SEDANG            1305
TIDAK SEHAT        319
BAIK               189
TIDAK ADA DATA      17
Name: count, dtype: int64

- There are 17 **"TIDAK ADA DATA"** values, indicate the missing label.
- Since we don't know which label that can replace the **"TIDAK ADA DATA"**, thus we can drop those data.

In [65]:
# Drop the "TIDAK ADA DATA" category.
missing_labels = raw_dataset[raw_dataset["categori"] == "TIDAK ADA DATA"]
raw_dataset = raw_dataset.drop(index = missing_labels.index)

In [66]:
# Sanity check the result.
raw_dataset["categori"].value_counts()

categori
SEDANG         1305
TIDAK SEHAT     319
BAIK            189
Name: count, dtype: int64

- Let's rename the **categori** column into the proper name, **category**.
- Don't forget to update the configuration file.

In [67]:
# Rename "categori" into "category".
raw_dataset = raw_dataset.rename(columns= {"categori": "category"})

In [68]:
# Update the configuration parameter.
config = update_config(
    key = "label",
    value = "category",
    params = config,
    path_config = PATH_CONFIG
)

Params Updated! Key: label - Value: category


In [70]:
# Update the configuration parameter.
col_object = config["object_columns"]
col_object[-1] = "category"

config = update_config(
    key = "object_columns",
    value = col_object,
    params = config,
    path_config = PATH_CONFIG
)

Params Updated! Key: object_columns - Value: ['stasiun', 'critical', 'category']


In [71]:
# Sanity check the data types.
raw_dataset.info()

<class 'pandas.DataFrame'>
Index: 1813 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1813 non-null   datetime64[us]
 1   stasiun   1813 non-null   str           
 2   pm10      1813 non-null   int64         
 3   pm25      1813 non-null   int64         
 4   so2       1813 non-null   int64         
 5   co        1813 non-null   int64         
 6   o3        1813 non-null   int64         
 7   no2       1813 non-null   int64         
 8   max       1813 non-null   int64         
 9   critical  1813 non-null   str           
 10  category  1813 non-null   str           
dtypes: datetime64[us](1), int64(7), str(3)
memory usage: 170.0 KB


- All columns are already same as in the data definition.
- Now serialized the validated data.

In [72]:
# Serialized the validated data.
PATH_VALIDATED_DATA = f"../data/interim/validated_data.pkl"
joblib.dump(raw_dataset, PATH_VALIDATED_DATA)

['../data/interim/validated_data.pkl']

In [73]:
# Update the configuration parameter.
config = update_config(
    key = "path_validated_data",
    value = PATH_VALIDATED_DATA,
    params = config,
    path_config = PATH_CONFIG
)

Params Updated! Key: path_validated_data - Value: ../data/interim/validated_data.pkl


## **4 - Update the Range of Data in Configuration File**
---

In [75]:
# Update the range of data with the min and max value of each column.
cols = ["pm10", "pm25", "so2", "co", "o3", "no2"]
param_keys = ["range_pm10", "range_pm25", "range_so2", "range_co", "range_o3", "range_no2"]

for col, key in zip(cols, param_keys):
    config = update_config(
        key = key,
        value = [int(np.min(raw_dataset[col])), int(np.max(raw_dataset[col]))],
        params = config,
        path_config = PATH_CONFIG
    )

Params Updated! Key: range_pm10 - Value: [-1, 179]
Params Updated! Key: range_pm25 - Value: [-1, 174]
Params Updated! Key: range_so2 - Value: [-1, 82]
Params Updated! Key: range_co - Value: [-1, 47]
Params Updated! Key: range_o3 - Value: [-1, 151]
Params Updated! Key: range_no2 - Value: [-1, 65]


In [76]:
# Check the configuration parameters.
config

{'datetime_columns': ['tanggal'],
 'features': ['stasiun', 'pm10', 'pm25', 'so2', 'co', 'o3', 'no2'],
 'int32_columns': ['pm10', 'pm25', 'so2', 'co', 'o3', 'no2', 'max'],
 'label': 'category',
 'label_categories': ['BAIK', 'SEDANG', 'TIDAK SEHAT'],
 'label_categories_new': ['BAIK', 'TIDAK BAIK'],
 'object_columns': ['stasiun', 'critical', 'category'],
 'path_joined_data': '../data/interim/joined_dataset.pkl',
 'path_raw_data': '../data/raw/',
 'path_validated_data': '../data/interim/validated_data.pkl',
 'range_co': [-1, 47],
 'range_no2': [-1, 65],
 'range_o3': [-1, 151],
 'range_pm10': [-1, 179],
 'range_pm25': [-1, 174],
 'range_so2': [-1, 82],
 'range_stasiun': ['DKI1 (Bunderan HI)',
  'DKI2 (Kelapa Gading)',
  'DKI3 (Jagakarsa)',
  'DKI4 (Lubang Buaya)',
  'DKI5 (Kebon Jeruk) Jakarta Barat']}

## **5 - Data Defense**
---

- Create the **check_data()** function.
- It receives 2 arguments: **input_data** and **params**
  - **input_data** is the raw dataset
  - **params** is the configuration parameters
- It is a void function (no return value).
- If **AssertionError** happens, there are exists data that don't match the configuration.

In [81]:
# Function to do data defense.
def check_data(input_data, params):
    """
    Do data defense for checking the data types and range of data.

    Parameters:
    ----------
    input_data : pd.DataFrame
        The data to be checked.

    params : dict
        Loaded configuration parameters.

    Returns:
    -------
    None, it's a void function.
    """

    # Check data types.
    assert input_data.select_dtypes("datetime").columns.to_list == params["columns_datetime"], "an error occurs in datetime column(s)."
    assert imput_data.select_dtypes("str").columns.to_list() == params["columns_object"], "an error occurs in object in object column(s)."
    assert input_data.select_dtypes("number").columns.to_list() == params["columns_int"], "an error occurs in int32 column(s)."

    # Check range of data.
    assert set(input_data['stasiun']).issubset(set(params['range_stasiun'])), "an error occurs in stasiun range."
    assert input_data['pm10'].between(params['range_pm10'][0], params['range_pm10'][1]).sum() == len(input_data), "an error occurs in pm10 range."
    assert input_data['pm25'].between(params['range_pm25'][0], params['range_pm25'][1]).sum() == len(input_data), "an error occurs in pm25 range."
    assert input_data['so2'].between(params['range_so2'][0], params['range_so2'][1]).sum() == len(input_data), "an error occurs in so2 range."
    assert input_data['co'].between(params['range_co'][0], params['range_co'][1]).sum() == len(input_data), "an error occurs in co range."
    assert input_data['o3'].between(params['o3'][0], params['o3'][1]).sum() == len(input_data), "an error occurs in o3 range."
    assert input_data['no2'].between(params['range_no2'][0], params['range_no2'][1]).sum() == len(input_data), "an error occurs in no2 range."

In [82]:
# Do data defense
check_data(raw_dataset, config)

KeyError: 'columns_datetime'