# 4. Data Pipeline

## 4.1. Import needed libraries

In [1]:
import os
import joblib
import sqlite3
import pandas as pd
from sklearn.model_selection import train_test_split

## 4.2. Loading dataset from files

### 4.2.1. Load dataset from a single file

In [2]:
def load_single_file_dataset(path: str) -> pd.DataFrame:
    '''
    Brief:
    Read one comma separated value (CSV) file from disk into a DataFrame.

    Params:
    1.  path
        String, filesystem location of the CSV file to read.

    Return:
    1.  DataFrame
        Pandas DataFrame holding the parsed contents of the CSV file.
    '''
    return pd.read_csv(path)

In [3]:
dataset = load_single_file_dataset(
    path="../data/raw/files/indeks-standar-pencemar-udara-di-spku-bulan-agustus-tahun-2021.csv"
)

In [4]:
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG
1,2021-08-02,DKI1 (Bunderan HI),47,63,24,10,25,28,63,PM25,SEDANG
2,2021-08-03,DKI1 (Bunderan HI),50,68,26,11,19,35,68,PM25,SEDANG
3,2021-08-04,DKI1 (Bunderan HI),52,70,29,8,24,26,70,PM25,SEDANG
4,2021-08-05,DKI1 (Bunderan HI),52,66,29,9,21,27,66,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
150,2021-08-27,DKI5 (Kebon Jeruk) Jakarta Barat,61,96,34,8,29,15,96,PM25,SEDANG
151,2021-08-28,DKI5 (Kebon Jeruk) Jakarta Barat,63,100,31,8,44,12,100,PM25,SEDANG
152,2021-08-29,DKI5 (Kebon Jeruk) Jakarta Barat,67,111,32,10,36,13,111,PM25,TIDAK SEHAT
153,2021-08-30,DKI5 (Kebon Jeruk) Jakarta Barat,83,126,35,16,32,29,126,PM25,TIDAK SEHAT


### 4.2.2. Load dataset from multiple files

In [5]:
def load_multiple_files_dataset(root_path: str) -> pd.DataFrame:
    '''
    Brief:
    This function intended to load multiple comma separated value (CSV) files to program
    and combine them, row-wise, into a single DataFrame.

    Only regular files whose name ends with ".csv" (case-insensitive) are read, so stray
    entries inside root_path (sub-folders, hidden/system files) do not break loading.
    Files are read in sorted file-name order so that the row order of the result is
    deterministic (os.listdir returns entries in arbitrary order).

    Params:
    1.  root_path
        String, defining the folder of all target CSV files located. A trailing path
        separator is optional.

    Return:
    1.  dataset
        Pandas DataFrame that contains combined data from all target CSV files. Each
        file keeps its own row index, so index labels may repeat across files. An
        empty DataFrame is returned when the folder contains no CSV files.
    '''
    list_dataset_path = []
    for file_name in sorted(os.listdir(root_path)):
        # os.path.join works whether or not root_path ends with a separator,
        # unlike plain string concatenation.
        dataset_path = os.path.join(root_path, file_name)
        if file_name.lower().endswith(".csv") and os.path.isfile(dataset_path):
            list_dataset_path.append(dataset_path)

    if not list_dataset_path:
        return pd.DataFrame()

    # Read every file first and concatenate once: calling pd.concat inside the
    # loop re-copies the accumulated rows on every iteration (quadratic cost).
    list_dataset = [pd.read_csv(dataset_path) for dataset_path in list_dataset_path]

    return pd.concat(list_dataset)

In [6]:
dataset = load_multiple_files_dataset(root_path="../data/raw/files/")

