# Data Pipeline

In [None]:
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import joblib
import os
import yaml

## 1. Data Collection

In [None]:
params_dir = "/content/params.yaml"

In [None]:
def load_params(param_dir):
    with open(param_dir, 'r') as file:
        params = yaml.safe_load(file)

    return params

params = load_params(params_dir)

In [None]:
params

{'dataset_dir': '/content/data/',
 'datetime_columns': ['tanggal'],
 'int32_columns': ['pm10', 'pm25', 'so2', 'co', 'o3', 'no2', 'max'],
 'label': 'categori',
 'label_categories': ['BAIK', 'SEDANG', 'TIDAK SEHAT'],
 'label_categories_new': ['BAIK', 'TIDAK BAIK'],
 'missing_value_co': 11,
 'missing_value_no2': 18,
 'missing_value_o3': 29,
 'missing_value_pm10': {'BAIK': 28, 'TIDAK BAIK': 55},
 'missing_value_pm25': {'BAIK': 38, 'TIDAK BAIK': 82},
 'missing_value_so2': 35,
 'object_columns': ['stasiun', 'critical', 'categori'],
 'predictors': ['stasiun', 'pm10', 'pm25', 'so2', 'co', 'o3', 'no2'],
 'range_co': [-1, 100],
 'range_no2': [-1, 100],
 'range_o3': [-1, 160],
 'range_pm10': [-1, 800],
 'range_pm25': [-1, 400],
 'range_so2': [-1, 500],
 'range_stasiun': ['DKI1 (Bunderan HI)',
  'DKI2 (Kelapa Gading)',
  'DKI3 (Jagakarsa)',
  'DKI4 (Lubang Buaya)',
  'DKI5 (Kebon Jeruk) Jakarta Barat']}

In [None]:
# fungsi untuk membaca nama file, memuat file, dan menggabungkan dataset
def read_dataset(dataset_dir):
    dataset = pd.DataFrame()

    for i in tqdm(os.listdir(dataset_dir)):
        dataset = pd.concat([pd.read_csv(dataset_dir + i), dataset])

    return dataset

In [None]:
# melakukan pembacaan nama file, memuat file, dan menggabungkan dataset
dataset = read_dataset(params["dataset_dir"])

100%|██████████| 12/12 [00:00<00:00, 315.15it/s]


In [None]:
df = dataset.copy()

In [None]:
df

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-10-01,DKI1 (Bunderan HI),57,81,30,11,32,38,81,PM25,SEDANG
1,2021-10-02,DKI1 (Bunderan HI),67,99,32,11,30,35,99,PM25,SEDANG
2,2021-10-03,DKI1 (Bunderan HI),70,85,29,10,28,28,85,PM25,SEDANG
3,2021-10-04,DKI1 (Bunderan HI),58,82,30,11,34,29,82,PM25,SEDANG
4,2021-10-05,DKI1 (Bunderan HI),55,76,29,11,30,33,76,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
150,2021-12-27,DKI5 (Kebon Jeruk) Jakarta Barat,54,76,36,14,21,47,76,PM25,SEDANG
151,2021-12-28,DKI5 (Kebon Jeruk) Jakarta Barat,44,68,20,11,21,33,68,PM25,SEDANG
152,2021-12-29,DKI5 (Kebon Jeruk) Jakarta Barat,34,54,28,8,25,29,54,PM25,SEDANG
153,2021-12-30,DKI5 (Kebon Jeruk) Jakarta Barat,53,75,25,15,23,44,75,PM25,SEDANG


### index hanya terlihat sampai 154 padahal jumlah rows sampai 1070

In [None]:
df = df.reset_index(inplace = True, drop=True)
df

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-10-01,DKI1 (Bunderan HI),57,81,30,11,32,38,81,PM25,SEDANG
1,2021-10-02,DKI1 (Bunderan HI),67,99,32,11,30,35,99,PM25,SEDANG
2,2021-10-03,DKI1 (Bunderan HI),70,85,29,10,28,28,85,PM25,SEDANG
3,2021-10-04,DKI1 (Bunderan HI),58,82,30,11,34,29,82,PM25,SEDANG
4,2021-10-05,DKI1 (Bunderan HI),55,76,29,11,30,33,76,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
1820,2021-12-27,DKI5 (Kebon Jeruk) Jakarta Barat,54,76,36,14,21,47,76,PM25,SEDANG
1821,2021-12-28,DKI5 (Kebon Jeruk) Jakarta Barat,44,68,20,11,21,33,68,PM25,SEDANG
1822,2021-12-29,DKI5 (Kebon Jeruk) Jakarta Barat,34,54,28,8,25,29,54,PM25,SEDANG
1823,2021-12-30,DKI5 (Kebon Jeruk) Jakarta Barat,53,75,25,15,23,44,75,PM25,SEDANG


