# Data Pipeline Demo

## 0. Load Required Libraries

In [1]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import pandas as pd
import numpy as np 
import joblib
import os
import yaml
import src.util as util

## 1. Load Configuration File

In [13]:
config_data = util.load_config()

## 2. Data Collection

In [14]:
def read_raw_data(config: dict) -> pd.DataFrame:
    """Load every CSV file found in the raw dataset directory into one DataFrame.

    Parameters
    ----------
    config : dict
        Must provide ``"raw_dataset_dir"`` (the directory scanned for files
        whose name ends with ``.csv``) and ``"columns_to_keep"`` (the column
        selection applied to the combined data).

    Returns
    -------
    pd.DataFrame
        The rows of all CSV files restricted to ``columns_to_keep``. Each
        file keeps its own index, so index labels can repeat when more than
        one file is loaded; callers that need unique labels should call
        ``reset_index(drop=True)`` on the result.

    Raises
    ------
    KeyError
        If a config key is missing or a requested column is absent from the
        loaded data (which is also the case when no CSV file was found and
        ``columns_to_keep`` names at least one column).
    """
    raw_dataset_dir = config["raw_dataset_dir"]
    columns_to_keep = config["columns_to_keep"]

    # Collect the individual frames first and concatenate once at the end;
    # concatenating inside the loop re-copies all previously loaded rows on
    # every iteration.
    # NOTE(review): os.listdir returns entries in arbitrary order, so the row
    # order of the result can differ between machines -- consider sorting.
    frames = []
    for file in tqdm(os.listdir(raw_dataset_dir)):
        # Only read CSV files
        if file.endswith(".csv"):
            frames.append(pd.read_csv(os.path.join(raw_dataset_dir, file)))

    if frames:
        # Most recently read file first, matching the row order produced by
        # the previous "prepend on every iteration" implementation.
        raw_dataset = pd.concat(frames[::-1])
    else:
        # pd.concat rejects an empty list; fall back to an empty frame.
        raw_dataset = pd.DataFrame()

    # Keep only specified columns
    return raw_dataset[columns_to_keep]

In [15]:
raw_dataset = read_raw_data(config_data)

100%|██████████| 1/1 [00:00<00:00, 126.44it/s]


In [16]:
# Check our data
raw_dataset

Unnamed: 0,hdi,continent,EFConsPerCap
0,0.340,Asia,0.648085
1,0.341,Asia,0.605820
2,0.373,Asia,0.704061
3,0.381,Asia,0.708856
4,0.396,Asia,0.619414
...,...,...,...
2152,0.452,Africa,1.241330
2153,0.464,Africa,1.306024
2154,0.488,Africa,1.173113
2155,0.498,Africa,1.140062


In [6]:
# We found something here, at a glance:
# 1. Index only ranged from 0 to 154, we know that there are 1830 rows
# 2. Date only ranged from month 8 to 9, we know that our data ranged from month 1 to 12 

# We need more investigation about this

In [17]:
# Try to reset the index to solve first problem
raw_dataset.reset_index(inplace = True, drop = True)

In [18]:
# Now check the result
raw_dataset

Unnamed: 0,hdi,continent,EFConsPerCap
0,0.340,Asia,0.648085
1,0.341,Asia,0.605820
2,0.373,Asia,0.704061
3,0.381,Asia,0.708856
4,0.396,Asia,0.619414
...,...,...,...
2152,0.452,Africa,1.241330
2153,0.464,Africa,1.306024
2154,0.488,Africa,1.173113
2155,0.498,Africa,1.140062


In [9]:
# It seems that problem number 1 has been fixed

In [19]:
# Save raw dataset to file
util.pickle_dump(raw_dataset, config_data["raw_dataset_path"])

## 2. Data Definition

In [11]:
# Define the data type, the value range and a short explanation for each variable

## 3. Data Validation

### 3.1. Data Types

In [12]:
# Check data type each variable
raw_dataset.dtypes

tanggal     object
stasiun     object
pm10        object
pm25        object
so2         object
co          object
o3          object
no2         object
max         object
critical    object
categori    object
dtype: object

