# Data Pipeline Demo

## 0. Load Required Libraries

In [1]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import pandas as pd
import numpy as np
import joblib
import os
import yaml
import src.util as util

## 1. Load Configuration File

In [2]:
# Load the pipeline configuration via the project helper; later cells read paths,
# column lists and range_* bounds from it.
config_data = util.load_config()

## 2. Data Collection

In [3]:
def read_raw_data(config: dict) -> pd.DataFrame:
    """Load and stack every CSV file found in ``config["raw_dataset_dir"]``.

    Parameters
    ----------
    config : dict
        Pipeline configuration; only the ``"raw_dataset_dir"`` key is read.
        The directory may be given with or without a trailing separator.

    Returns
    -------
    pd.DataFrame
        All CSV rows stacked together. Files are stacked in reverse
        ``os.listdir`` order (the last file listed comes first), matching the
        previous prepend-in-a-loop behaviour. Each file keeps its own
        ``RangeIndex``, so index labels repeat; reset the index afterwards.
        An empty DataFrame is returned when the directory holds no CSV files.
    """
    raw_dataset_dir = config["raw_dataset_dir"]

    # Only CSV files: stray entries (e.g. .DS_Store, README) would otherwise be
    # handed to read_csv and either crash or inject junk rows.
    csv_files = [
        file_name
        for file_name in os.listdir(raw_dataset_dir)
        if file_name.lower().endswith(".csv")
    ]

    # NOTE(review): os.listdir order is filesystem-dependent, so row order (and any
    # later code that addresses rows by index label) can differ between machines.
    frames = [
        pd.read_csv(os.path.join(raw_dataset_dir, file_name))
        for file_name in tqdm(csv_files)
    ]

    if not frames:
        return pd.DataFrame()

    # A single concat avoids quadratic re-copying; reversing keeps the original
    # "most recently read file first" ordering.
    return pd.concat(frames[::-1])


In [4]:
# Load and concatenate the raw data files from the configured directory.
raw_dataset = read_raw_data(config_data)

100%|██████████| 12/12 [00:00<00:00, 50.31it/s]


In [5]:
# Preview the concatenated raw data (index labels still repeat per source file).
raw_dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-05-01,DKI1 (Bunderan HI),52,71,23,10,20,33,71,PM25,SEDANG
1,2021-05-02,DKI1 (Bunderan HI),50,67,22,9,18,17,67,PM25,SEDANG
2,2021-05-03,DKI1 (Bunderan HI),53,70,26,10,20,20,70,PM25,SEDANG
3,2021-05-04,DKI1 (Bunderan HI),59,77,31,14,14,26,77,PM25,SEDANG
4,2021-05-05,DKI1 (Bunderan HI),58,82,22,18,14,38,82,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
145,2021-11-26,DKI5 (Kebon Jeruk) Jakarta Barat,32,59,16,7,15,20,59,PM25,SEDANG
146,2021-11-27,DKI5 (Kebon Jeruk) Jakarta Barat,22,36,13,7,18,22,36,PM25,BAIK
147,2021-11-28,DKI5 (Kebon Jeruk) Jakarta Barat,18,26,12,2,16,8,26,PM25,BAIK
148,2021-11-29,DKI5 (Kebon Jeruk) Jakarta Barat,15,34,13,3,13,9,34,PM25,BAIK


In [6]:
# Replace the per-file index labels left by concatenation with a clean 0..n-1 index.
raw_dataset = raw_dataset.reset_index(drop=True)

In [7]:
# Confirm the index now runs 0..n-1 after the reset.
raw_dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-05-01,DKI1 (Bunderan HI),52,71,23,10,20,33,71,PM25,SEDANG
1,2021-05-02,DKI1 (Bunderan HI),50,67,22,9,18,17,67,PM25,SEDANG
2,2021-05-03,DKI1 (Bunderan HI),53,70,26,10,20,20,70,PM25,SEDANG
3,2021-05-04,DKI1 (Bunderan HI),59,77,31,14,14,26,77,PM25,SEDANG
4,2021-05-05,DKI1 (Bunderan HI),58,82,22,18,14,38,82,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
1820,2021-11-26,DKI5 (Kebon Jeruk) Jakarta Barat,32,59,16,7,15,20,59,PM25,SEDANG
1821,2021-11-27,DKI5 (Kebon Jeruk) Jakarta Barat,22,36,13,7,18,22,36,PM25,BAIK
1822,2021-11-28,DKI5 (Kebon Jeruk) Jakarta Barat,18,26,12,2,16,8,26,PM25,BAIK
1823,2021-11-29,DKI5 (Kebon Jeruk) Jakarta Barat,15,34,13,3,13,9,34,PM25,BAIK