In [None]:
for col in df.select_dtypes(include='object').columns.tolist():
    print(df[col].value_counts(normalize=True)*100)
    print('\n')

2021-10-01    0.273973
2021-02-07    0.273973
2021-02-05    0.273973
2021-02-04    0.273973
2021-02-03    0.273973
                ...   
2021-03-29    0.273973
2021-03-28    0.273973
2021-03-27    0.273973
2021-03-26    0.273973
2021-12-31    0.273973
Name: tanggal, Length: 365, dtype: float64


DKI1 (Bunderan HI)                  20.0
DKI2 (Kelapa Gading)                20.0
DKI3 (Jagakarsa)                    20.0
DKI4 (Lubang Buaya)                 20.0
DKI5 (Kebon Jeruk) Jakarta Barat    20.0
Name: stasiun, dtype: float64


51     3.780822
---    3.726027
52     3.397260
55     2.958904
54     2.794521
         ...   
26     0.054795
42     0.054795
87     0.054795
100    0.054795
179    0.054795
Name: pm10, Length: 130, dtype: float64


77     2.552467
81     2.268860
---    2.155417
83     1.871809
79     1.871809
         ...   
65     0.056721
53     0.056721
119    0.056721
112    0.056721
136    0.056721
Name: pm25, Length: 192, dtype: float64


---    6.246575
27     3.0136

In [None]:
# simpan dataset yang telah digabungkan
joblib.dump(dataset, "/content/processed/dataset.pkl")

['/content/processed/dataset.pkl']

## 2. Data Definition

<b>tanggal</b>         :
    [datetime]
    [00:00 01/01/2021 - 23:59 31/12/2021]
    waktu saat pengambilan sampel

<b>stasiun</b>         :
    [string]
    ['DKI1 (Bunderan HI)', 'DKI2 (Kelapa Gading)', 'DKI3 (Jagakarsa)', 'DKI4 (Lubang Buaya)', 'DKI5 (Kebon Jeruk) Jakarta Barat']
    lokasi saat pengambilan sampel

<b>pm10</b>            :
    [integer]
    [0 - 800]
    partikel udara yang berukuran lebih kecil dari 10 mikron

<b>pm25</b>            :
    [integer]
    [0 - 400]
    partikel udara yang berukuran lebih kecil dari 2.5 mikron

<b>so2</b>             :
    [integer]
    [0 - 500]
    sulfur dioksida

<b>co</b>              :
    [integer]
    [0 - 100]
    karbon monoksida

<b>o3</b>              :
    [integer]
    [0 - 140]
    ozone

<b>no2</b>             :
    [integer]
    [0 - 100]
    nitrogen dioksida

<b>max</b>             :
    [integer]
    [0 - 800]
    nilai paling besar diantara pm10, pm25, so2, co, o3, dan no2

<b>critical</b>        :
    [string]
    [PM10, PM25, SO2, CO, O3, dan NO2]
    nama kolom untuk nilai max

<b>categori</b>        :
    [string]
    [BAIK, SEDANG, TIDAK SEHAT]
    kategori untuk data pengukuran udara

<b>location</b>        : data yang tidak memiliki informasi apapun

In [None]:
df = joblib.load('/content/data/dataset.pkl')
df.sample()

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
93,2021-04-04,DKI4 (Lubang Buaya),64,103,43,26,23,20,103,PM25,TIDAK SEHAT


## 3. Data Validatio, Cleansing and Preprocessing

In [None]:
# cek tipe data
df.dtypes

tanggal     object
stasiun     object
pm10        object
pm25        object
so2         object
co          object
o3          object
no2         object
max         object
critical    object
categori    object
dtype: object

dari pengecekan data terlihat bahwa semuanya adalah data objek (string), perlu diselidiki lebih lanjut

### 3.2. Range

In [None]:
# pengecekan cakupan data menjadi kacau jika tipe data tidak sesuai
df.describe()

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
count,1825,1825,1825,1763,1825,1825,1825,1825,1825,1809,1824
unique,365,5,130,192,121,54,121,96,187,6,4
top,2021-10-01,DKI1 (Bunderan HI),51,77,---,9,---,13,77,PM25,SEDANG
freq,5,365,69,45,114,163,68,81,50,1630,1349