In [13]:
# The result shows that every variable has dtype object, which is not correct. We need to investigate this further later

### 3.2. Range

In [14]:
# Check the range of data for each variable
raw_dataset.describe()

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
count,1830,1830,1830,1768,1830,1830,1830,1830,1830,1813,1829
unique,335,5,122,191,121,54,120,86,186,6,4
top,2021-07-15,DKI1 (Bunderan HI),51,81,---,10,24,17,77,PM25,SEDANG
freq,10,366,72,41,114,171,67,90,45,1631,1305


In [15]:
# This summary is not meaningful because the data types don't match the actual values of each variable

### 3.3. Data Dimensions

In [16]:
# It will not be affected
raw_dataset.shape

(1830, 11)

### 3.4. Handling Variables Error

#### 3.4.1. Handling Variable "Tanggal"

In [17]:
# Try to cast data in variable tanggal to datetime
raw_dataset.tanggal = pd.to_datetime(raw_dataset.tanggal)

In [18]:
# Check the result
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   object        
 3   pm25      1768 non-null   object        
 4   so2       1830 non-null   object        
 5   co        1830 non-null   object        
 6   o3        1830 non-null   object        
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), object(10)
memory usage: 157.4+ KB


#### 3.4.2. Handling Variable "PM10"

In [19]:
# Try casting directly to integer
raw_dataset.pm10 = raw_dataset.pm10.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [20]:
# An error occurs; it tells us that some values are not integers ("---")
# We need to choose a placeholder number to represent this kind of data
# We can't replace it with NaN right now because the data splitting step can't tolerate NaN

In [21]:
# Based on the data definition, the number -1 is free to use as a placeholder
# But first make sure that no row already contains -1 (as a string or as a number)
# `axis` is passed by keyword because the positional form is deprecated in pandas
raw_dataset[(raw_dataset.eq("-1").any(axis=1)) | (raw_dataset.eq(-1).any(axis=1))]

  raw_dataset[(raw_dataset.eq("-1").any(1)) | (raw_dataset.eq(-1).any(1))]


Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [22]:
# Replace "---" with -1; the -1 values are converted to NaN later, in the preprocessing and feature engineering step
raw_dataset.pm10 = raw_dataset.pm10.replace("---", -1).astype(int)

In [23]:
# Check the result
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1768 non-null   object        
 4   so2       1830 non-null   object        
 5   co        1830 non-null   object        
 6   o3        1830 non-null   object        
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(1), object(9)
memory usage: 150.2+ KB


In [24]:
# Some features have NaN values: pm25, critical and categori

#### 3.4.3. Handling Variable "PM25"

In [25]:
# As with pm10, try casting directly to int
raw_dataset.pm25 = raw_dataset.pm25.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [26]:
# Same error occurs, so we will replace it with -1 like before

In [27]:
# From the pm10 analysis we know that pm25 has NaN values, so we replace them with -1 as well
# Assign the result back instead of calling fillna(inplace=True) on the attribute access:
# the chained in-place form may operate on a temporary object and leave raw_dataset unchanged
raw_dataset["pm25"] = raw_dataset["pm25"].fillna(-1)

In [28]:
# Also replace "---" with -1
raw_dataset.pm25 = raw_dataset.pm25.replace("---", -1).astype(int)

In [29]:
# Check the result
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   object        
 5   co        1830 non-null   object        
 6   o3        1830 non-null   object        
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(2), object(8)
memory usage: 143.1+ KB


In [30]:
# We do this to all variables

#### 3.4.4. Handling Variable "SO2"

In [31]:
raw_dataset.so2 = raw_dataset.so2.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [32]:
raw_dataset.so2 = raw_dataset.so2.replace("---", -1).astype(int)

In [33]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   int32         
 5   co        1830 non-null   object        
 6   o3        1830 non-null   object        
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(3), object(7)
memory usage: 135.9+ KB


#### 3.4.5. Handling Variable "CO"

In [34]:
raw_dataset.co = raw_dataset.co.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [35]:
raw_dataset.co = raw_dataset.co.replace("---", -1).astype(int)

In [36]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   int32         
 5   co        1830 non-null   int32         
 6   o3        1830 non-null   object        
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(4), object(6)
memory usage: 128.8+ KB