In [8]:
# Persist the (not yet cleaned) raw dataset to the configured path via the project helper.
util.pickle_dump(raw_dataset, config_data["raw_dataset_path"])

## 2. Data Definition

In [9]:
# TODO: define the data type, valid range and a short explanation of each variable.

## 3. Data Validation

### 3.1. Data Types

In [10]:
# Check the data type of each variable; numeric columns load as object because
# they contain string placeholders such as "---".
raw_dataset.dtypes

tanggal     object
stasiun     object
pm10        object
pm25        object
so2         object
co          object
o3          object
no2         object
max         object
critical    object
categori    object
dtype: object

### 3.2. Range

In [11]:
# Summary statistics; with every column still object dtype this shows count/unique/top/freq.
raw_dataset.describe()

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
count,1825,1825,1825,1763,1825,1825,1825,1825,1825,1809,1824
unique,365,5,130,192,121,54,121,96,187,6,4
top,2021-05-01,DKI1 (Bunderan HI),51,77,---,9,---,13,77,PM25,SEDANG
freq,5,365,69,45,114,163,68,81,50,1630,1349


### 3.3. Data Dimensions

In [12]:
# (rows, columns) of the raw dataset.
raw_dataset.shape

(1825, 11)

### 3.4. Handling Variable Errors

#### 3.4.1. Handling Variable "Tanggal"

In [13]:
# Parse the "tanggal" (date) strings into datetime64 values.
raw_dataset = raw_dataset.assign(tanggal=pd.to_datetime(raw_dataset["tanggal"]))

In [14]:
# Confirm "tanggal" is now datetime64.
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   object        
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), object(10)
memory usage: 157.0+ KB


#### 3.4.2. Handling Variable "PM10"

In [15]:
# Look for rows whose pm10 cell holds a station name (a sign of misaligned columns);
# an empty result means none were found.
raw_dataset[raw_dataset["pm10"] == "DKI4 (Lubang Buaya)"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [16]:
# pm10: map the "---" placeholder (presumably a missing reading) to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(pm10=raw_dataset["pm10"].replace({"---": -1}).astype(int))

In [17]:
# Confirm pm10 is now int64.
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int64         
 3   pm25      1763 non-null   object        
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), int64(1), object(9)
memory usage: 157.0+ KB


#### 3.4.3. Handling Variable "PM25"

In [18]:
# Mark missing PM2.5 readings (NaN) with the same -1 sentinel used for "---" values.
# Assign the result back rather than calling fillna(inplace=True) on
# raw_dataset["pm25"]: that is chained assignment, which under pandas
# Copy-on-Write updates a temporary copy and leaves raw_dataset unchanged.
raw_dataset["pm25"] = raw_dataset["pm25"].fillna(-1)

In [19]:
# pm25: map the "---" placeholder to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(pm25=raw_dataset["pm25"].replace({"---": -1}).astype(int))

In [20]:
# Confirm pm25 is now int64 with no missing values.
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int64         
 3   pm25      1825 non-null   int64         
 4   so2       1825 non-null   object        
 5   co        1825 non-null   object        
 6   o3        1825 non-null   object        
 7   no2       1825 non-null   object        
 8   max       1825 non-null   object        
 9   critical  1809 non-null   object        
 10  categori  1824 non-null   object        
dtypes: datetime64[ns](1), int64(2), object(8)
memory usage: 157.0+ KB


#### 3.4.4. Handling Variables SO2, CO, O3, NO2 and MAX

In [21]:
# so2: map the "---" placeholder to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(so2=raw_dataset["so2"].replace({"---": -1}).astype(int))

In [22]:
# co: map the "---" placeholder to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(co=raw_dataset["co"].replace({"---": -1}).astype(int))

In [23]:
# o3: map the "---" placeholder to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(o3=raw_dataset["o3"].replace({"---": -1}).astype(int))

In [24]:
# no2: map the "---" placeholder to the -1 sentinel, then cast to int.
raw_dataset = raw_dataset.assign(no2=raw_dataset["no2"].replace({"---": -1}).astype(int))