In [7]:
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG
1,2021-08-02,DKI1 (Bunderan HI),47,63,24,10,25,28,63,PM25,SEDANG
2,2021-08-03,DKI1 (Bunderan HI),50,68,26,11,19,35,68,PM25,SEDANG
3,2021-08-04,DKI1 (Bunderan HI),52,70,29,8,24,26,70,PM25,SEDANG
4,2021-08-05,DKI1 (Bunderan HI),52,66,29,9,21,27,66,PM25,SEDANG
...,...,...,...,...,...,...,...,...,...,...,...
145,2021-09-26,DKI5 (Kebon Jeruk) Jakarta Barat,48,67,---,4,30,9,67,PM25,SEDANG
146,2021-09-27,DKI5 (Kebon Jeruk) Jakarta Barat,51,78,---,9,22,18,78,PM25,SEDANG
147,2021-09-28,DKI5 (Kebon Jeruk) Jakarta Barat,42,64,---,5,26,14,64,PM25,SEDANG
148,2021-09-29,DKI5 (Kebon Jeruk) Jakarta Barat,56,87,---,11,34,19,87,PM25,SEDANG


## 4.3. Loading dataset from database

### 4.3.1. Connect to database

In [8]:
def create_connection_to_db(url_to_db: str, must_exist: bool = True) -> sqlite3.Connection:
    '''
    Brief:
    This function intended to create connection to specified sqlite3 database.

    sqlite3.connect silently creates a new, empty database file when the given path
    does not exist (for example when the notebook runs from an unexpected working
    directory). Every later query then fails with a misleading "no such table"
    error. By default this function therefore refuses to open a missing file and
    raises FileNotFoundError instead.

    Params:
    1.  url_to_db
        String, defining the sqlite3 database file located. The special name
        ":memory:" opens an in-memory database and is always allowed.

    2.  must_exist
        Boolean, when True (default) the database file must already exist.
        Pass False to let sqlite3 create a new database file if it is missing.

    Return:
    1.  connection
        Sqlite3 connection object.

    Raises:
    1.  FileNotFoundError
        When must_exist is True and url_to_db does not point to an existing file.
    '''
    if must_exist and url_to_db != ":memory:" and not os.path.isfile(url_to_db):
        raise FileNotFoundError(
            "sqlite3 database file not found: {}".format(url_to_db)
        )

    connection = sqlite3.connect(url_to_db)

    return connection

In [9]:
connection = create_connection_to_db(url_to_db="../data/raw/database/dataset.db")

In [10]:
def create_cursor_to_db(current_connection: sqlite3.Connection) -> sqlite3.Cursor:
    '''
    Brief:
    Open a cursor on an already-established sqlite3 connection. The cursor is the
    object that executes statements and walks through the rows they return.

    Params:
    1.  current_connection
        Sqlite3 connection object whose database the cursor will operate on.

    Return:
    1.  cursor
        Sqlite3 cursor object bound to current_connection.
    '''
    return current_connection.cursor()

In [11]:
cursor = create_cursor_to_db(current_connection=connection)

### 4.3.2. Select data from database

In [12]:
def execute_query(cursor: sqlite3.Cursor, query: str, params: tuple = ()) -> list:
    '''
    Brief:
    This function intended to execute any query that supported by sqlite3 and
    return every row it produces.

    Params:
    1.  cursor
        Sqlite3 cursor object, object that has capability to access and do traversal in target database.

    2.  query
        String, the query the user wants to execute. Use "?" placeholders for
        values instead of formatting them into the string.

    3.  params
        Tuple, values bound to the "?" placeholders in query, in order.
        Defaults to an empty tuple (no parameters), matching the previous behavior.

    Return:
    1.  result
        List, all rows returned by the query (empty for statements that return no rows).
    '''
    # Binding values through sqlite3 instead of string formatting avoids
    # quoting bugs and SQL injection.
    result = cursor.execute(query, params)
    result = result.fetchall()

    return result

In [13]:
# NOTE(review): the recorded run of this cell raised
# "OperationalError: no such table: ispu". One possible cause: sqlite3.connect
# creates an empty database file when the given path does not exist, so check
# that ../data/raw/database/dataset.db is reachable from the notebook's
# working directory.
result = execute_query(cursor=cursor, query="SELECT * FROM ispu")

OperationalError: no such table: ispu

In [None]:
result