#### 3.4.6. Handling Variable "O3"

In [37]:
raw_dataset.o3 = raw_dataset.o3.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [38]:
raw_dataset.o3 = raw_dataset.o3.replace("---", -1).astype(int)

In [39]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   int32         
 5   co        1830 non-null   int32         
 6   o3        1830 non-null   int32         
 7   no2       1830 non-null   object        
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(5), object(5)
memory usage: 121.6+ KB


#### 3.4.7. Handling Variable "NO2"

In [40]:
raw_dataset.no2 = raw_dataset.no2.astype(int)

ValueError: invalid literal for int() with base 10: '---'

In [41]:
raw_dataset.no2 = raw_dataset.no2.replace("---", -1).astype(int)

In [42]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   int32         
 5   co        1830 non-null   int32         
 6   o3        1830 non-null   int32         
 7   no2       1830 non-null   int32         
 8   max       1830 non-null   object        
 9   critical  1813 non-null   object        
 10  categori  1829 non-null   object        
dtypes: datetime64[ns](1), int32(6), object(4)
memory usage: 114.5+ KB


#### 3.4.8. Handling Variable "Max"

In [43]:
raw_dataset["max"] = raw_dataset["max"].astype(int)

ValueError: invalid literal for int() with base 10: 'PM25'

In [44]:
# This seems like different from before, we need to investigate
raw_dataset[raw_dataset["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
1372,2021-12-03,DKI1 (Bunderan HI),49,31,9,19,7,49,PM25,BAIK,


In [45]:
# From row 1372, we know that the value in variable max should be the integer value of pm10 or no2
# The value in critical should be PM10 or NO2
# For categori, we can assume that it is BAIK, but we need to check it later in the EDA step based on the other variables

In [46]:
# quick fix the problem
raw_dataset.loc[1372, "max"] = 49
raw_dataset.loc[1372, "critical"] = "PM10"
raw_dataset.loc[1372, "categori"] = "BAIK"

In [47]:
# Check the result
raw_dataset[raw_dataset["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [48]:
raw_dataset.loc[1372]

tanggal     2021-12-03 00:00:00
stasiun      DKI1 (Bunderan HI)
pm10                         49
pm25                         31
so2                           9
co                           19
o3                            7
no2                          49
max                          49
critical                   PM10
categori                   BAIK
Name: 1372, dtype: object

In [49]:
raw_dataset["max"] = raw_dataset["max"].astype(int)

In [50]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1830 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1830 non-null   datetime64[ns]
 1   stasiun   1830 non-null   object        
 2   pm10      1830 non-null   int32         
 3   pm25      1830 non-null   int32         
 4   so2       1830 non-null   int32         
 5   co        1830 non-null   int32         
 6   o3        1830 non-null   int32         
 7   no2       1830 non-null   int32         
 8   max       1830 non-null   int32         
 9   critical  1813 non-null   object        
 10  categori  1830 non-null   object        
dtypes: datetime64[ns](1), int32(7), object(3)
memory usage: 107.4+ KB


#### 3.4.9. Handling Variable "Categori"

In [51]:
# Check range of categori
raw_dataset.categori.value_counts()

SEDANG            1305
TIDAK SEHAT        319
BAIK               189
TIDAK ADA DATA      17
Name: categori, dtype: int64

In [52]:
# The value "TIDAK ADA DATA" ("no data available") indicates a null value, so we can drop those rows

In [53]:
raw_dataset.drop(index = raw_dataset[raw_dataset.categori == "TIDAK ADA DATA"].index, inplace = True)

In [54]:
raw_dataset.categori.value_counts()

SEDANG         1305
TIDAK SEHAT     319
BAIK            189
Name: categori, dtype: int64

In [55]:
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1813 entries, 0 to 1829
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1813 non-null   datetime64[ns]
 1   stasiun   1813 non-null   object        
 2   pm10      1813 non-null   int32         
 3   pm25      1813 non-null   int32         
 4   so2       1813 non-null   int32         
 5   co        1813 non-null   int32         
 6   o3        1813 non-null   int32         
 7   no2       1813 non-null   int32         
 8   max       1813 non-null   int32         
 9   critical  1813 non-null   object        
 10  categori  1813 non-null   object        
dtypes: datetime64[ns](1), int32(7), object(3)
memory usage: 120.4+ KB


In [56]:
util.pickle_dump(raw_dataset, config_data["cleaned_raw_dataset_path"])

## 4. Data Defense

In [57]:
def check_data(input_data, params):
    """Validate column dtypes and value ranges of ``input_data`` against ``params``.

    Parameters
    ----------
    input_data : pd.DataFrame
        Dataset to validate. The columns ``stasiun``, ``pm10``, ``pm25``,
        ``so2``, ``co``, ``o3`` and ``no2`` are read.
    params : dict
        Expected schema. Keys read here: ``datetime_columns``,
        ``object_columns`` and ``int32_columns`` (lists of column names, in
        DataFrame column order), ``range_stasiun`` (allowed station values)
        and ``range_<col>`` for each pollutant column, where index 0 is the
        lower and index 1 the upper bound (both inclusive, as implemented by
        ``Series.between``).

    Returns
    -------
    None
        The function only signals failures by raising.

    Raises
    ------
    AssertionError
        On the first check that fails, with a message naming the check.
    KeyError
        If a required key is missing from ``params`` or a required column is
        missing from ``input_data``.
    """

    def _require(condition, message):
        # Raise explicitly rather than using ``assert`` so the validation is
        # not stripped when Python runs with -O; the exception type is kept.
        if not condition:
            raise AssertionError(message)

    # Check data types: the columns selected per dtype must match the
    # configured lists exactly (same names, same order).
    dtype_checks = (
        ("datetime", "datetime_columns", "datetime"),
        ("object", "object_columns", "object"),
        ("int", "int32_columns", "int32"),
    )
    for dtype_name, params_key, label in dtype_checks:
        selected = input_data.select_dtypes(dtype_name).columns.to_list()
        _require(
            selected == params[params_key],
            f"an error occurs in {label} column(s).",
        )

    # Check range of data: categorical column first, then numeric columns.
    _require(
        set(input_data["stasiun"]).issubset(set(params["range_stasiun"])),
        "an error occurs in stasiun range.",
    )
    for column in ("pm10", "pm25", "so2", "co", "o3", "no2"):
        bounds = params[f"range_{column}"]
        # Every row must fall inside the bounds; NaN compares as outside.
        _require(
            input_data[column].between(bounds[0], bounds[1]).all(),
            f"an error occurs in {column} range.",
        )

In [58]:
check_data(raw_dataset, config_data)

## 5. Data Splitting

In [59]:
# Split the inputs/variables/features from the target/label/output
x = raw_dataset[config_data["predictors"]].copy()
y = raw_dataset.categori.copy()

In [60]:
x.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1813 entries, 0 to 1829
Data columns (total 7 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   stasiun  1813 non-null   object
 1   pm10     1813 non-null   int32 
 2   pm25     1813 non-null   int32 
 3   so2      1813 non-null   int32 
 4   co       1813 non-null   int32 
 5   o3       1813 non-null   int32 
 6   no2      1813 non-null   int32 
dtypes: int32(6), object(1)
memory usage: 70.8+ KB


In [61]:
y.value_counts()

SEDANG         1305
TIDAK SEHAT     319
BAIK            189
Name: categori, dtype: int64

In [62]:
# First split, splitting train and test set with ratio 0.7:0.3 and do stratify splitting
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 42, stratify = y)

In [63]:
# Second split, splitting test and valid set with ratio 0.5:0.5 and do stratify splitting
x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size = 0.5, random_state = 42, stratify = y_test)

In [64]:
util.pickle_dump(x_train, config_data["train_set_path"][0])
util.pickle_dump(y_train, config_data["train_set_path"][1])

util.pickle_dump(x_valid, config_data["valid_set_path"][0])
util.pickle_dump(y_valid, config_data["valid_set_path"][1])

util.pickle_dump(x_test, config_data["test_set_path"][0])
util.pickle_dump(y_test, config_data["test_set_path"][1])