### 3.3. Dimensi Data

In [None]:
# dimensi data kemungkinan besar tidak terpengaruh, namun nanti kita kembali lagi
df.shape

(1825, 11)

### 3.4. Handling Data Not Correct / Error

In [None]:
# cek tipe data pada kolom tanggal
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1825 entries, 0 to 154
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   tanggal   1825 non-null   object
 1   stasiun   1825 non-null   object
 2   pm10      1825 non-null   object
 3   pm25      1763 non-null   object
 4   so2       1825 non-null   object
 5   co        1825 non-null   object
 6   o3        1825 non-null   object
 7   no2       1825 non-null   object
 8   max       1825 non-null   object
 9   critical  1809 non-null   object
 10  categori  1824 non-null   object
dtypes: object(11)
memory usage: 171.1+ KB


In [None]:
# casting tipe data ke datetime
df['tanggal'] = pd.to_datetime(df['tanggal'])

dari penglihatan saya diatas, terdapat value yang aneh, yaitu '---' perlu disolve terlebih dahulu lalu di casting ke int

karena value ini tidak diketahui maksudnya maka saya memiliki opsi antara menghapus atau mengganti dengan nilai unik. disini saya memilih menjadikan nilai unik (-1)

In [None]:
# mencoba melihat apakah -1 benar unik?
df[(df.eq("-1").any(1)) | (df.eq(-1).any(1))]

  df[(df.eq("-1").any(1)) | (df.eq(-1).any(1))]


Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [None]:
# replace data teks dengan -1
df['pm10'] = df['pm10'].replace("---", -1).astype(int)

In [None]:
# sama halnya pada kolom p25 terdapat value yang aneh yaitu '---' maka kita akan lakukan yang sama seperti pm10

df['pm25'] = df['pm25'].replace("---", -1).astype(int)

ValueError: ignored

dapat dilihat bahwa terdapat nilai missing pada kolom tersebut. kita perlu solve terlebih dahulu

In [None]:
df.pm25.isna().sum()

62

In [None]:
# replacing NaN dengan -1
df.pm25.fillna(-1, inplace = True)

In [None]:
df['pm25'] = df['pm25'].replace("---", -1).astype(int)

lanjut kekolom berikutnya sama saja

In [None]:
df.so2 = df.so2.replace("---", -1).astype(int)

In [None]:
df.co = df.co.replace("---", -1).astype(int)

In [None]:
df.o3 = df.o3.replace("---", -1).astype(int)

In [None]:
df.no2 = df.no2.replace("---", -1).astype(int)

In [None]:
df["max"] = df["max"].astype(int)

ValueError: ignored

terdapat value yang tidak sesuai dengan kolomnya

In [None]:
df.reset_index(inplace = True, drop=True)