[('2021-08-01',
  'DKI1 (Bunderan HI)',
  51,
  68,
  25,
  8,
  29,
  22,
  68,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-02',
  'DKI1 (Bunderan HI)',
  47,
  63,
  24,
  10,
  25,
  28,
  63,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-03',
  'DKI1 (Bunderan HI)',
  50,
  68,
  26,
  11,
  19,
  35,
  68,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-04',
  'DKI1 (Bunderan HI)',
  52,
  70,
  29,
  8,
  24,
  26,
  70,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-05',
  'DKI1 (Bunderan HI)',
  52,
  66,
  29,
  9,
  21,
  27,
  66,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-06',
  'DKI1 (Bunderan HI)',
  56,
  73,
  28,
  15,
  11,
  28,
  73,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-07',
  'DKI1 (Bunderan HI)',
  55,
  71,
  31,
  7,
  18,
  18,
  71,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-08',
  'DKI1 (Bunderan HI)',
  53,
  69,
  33,
  6,
  23,
  16,
  69,
  'PM25',
  'SEDANG',
  None),
 ('2021-08-09',
  'DKI1 (Bunderan HI)',
  52,
  60,
  30,
  6,
  25,
  15,
  60,
  'PM25',
  'SEDANG'

In [15]:
def create_dataframe_from_query_result(data: list, columns: "list | None" = None) -> pd.DataFrame:
    '''
    Brief:
    This function intended to convert query result to Pandas DataFrame.

    Params:
    1.  data
        List, object that has zero or more elements (rows) as result from query.

    2.  columns
        List, optional column names for the DataFrame, in the same order as the
        values inside each row. When omitted (default) the columns are labelled
        0, 1, 2, ... exactly as before.

    Return:
    1.  dataframe
        Pandas DataFrame, DataFrame version of data.
    '''
    dataframe = pd.DataFrame(data, columns=columns)

    return dataframe

In [16]:
dataset = create_dataframe_from_query_result(data=result)

In [17]:
dataset

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG,
1,2021-08-02,DKI1 (Bunderan HI),47,63,24,10,25,28,63,PM25,SEDANG,
2,2021-08-03,DKI1 (Bunderan HI),50,68,26,11,19,35,68,PM25,SEDANG,
3,2021-08-04,DKI1 (Bunderan HI),52,70,29,8,24,26,70,PM25,SEDANG,
4,2021-08-05,DKI1 (Bunderan HI),52,66,29,9,21,27,66,PM25,SEDANG,
...,...,...,...,...,...,...,...,...,...,...,...,...
1065,2021-09-26,DKI5 (Kebon Jeruk) Jakarta Barat,48,67,---,4,30,9,67,PM25,SEDANG,
1066,2021-09-27,DKI5 (Kebon Jeruk) Jakarta Barat,51,78,---,9,22,18,78,PM25,SEDANG,
1067,2021-09-28,DKI5 (Kebon Jeruk) Jakarta Barat,42,64,---,5,26,14,64,PM25,SEDANG,
1068,2021-09-29,DKI5 (Kebon Jeruk) Jakarta Barat,56,87,---,11,34,19,87,PM25,SEDANG,


In [18]:
def get_columns_name_from_query(cursor: sqlite3.Cursor, table_name: str) -> list:
    '''
    Brief:
    This function intended to get columns name of particular table in database.

    Params:
    1.  cursor
        Sqlite3 cursor object, object that has capability to access and do traversal in target database.

    2.  table_name
        String, name of target table.

    Return:
    1.  columns_name
        List, list of columns name in target table, in table-definition order.
    '''
    # PRAGMA statements cannot take bound parameters, so the table name is
    # quoted as an SQL identifier (embedded double quotes are doubled). This
    # keeps names containing spaces or quotes working and prevents the name
    # from being interpreted as SQL.
    quoted_table_name = '"{}"'.format(table_name.replace('"', '""'))
    result = execute_query(cursor, "PRAGMA table_info({})".format(quoted_table_name))
    # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk);
    # index 1 is the column name.
    columns_name = [item[1] for item in result]

    return columns_name

In [19]:
columns_name = get_columns_name_from_query(cursor=cursor, table_name="ispu")

In [20]:
columns_name

['tanggal',
 'stasiun',
 'pm10',
 'pm25',
 'so2',
 'co',
 'o3',
 'no2',
 'max',
 'critical',
 'categori',
 'location']

In [21]:
def assign_column_name_to_dataframe(target_dataframe: pd.DataFrame, columns_name: list) -> pd.DataFrame:
    '''
    Brief:
    Return a copy of a DataFrame whose column labels are replaced by columns_name.
    The input DataFrame is left untouched. columns_name must contain exactly one
    label per existing column, listed in the same order as the columns; otherwise
    pandas raises ValueError.

    Params:
    1.  target_dataframe
        Pandas DataFrame, the frame whose columns should be relabelled.

    2.  columns_name
        List, the new column labels, one per existing column.

    Return:
    1.  DataFrame
        Pandas DataFrame, a copy of target_dataframe carrying the new column labels.
    '''
    return target_dataframe.set_axis(columns_name, axis="columns")

In [22]:
dataset = assign_column_name_to_dataframe(target_dataframe=dataset, columns_name=columns_name)

In [23]:
dataset

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori,location
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG,
1,2021-08-02,DKI1 (Bunderan HI),47,63,24,10,25,28,63,PM25,SEDANG,
2,2021-08-03,DKI1 (Bunderan HI),50,68,26,11,19,35,68,PM25,SEDANG,
3,2021-08-04,DKI1 (Bunderan HI),52,70,29,8,24,26,70,PM25,SEDANG,
4,2021-08-05,DKI1 (Bunderan HI),52,66,29,9,21,27,66,PM25,SEDANG,
...,...,...,...,...,...,...,...,...,...,...,...,...
1065,2021-09-26,DKI5 (Kebon Jeruk) Jakarta Barat,48,67,---,4,30,9,67,PM25,SEDANG,
1066,2021-09-27,DKI5 (Kebon Jeruk) Jakarta Barat,51,78,---,9,22,18,78,PM25,SEDANG,
1067,2021-09-28,DKI5 (Kebon Jeruk) Jakarta Barat,42,64,---,5,26,14,64,PM25,SEDANG,
1068,2021-09-29,DKI5 (Kebon Jeruk) Jakarta Barat,56,87,---,11,34,19,87,PM25,SEDANG,


## 4.4. Drop Duplicates

### 4.4.1. Check for duplicates

In [24]:
# Number of rows that exactly repeat an earlier row (first occurrence not counted).
dataset.duplicated(keep="first").sum()

0

### 4.4.2. Add copies of existing rows to demonstrate dropping duplicates

In [25]:
new_same_data = dataset.loc[dataset["tanggal"].eq("2021-08-01")].reset_index(drop=True)

In [26]:
new_same_data

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori,location
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG,
1,2021-08-01,DKI2 (Kelapa Gading),55,75,50,9,67,13,75,PM25,SEDANG,
2,2021-08-01,DKI3 (Jagakarsa),51,66,44,9,31,11,66,PM25,SEDANG,
3,2021-08-01,DKI4 (Lubang Buaya),51,86,39,8,30,22,86,PM25,SEDANG,
4,2021-08-01,DKI5 (Kebon Jeruk) Jakarta Barat,44,70,30,8,29,11,70,PM25,SEDANG,


In [27]:
# Stack the copied rows under the original data (row-wise concatenation).
dataset = pd.concat([dataset, new_same_data], axis=0)

In [28]:
dataset = dataset.reset_index(drop=True)

### 4.4.3. Re-check for duplicate data

In [29]:
# keep=False marks every member of a duplicate group, including the first occurrence.
duplicate_data = dataset.duplicated(subset=None, keep=False)

In [30]:
duplicate_data

0        True
1       False
2       False
3       False
4       False
        ...  
1070     True
1071     True
1072     True
1073     True
1074     True
Length: 1075, dtype: bool

In [31]:
# duplicate_data is already a boolean mask, so it can index itself directly;
# comparing it with "== True" is redundant.
duplicate_data_index = duplicate_data[duplicate_data].index

In [32]:
duplicate_data_index

Index([0, 31, 62, 93, 124, 1070, 1071, 1072, 1073, 1074], dtype='int64')

In [33]:
# A stable sort keeps the original row order within each station, so every
# original row is reliably shown directly above its appended copy.
dataset.loc[duplicate_data_index].sort_values(by="stasiun", kind="stable")

Unnamed: 0,tanggal,stasiun,pm10,pm25,so2,co,o3,no2,max,critical,categori,location
0,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG,
1070,2021-08-01,DKI1 (Bunderan HI),51,68,25,8,29,22,68,PM25,SEDANG,
31,2021-08-01,DKI2 (Kelapa Gading),55,75,50,9,67,13,75,PM25,SEDANG,
1071,2021-08-01,DKI2 (Kelapa Gading),55,75,50,9,67,13,75,PM25,SEDANG,
62,2021-08-01,DKI3 (Jagakarsa),51,66,44,9,31,11,66,PM25,SEDANG,
1072,2021-08-01,DKI3 (Jagakarsa),51,66,44,9,31,11,66,PM25,SEDANG,
93,2021-08-01,DKI4 (Lubang Buaya),51,86,39,8,30,22,86,PM25,SEDANG,
1073,2021-08-01,DKI4 (Lubang Buaya),51,86,39,8,30,22,86,PM25,SEDANG,
124,2021-08-01,DKI5 (Kebon Jeruk) Jakarta Barat,44,70,30,8,29,11,70,PM25,SEDANG,
1074,2021-08-01,DKI5 (Kebon Jeruk) Jakarta Barat,44,70,30,8,29,11,70,PM25,SEDANG,


In [34]:
# Count only the extra copies (first occurrence of each row is not counted).
dataset.duplicated(keep="first").sum()

5

### 4.4.4. Drop duplicates

In [35]:
def dropping_duplicate(target_dataframe: pd.DataFrame, subset: "list | None" = None, keep: "str | bool" = "first") -> pd.DataFrame:
    '''
    Brief:
    This function intended to drop duplicate rows from a dataframe, comparing
    either all columns or only particular columns. The input dataframe is not modified.

    Params:
    1.  target_dataframe
        Pandas DataFrame, object that user wanted to drop the duplicate data.

    2.  subset
        List, names of the columns used to identify duplicates. None or an empty
        list (default) means all columns are compared.

    3.  keep
        "first" (default), "last" or False. This is forwarded to
        DataFrame.drop_duplicates and selects which occurrence of each duplicate
        group is kept; False drops every occurrence.

    Return:
    1.  target_dataframe
        Pandas DataFrame, a new dataframe with the duplicate rows removed. The
        index labels of the kept rows are preserved.
    '''
    # None replaces the former mutable default []; an empty subset keeps its
    # original meaning of "compare all columns".
    if subset is None or len(subset) == 0:
        subset = None

    # drop_duplicates (without inplace) already returns a new DataFrame, so no
    # explicit deep copy of the input is needed.
    return target_dataframe.drop_duplicates(subset=subset, keep=keep)

In [36]:
dataset = dropping_duplicate(target_dataframe=dataset)

In [37]:
# Should be 0 now that exact duplicates have been removed.
dataset.duplicated(keep="first").sum()

0

## 4.5. Serialization

In [38]:
def save_data(data: object, save_path: str) -> None:
    '''
    Brief:
    This function intended to serialize data in RAM to a file on disk using joblib.dump.

    Params:
    1.  data
        Any Python object that user wanted to save (it must be serializable by joblib).

    2.  save_path
        String, location of saved data. The parent folder is not created by this
        function and must already exist.

    Return:
    None.

    Note:
    joblib's format is pickle-based; only load such files back (joblib.load)
    from sources you trust.
    '''
    # The annotation was previously `any`, which is the builtin function, not a
    # type; `object` correctly expresses "any object".
    joblib.dump(data, save_path)

In [39]:
save_data(data=dataset, save_path="../data/processed/dataset.pkl")