In [25]:
# Find rows whose "max" column holds a pollutant name instead of a number.
raw_dataset[raw_dataset["max"]=="PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
1217,2021-12-03,DKI1 (Bunderan HI),49,31,9,19,7,49,PM25,BAIK,


In [26]:
# Quick fix for the single malformed record (2021-12-03, DKI1 (Bunderan HI)):
# its "max" column holds the string "PM25" and "categori" is missing.
# Select it by that symptom rather than a hard-coded index label, because labels
# depend on the order in which os.listdir returned the raw CSV files.
# NOTE(review): the numeric values in this row look shifted one column to the
# left (e.g. no2 == max == 49); only max/critical/categori are patched here —
# confirm the remaining fields against the source file.
malformed_rows = raw_dataset["max"] == "PM25"
assert malformed_rows.sum() <= 1, "expected at most one malformed 'max' row"
raw_dataset.loc[malformed_rows, "max"] = 49
raw_dataset.loc[malformed_rows, "critical"] = "PM10"
raw_dataset.loc[malformed_rows, "categori"] = "BAIK"

In [27]:
# Confirm no row still has a pollutant name in "max".
raw_dataset[raw_dataset["max"]=="PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [28]:
# Inspect the patched record by its current index label.
raw_dataset.loc[1217]

tanggal     2021-12-03 00:00:00
stasiun      DKI1 (Bunderan HI)
pm10                         49
pm25                         31
so2                           9
co                           19
o3                            7
no2                          49
max                          49
critical                   PM10
categori                   BAIK
Name: 1217, dtype: object

In [29]:
# "max" is all-numeric after the quick fix, so it can be cast to int directly.
raw_dataset = raw_dataset.assign(max=raw_dataset["max"].astype(int))

In [30]:
# Confirm all pollutant columns and "max" are now int64.
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1825 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1825 non-null   datetime64[ns]
 1   stasiun   1825 non-null   object        
 2   pm10      1825 non-null   int64         
 3   pm25      1825 non-null   int64         
 4   so2       1825 non-null   int64         
 5   co        1825 non-null   int64         
 6   o3        1825 non-null   int64         
 7   no2       1825 non-null   int64         
 8   max       1825 non-null   int64         
 9   critical  1809 non-null   object        
 10  categori  1825 non-null   object        
dtypes: datetime64[ns](1), int64(7), object(3)
memory usage: 157.0+ KB


#### 3.4.5. Handling Variable "Categori"

In [31]:
# Class distribution of the target column.
raw_dataset["categori"].value_counts()

categori
SEDANG            1349
TIDAK SEHAT        272
BAIK               188
TIDAK ADA DATA      16
Name: count, dtype: int64

In [32]:
# Remove rows labelled "TIDAK ADA DATA" (Indonesian for "no data"): they carry no usable target.
# A boolean mask keeps every other row even if index labels repeat, whereas
# drop(index=...) would also remove unrelated rows sharing a dropped label.
raw_dataset = raw_dataset[raw_dataset["categori"] != "TIDAK ADA DATA"].copy()

In [33]:
# Confirm the "TIDAK ADA DATA" class is gone.
raw_dataset["categori"].value_counts()

categori
SEDANG         1349
TIDAK SEHAT     272
BAIK            188
Name: count, dtype: int64

In [34]:
# Check row count and null counts after dropping unlabelled rows.
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1809 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1809 non-null   datetime64[ns]
 1   stasiun   1809 non-null   object        
 2   pm10      1809 non-null   int64         
 3   pm25      1809 non-null   int64         
 4   so2       1809 non-null   int64         
 5   co        1809 non-null   int64         
 6   o3        1809 non-null   int64         
 7   no2       1809 non-null   int64         
 8   max       1809 non-null   int64         
 9   critical  1809 non-null   object        
 10  categori  1809 non-null   object        
dtypes: datetime64[ns](1), int64(7), object(3)
memory usage: 169.6+ KB


In [35]:
# Numeric summary of the cleaned data; minima of -1 are the sentinel for "---"/missing readings.
raw_dataset.describe()

Unnamed: 0,tanggal,pm10,pm25,so2,co,o3,no2,max
count,1809,1809.0,1809.0,1809.0,1809.0,1809.0,1809.0,1809.0
mean,2021-07-01 22:31:38.507462400,50.731343,74.384743,33.22775,11.775567,30.879491,19.447761,77.644002
min,2021-01-01 00:00:00,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,17.0
25%,2021-04-02 00:00:00,42.0,60.0,24.0,9.0,21.0,13.0,62.0
50%,2021-07-02 00:00:00,53.0,77.0,33.0,11.0,28.0,18.0,77.0
75%,2021-10-01 00:00:00,61.0,91.0,44.0,14.0,38.0,25.0,91.0
max,2021-12-31 00:00:00,179.0,174.0,82.0,47.0,151.0,65.0,179.0
std,,16.936853,27.954196,14.844142,5.250099,15.575141,9.620375,22.704975


In [36]:
# Number of remaining records per monitoring station.
raw_dataset["stasiun"].value_counts()

stasiun
DKI1 (Bunderan HI)                  365
DKI5 (Kebon Jeruk) Jakarta Barat    364
DKI3 (Jagakarsa)                    363
DKI2 (Kelapa Gading)                362
DKI4 (Lubang Buaya)                 355
Name: count, dtype: int64

In [37]:
# Persist the cleaned dataset to the configured path via the project helper.
util.pickle_dump(raw_dataset, config_data["cleaned_raw_dataset_path"])

## 4. Data Defense

In [38]:
def check_data(input_data, params):
    """Validate the dtypes and value ranges of a cleaned air-quality frame.

    Parameters
    ----------
    input_data : pd.DataFrame
        Frame to validate; must contain the "stasiun", "pm10", "pm25",
        "so2", "co", "o3" and "no2" columns.
    params : dict
        Expected schema. Keys read: "datetime_columns", "int64_columns" and
        "object_columns" (lists compared, order included, with the frame's
        columns of that dtype), "range_stasiun" (allowed station names) and
        "range_<column>" ([low, high], inclusive) for each pollutant above.

    Returns
    -------
    None

    Raises
    ------
    AssertionError
        At the first failing check, with a message naming the offending
        column group. Raised explicitly rather than via ``assert`` statements,
        which Python strips when run with ``-O``.
    KeyError
        If a required column or params key is missing.
    """

    def _require(condition, message):
        # Explicit raise keeps the guard active under ``python -O``.
        if not condition:
            raise AssertionError(message)

    # Check data types
    # NOTE(review): "int" resolves to the platform default integer dtype
    # (int32 on Windows with NumPy < 2) — confirm if the pipeline runs there.
    _require(input_data.select_dtypes("datetime").columns.to_list() == params["datetime_columns"],
             "an error occurse in datetime column(s)")
    _require(input_data.select_dtypes("int").columns.to_list() == params["int64_columns"],
             "an error occurse in int64 column(s)")
    _require(input_data.select_dtypes("object").columns.to_list() == params["object_columns"],
             "an error occurse in object column(s)")

    # Check range data
    _require(set(input_data["stasiun"]).issubset(set(params["range_stasiun"])),
             "an error occurse in stasiun range")

    # Every pollutant reading must lie within its inclusive [low, high] bounds.
    for column in ("pm10", "pm25", "so2", "co", "o3", "no2"):
        bounds = params["range_" + column]
        _require(input_data[column].between(bounds[0], bounds[1]).all(),
                 "an error occurse in " + column + " range")
    

In [39]:
# Data defense: fail fast if the cleaned data violates the configured schema or ranges.
check_data(raw_dataset, config_data)

## 5. Data Splitting

In [40]:
# Separate the configured predictor columns from the "categori" target.
X = raw_dataset.loc[:, config_data["predictors"]].copy()
y = raw_dataset.loc[:, "categori"].copy()

In [41]:
# Check predictor dtypes and non-null counts.
X.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1809 entries, 0 to 1824
Data columns (total 7 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   stasiun  1809 non-null   object
 1   pm10     1809 non-null   int64 
 2   pm25     1809 non-null   int64 
 3   so2      1809 non-null   int64 
 4   co       1809 non-null   int64 
 5   o3       1809 non-null   int64 
 6   no2      1809 non-null   int64 
dtypes: int64(6), object(1)
memory usage: 113.1+ KB


In [42]:
# Class balance of the target before splitting (used for stratification).
y.value_counts()

categori
SEDANG         1349
TIDAK SEHAT     272
BAIK            188
Name: count, dtype: int64

In [44]:
# First split: hold out 20% of the data as the test set, stratified on the target.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

In [45]:
# Second split: carve a validation set (20% of the training portion) out of the
# training data, again stratified. This overwrites X_train/y_train, so re-running
# this cell alone shrinks the training set again — re-run the first split too.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train,
    y_train,
    test_size=0.2,
    random_state=42,
    stratify=y_train,
)

In [46]:
# Sizes of the train / valid / test splits.
tuple(len(split) for split in (y_train, y_valid, y_test))

(1157, 290, 362)

In [47]:
# Persist each split: element 0 of its *_set_path entry gets the features,
# element 1 gets the target.
for path_key, features, target in (
    ("train_set_path", X_train, y_train),
    ("valid_set_path", X_valid, y_valid),
    ("test_set_path", X_test, y_test),
):
    util.pickle_dump(features, config_data[path_key][0])
    util.pickle_dump(target, config_data[path_key][1])