In [None]:
df[df["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
1672,2021-12-03,DKI1 (Bunderan HI),49,31,9,19,7,49,PM25,BAIK,


- Terlihat pattern data dan kolom pada index 1672 ini tidak sesuai
- Kita dapat menghandle masalah tersebut dengan mengganti max dengan data terdekatnya yaitu "49", critical dengan "PM10" atau "no2" sesuai dengan data yang kita pilih pada kolom max dan kategori "BAIK"

In [None]:
# quick fix the problem
df.loc[1672, "max"] = 49
df.loc[1672, "critical"] = "PM10"
df.loc[1672, "categori"] = "BAIK"

In [None]:
# cek ulang hasilnya
df[df["max"] == "PM25"]

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori


In [None]:
df.loc[1672]

tanggal     2021-12-03 00:00:00
stasiun      DKI1 (Bunderan HI)
pm10                         49
pm25                         31
so2                           9
co                           19
o3                            7
no2                          49
max                          49
critical                   PM10
categori                   BAIK
Name: 1672, dtype: object

In [None]:
df["max"] = df["max"].astype(int)

In [None]:
# cek data unik pada kolom kategorik "critical"
df['critical'].value_counts()

PM25    1630
PM10      64
O3        56
CO        34
SO2       25
Name: critical, dtype: int64

terlihat normal sehingga kita tidak perlu untuk melakukan perubahan


In [None]:
# cek data unik untuk kolom kategorik "categori" yang merupakan data label atau dependen variabel
df['categori'].value_counts()

SEDANG            1349
TIDAK SEHAT        272
BAIK               188
TIDAK ADA DATA      16
Name: categori, dtype: int64

terdapat data "TIDAK ADA DATA" yang mengindikasikan null value
bisa kita langsung hapus (drop)

In [None]:
df.drop(index = df[df.categori == "TIDAK ADA DATA"].index, inplace = True)

In [None]:
df['categori'].value_counts()

SEDANG         1349
TIDAK SEHAT     272
BAIK            188
Name: categori, dtype: int64

Untuk memudahkan melakukan perbandingan ekstrim dan interpretasi model maka kita dapat mengubah label 'SEDANG' menjadi 'BAIK'

In [None]:
df['categori'] = df['categori'].replace('SEDANG', 'BAIK')

In [None]:
df['categori'].value_counts()

BAIK           1537
TIDAK SEHAT     272
Name: categori, dtype: int64

In [None]:
joblib.dump(df, "/content/processed/dataset_clean.pkl")

['/content/processed/dataset_clean.pkl']

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1809 entries, 0 to 1824
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   tanggal   1809 non-null   datetime64[ns]
 1   stasiun   1809 non-null   object        
 2   pm10      1809 non-null   int64         
 3   pm25      1809 non-null   int64         
 4   so2       1809 non-null   int64         
 5   co        1809 non-null   int64         
 6   o3        1809 non-null   int64         
 7   no2       1809 non-null   int64         
 8   max       1809 non-null   int64         
 9   critical  1809 non-null   object        
 10  categori  1809 non-null   object        
dtypes: datetime64[ns](1), int64(7), object(3)
memory usage: 169.6+ KB


## 4. Data Defense

In [None]:
def check_data(input_data, params):
    # check data types
    assert input_data.select_dtypes("datetime").columns.to_list() == params["datetime_columns"], "an error occurs in datetime column(s)."
    assert input_data.select_dtypes("object").columns.to_list() == params["object_columns"], "an error occurs in object column(s)."
    assert input_data.select_dtypes("int64").columns.to_list() == params["int32_columns"], "an error occurs in int32 column(s)."

    # check range of data
    assert set(input_data.stasiun).issubset(set(params["range_stasiun"])), "an error occurs in stasiun range."
    assert input_data.pm10.between(params["range_pm10"][0], params["range_pm10"][1]).sum() == len(input_data), "an error occurs in pm10 range."
    assert input_data.pm25.between(params["range_pm25"][0], params["range_pm25"][1]).sum() == len(input_data), "an error occurs in pm25 range."
    assert input_data.so2.between(params["range_so2"][0], params["range_so2"][1]).sum() == len(input_data), "an error occurs in so2 range."
    assert input_data.co.between(params["range_co"][0], params["range_co"][1]).sum() == len(input_data), "an error occurs in co range."
    assert input_data.o3.between(params["range_o3"][0], params["range_o3"][1]).sum() == len(input_data), "an error occurs in o3 range."
    assert input_data.no2.between(params["range_no2"][0], params["range_no2"][1]).sum() == len(input_data), "an error occurs in no2 range."

    print("Unit testing berhasil.")

In [None]:
check_data(df, params)

Unit testing berhasil.


## 5. Data Splitting

In [None]:
# pisahkan data x dan y (x adalah fitur, y adalah label)
x = df[params["predictors"]].copy()
y = df.categori.copy()

In [None]:
x.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1809 entries, 0 to 1824
Data columns (total 7 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   stasiun  1809 non-null   object
 1   pm10     1809 non-null   int64 
 2   pm25     1809 non-null   int64 
 3   so2      1809 non-null   int64 
 4   co       1809 non-null   int64 
 5   o3       1809 non-null   int64 
 6   no2      1809 non-null   int64 
dtypes: int64(6), object(1)
memory usage: 113.1+ KB


In [None]:
y.value_counts()

BAIK           1537
TIDAK SEHAT     272
Name: categori, dtype: int64

In [None]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 42, stratify = y)

In [None]:
x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size = 0.5, random_state = 42, stratify = y_test)

In [None]:
joblib.dump(x_train, "/content/processed/x_train.pkl")
joblib.dump(y_train, "/content/processed/y_train.pkl")
joblib.dump(x_valid, "/content/processed/x_valid.pkl")
joblib.dump(y_valid, "/content/processed/y_valid.pkl")
joblib.dump(x_test, "/content/processed/x_test.pkl")
joblib.dump(y_test, "/content/processed/y_test.pkl")

['/content/processed/y_test.pkl']