**DSS: Data Engineering in Python using Airflow**

- Part of Data Engineering Airflow
- Course Length: 9 Hours
- Last Updated: August 2024

---


- Developed by [Algoritma](https://algorit.ma/)'s product division and instructors team

# Data Engineering in Python using Airflow

## Background

Data Engineering sangat penting untuk mengatasi tantangan kebijakan modern yang berbasis oleh data. Ketika institusi menghasilkan data dalam jumlah besar dari berbagai sumber, kebutuhan akan manajemen dan pemrosesan data yang efisien menjadi sangat penting. Rekayasa data mengubah data mentah menjadi *insight* yang berharga dengan merancang dan mengimplementasikan jalur data yang kuat yang menangani proses ekstraksi, transformasi, dan pemuatan data (ETL), memastikan data bersih, dan terintegrasi dari sumber yang berbeda. 


Otomatisasi data workflow adalah salah satu hal yang dapat diselesaikan oleh data engeering. Pemrosesan data secara manual memakan waktu dan rentan terhadap kesalahan, tetapi tools seperti Apache Airflow dapat mengotomatiskan data workflow yang kompleks, mengurangi campur tangan manusia dan meminimalkan kesalahan. Dengan menyiapkan Directed Acyclic Graphs (DAGs) di Airflow, perekayasa data dapat mengotomatiskan penjadwalan dan pelaksanaan tugas, memastikan pemrosesan dan pengiriman data yang tepat waktu. Otomatisasi ini meningkatkan efisiensi operasional, memungkinkan organisasi untuk merespons dengan cepat terhadap perubahan kebutuhan data dan kebutuhan bisnis terutama untuk kebutuhan analisis data.


Materi ini bertujuan memberikan pemahaman kepada pembaca terkait penggunaan 
apache airflow dalam otomatisasi pengumpulan data, pembersihan data hingga penyimpanan ke database. Adapun setelah mempelajari materi ini pembaca diharapkan dapat memahami dasar-dasar dari cara kerja airflow untuk diimplementasikan ke proyek masing-masing.

## Output

Notebook ini akan mengenalkan prinsip-prinsip dasar dari pengolahan data di apache airflow. Output akhirnya adalah python script (.py) untuk menjalankan apache-airflow untuk otomatisasi data workflow yang akan dijalankan di local.

## Tujuan Pembelajaran

- **Pemrograman Dasar Python**  
  - Environment python
  - Pengenalan dasar-dasar python
  - Manipulasi dan pemrosesan data dengan pandas
- **Pengenalan Airflow**
  - Pengenalan airflow
  - Directed Acrylic Graph
  - *Task* dan *dependencies*
- **Membuat dan Menjalankan Airflow Script**
  - Airflow project
  - Pembuatan DAG script untuk otomatisasi ETL pipeline
  - Menjalankan dan menjadwalkan DAG
  - Monitoring task di Airflow webserver
- **Implementasi di Data Analisis**

![](assets/Data%20Engineering%20Airflow.png)

# Pemrograman Dasar Python

## Mempersiapkan Environment Anaconda

**Membuat virtual environment baru:**

1. Buka Anaconda Prompt

2. Buat virtual environment baru dengan:
    ```
    conda create -n <ENV_NAME> python=<PYTHON_VERSION>
    ```
    Contoh: `conda create -n dss_airflow python=3.10`

3. Aktifkan virtual environment baru dengan:
    ```
    conda activate <ENV_NAME>
    ```
    For example: `conda activate dss_airflow`

4. Install library dan package yang dibutuhkan dengan command.
    ```
    pip install pandas==2.2.2 apache-airflow==2.9.0 openpyxl
    ```

In [41]:
1 + 1

2

## Bekerja dengan Jupyter Notebook

### Markdown Cell dan Code Cell

Terdapat dua tipe cell dalam notebook:

#### Markdown Cell

**Markdown**: untuk menuliskan narasi

Berikut ini adalah cell markdown. Terdapat beberapa hal yang dapat dilakukan, seperti membuat beberapa hal berikut ini: 

1. **Heading**

   Pada bagian ini dapat ditambahkan heading dengan menambahkan hashtag `#`.

   - `#` -> Heading 1
   - `##` -> Heading 2
   - `###` -> Heading 3

2. **Emphasis**

   Ketika ingin mengatur jenis tulisan dengan memberikan karakter yang lebih tegas kita bisa memanfaatkan `*`.

   - *kata* -> untuk mengatur tulisan menjadi Italic
   - **kata** -> untuk mengatur tulisan menjadi Bold
   - ***kata*** -> untuk mengatur tulisan menjadi Italic & Bold
    
3. **Bullets**

   Untuk membuat beberapa point, terdapat beberapa metode yang bisa digunakan.
    
   - Untuk membuat point dalam bentuk angka, bisa menggunakan angka 1.
   - Untuk membuat point dalam bentuk bullets, bisa menggunakan - atau *.

4. **Math Equation**

\begin{equation}
x = \frac{-b \pm \sqrt{b^{2}-4ac}}{2a}
\end{equation}

   💡 Penulisan markdown untuk formula matematis menggunakan LaTeX based

#### Code Cell

**Code**: untuk menuliskan script code

💡 symbol `#` pada cell code berarti adalah sebuah `comment`. `Comment` pada cell code **tidak dijalankan**

**Contoh Code Cell**

In [42]:
# ini adalah cell code
print("Hello 2024!!")

Hello 2024!!


In [43]:
1 + 1

2

Untuk menjalankan kode:

- klik tombol play di sebelah kiri
- shortcut: ctrl + enter

### Command Mode dan Edit Mode

**1️⃣ Command Mode**

- B: Menambahkan cell baru di Bawah (Below)
- A: Menambahkan cell baru di Atas (Above)
- DD: Delete cell
- C: Copy cell
- V: Paste cell
- Y: Mengubah ke `code` cell
- M: Mengubah ke `markdown` cell
- Enter/Double Click: Mengubah command mode menjadi edit mode

**2️⃣ Edit Mode**
- Ctrl + Enter: eksekusi satu cell
- Shift + Enter: eksekusi satu cell kemudian pindah pada cell selanjutnya
- Esc: Mengubah edit mode menjadi command mode

asdfasfd 

## Variabel & *Keywords*

Mari kita telaah konsep paling fundamental dalam bahasa pemrograman: variabel.

Variabel adalah wadah untuk menyimpan sebuah nilai. Simpelnya, variabel adalah nama yang mengacu pada sebuah nilai. Dimana ketika nilai ini ingin kita gunakan, kita dapat memanggilnya dengan nama variabel yang telah kita buat.

In [44]:
# variabel = nilai
dss_name = "Data Science Series: Data Engineering with Airflow"
print(dss_name)

Data Science Series: Data Engineering with Airflow


Sebagai catatan, seperti bahasa pemrograman yang lainnya, Python bersifat **case-sensitive**, sehingga `dss_name` dan `DSS_Name` dimaknai berbeda sehingga akan dianggap variabel yang berbeda pula.

In [45]:
## code here
'dss_name' == 'dss_name'

True

Kode di atas mengembalikan `True` sebagai output. Cobalah untuk membuat variabel baru dan gunakan `True` sebagai namanya. kemudian lihat apa yang akan terjadi.

> SyntaxError: can't assign to keyword

Sebagai catatan, `True`, dan juga lawannya, False termasuk ke dalam daftar kata yang dinamakan **Python Keywords**. Kita tidak dapat menggunakan keyword sebagai nama variabel ataupun sebagai fungsi.

Semua python keyword selain **True**, **False**, dan **None** adalah huruf kecil.

___

**Keywords** adalah kata kunci yang sudah ditetapkan oleh Python sebagai nama yang tidak bisa dipakai baik untuk penamaan fungsi, variabel, dan lainnya. Keyword ditulis dalam lower-case (huruf kecil semua) kecuali keyword `True`, `False`, dan `None`. Sejauh ini (Python 3.10) keyword yang ada pada Python adalah sebagai berikut:

In [46]:
# Cek daftar keyword
import keyword
keyword.kwlist

['False',
 'None',
 'True',
 'and',
 'as',
 'assert',
 'async',
 'await',
 'break',
 'class',
 'continue',
 'def',
 'del',
 'elif',
 'else',
 'except',
 'finally',
 'for',
 'from',
 'global',
 'if',
 'import',
 'in',
 'is',
 'lambda',
 'nonlocal',
 'not',
 'or',
 'pass',
 'raise',
 'return',
 'try',
 'while',
 'with',
 'yield']

In [47]:
# terdapat perbedaan warna untuk keyword
ininama = 'variable 1'

# True = 'variable 2'
# for = 'ini ketiga'

In [48]:
# 1nama = 'ini tidak boleh'
na2ma_1 = 'ini boleh'

In [49]:
nama = 'DSS: Data Engineering'
# print(nama)
nama

'DSS: Data Engineering'

**💡 NOTES**

Syarat dan ketentuan dalam memberikan nama variable pada Python:
- Menggunakan kombinasi dari huruf kapital (A-Z), huruf nomina (a-z), angka (0-9).
- Special character `!, $ , &, dll` tidak dapat digunakan dalam penamaan variabel.
- **Tidak boleh** menggunakan angka di awal.
- Bersifat **case-sensitive** sehingga penamaan variable `algoritma`, `ALGORITMA`, dan `Algoritma` adalah 3 variable yang berbeda
- **Tidak boleh** menggunakan **keyword** pada Python

## Tipe Data

- Tipe data adalah pengelompokkan data berdasarkan jenis data. 
- Pengecekan tipe data bisa menggunakan fungsi `type()`

Berikut beberapa jenis tipe data pada python:

### 1. String

Untuk menyimpan kumpulan karakter disebut tipe data `str`. 

Ada beberapa cara untuk membuat nilai string:
- menggunakan `''` 
- menggunakan `""` 
- menggunakan `'''` atau `"""` 

In [50]:
# contoh menggunakan petik 1
kalimat1 = 'Hari ini kami belajar Python.'
kalimat1

'Hari ini kami belajar Python.'

In [51]:
## cek tipe data
type(kalimat1)

str

### 2. Number

Untuk menyimpan number, python memiliki dua tipe data number yaitu `int` dan `float`.
- `int` (integer) digunakan untuk menyimpan bilangan bulat (yaitu: 1,2,-3)
- `float` digunakan untuk menyimpan bilangan real (yaitu: 0.7, -1.8, -1000.0).

In [52]:
a = 1234
b = 0.672

In [53]:
# Cek tipe data a
type(a)

int

In [54]:
# Cek tipe data b
type(b)

float

**Operasi Angka** \
Operator Aritmatika:
- `+` - Penambahan
- `-` - Pengurangan
- `*` - Perkalian
- `/` - Pembagian
- `//` - Floor Division (pembagian dengan pembulatan ke bawah) 
- `%` - Modulus (sisa bagi)
- `**` - Eksponen (pangkat)

Operator Perbandingan:
- `<` - Lebih kecil dari (yaitu : a < b)
- `<=` - Lebih kecil atau sama dengan (yaitu : a <= b)
- `>` - Lebih besar dari (yaitu: a > b)
- `>=` - Lebih besar atau sama dengan (yaitu: a >= b)
- `==` - Sama dengan (yaitu: a == b)
- `!=` - Tidak Sama dengan (yaitu: a != b)

In [55]:
# Code here
a - b

1233.328

In [56]:
angka_str = '4'
angka_int = 2

# tidak bisa karena berbeda tipe
# angka_str - angka_int

### 3. Boolean

- Boolean menyimpan nilai yang hanya memuat 2 nilai yaitu `True` atau `False`
- Boolean berguna ketika berkaitan dengan kondisi.

In [57]:
x = True 
y = False

In [58]:
# cek tipe data  
type(x)

bool

### 4. List

`list` digunakan untuk menyimpan beberapa nilai dalam python.\
Cara deklarasi variabel `list`: `nama_variable = [nilai1, nilai2, nilai3]`

list dapat menyimpan berbagai tipe data

In [59]:
# nama_variable = [nilai1, nilai2, nilai3]
data = [1, 2, 3, 2, 4, "lima", False]
data

[1, 2, 3, 2, 4, 'lima', False]

**Operasi List**
- `x.append(a)` : tambahkan a ke list dengan nama variabel x
- `x.remove(a)` : hapus a dari list dengan nama variabel x

Operasi lain yang harus diketahui dalam daftar adalah pengindeksan:
- `x[i]` : mengakses elemen ke-i dari x

In [60]:
# menambah nilai dalam list
data.append('nilai baru')
data

[1, 2, 3, 2, 4, 'lima', False, 'nilai baru']

In [61]:
# code here
data[0] # akses nilai pertama

1

In [62]:
data.insert(1, 'ini terselip')
data

[1, 'ini terselip', 2, 3, 2, 4, 'lima', False, 'nilai baru']

Addition: data.insert(posisi, data_baru)

> Note: zero based indexing -> perhitungan python dimulai dari 0

## Struktur Kontrol

Struktur kontrol digunakan untuk mengarahkan aliran eksekusi program.

```python
if kondisi_1:
    task jika kondisi_1 terpenuhi
elif kondisi_2:
    task jika kondisi_2 terpenuhi
elif kondisi_n:
    task jika kondisi_n terpenuhi
else:
    task jika kondisi_1 hingga kondisi_n tidak terpenuhi
```

Kondisi digunakan untuk memeriksa suatu kondisi dan menjalankan blok kode tertentu berdasarkan hasil evaluasi kondisi tersebut.

**Contoh Penggunaan Kondisi:**

In [63]:
x = 4

In [64]:
if x > 5:
    print("x lebih besar dari 5")
elif x == 5:
    print("x sama dengan 5")
else:
    print("x lebih kecil dari 5")

x lebih kecil dari 5


## Fungsi

Function merupakan sekelompok perintah yang digunakan untuk melakukan tugas tertentu. Ketika kita melakukan sesuatu yang berulang dan rumit, alangkah baiknya kita menggunakan fungsi agar tidak ada langkah yang berubah maupun penulisan kode yang salah. Penulisan umum sebuah fungsi yaitu:
```python
def nama_fungsi(parameter):
    perintah
```
Pada syntax umum di atas, `def` merupakan inisiator untuk sebuah fungsi. Sementara hal-hal yang harus kita tentukan yaitu nama dari fungsi, parameter yang akan digunakan di dalamnya, serta perintah atau kode. 

Sebagai contoh, kita akan membuat sebuah fungsi luas segitiga:

In [65]:
# fungsi luas_segitiga
def luas_segitiga(alas, tinggi):
    hasil = (alas * tinggi) / 2
    return(hasil)

In [66]:
# memanggil fungsi
luas_segitiga(alas = 4, tinggi = 5)

10.0

In [67]:
luas_segitiga(7, 4)

14.0

## Perulangan (Looping)

`for loop` digunakan untuk mengulangi/mengiterasi suatu urutan (dapat berupa list atau string).

Contoh `for loop` untuk mengiterasi elemen pada list:

In [68]:
hewan = ["kucing", "kelinci", "hamster", "sapi"]

# for loop
for x in hewan:
  print(x)

kucing
kelinci
hamster
sapi


# Pandas

## Bekerja dengan Dataframe

- `pandas` adalah library yang powerful sebagai tools analisis data dan struktur pada Python. 

- `pandas` mampu mengolah data menjadi mudah karena mempunyai objek bernama **DataFrame**. 

- `pandas` memiliki function yang mampu mengolah dataframe dengan menerapkan berbagai operasi dan teknik seperti join, agregasi, grouping, dan lain sebagainya

> Lebih lengkapnya silahkan kunjungi [official documentation](https://pandas.pydata.org/)

Untuk menggunakan `pandas`, kita perlu import terlebih dahulu library dengan cara berikut ini:

In [69]:
import pandas as pd
# print(pd.__version__)

### Read Data

Untuk membaca data atau file dengan format `.csv` dapat menggunakan method `.read_csv()`.
Bacalah data `2017-01-transactions.csv` yang berada dalam folder `data`

Sintaks:
```python
pandas.read_csv("path/data")
```

In [70]:
# read data
transaction = pd.read_csv("data_input_example/2017-01-transactions.csv")

transaction

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,01.01.17 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,01.01.17 00:32:09,c2,Atlanta,Georgia
352487,T02356985,A00007774,d,$1023.13,01.01.17 00:32:47,c2,Chicago,Illinois
352488,T02009024,A00006806,d,$375.58,01.01.17 01:12:50,c1,New York City,New York
352489,T00472765,A00001609,d,$20.72,01.01.17 04:39:47,c2,Manchester,New Hampshire
...,...,...,...,...,...,...,...,...
380500,T01012202,A00003459,d,$0.63,31.01.17 23:44:34,c5,Yonkers,New York
380501,T00725372,A00002482,d,$0.63,31.01.17 23:52:26,c5,New York City,New York
380502,T01709574,A00005794,d,$0.63,31.01.17 23:55:21,c2,Los Angeles,California
380503,T01263220,A00004320,d,$250.39,31.01.17 23:56:47,c3,Fall River,Massachusetts


Dataset `transaction` merupakan data yang berasal dari sebuah bank di Amerika Serikat, berisi informasi tentang transaksi yang dilakukan oleh nasabahnya. Setiap baris dalam dataset merepresentasikan transaksi individual oleh nasabah.

**Deskripsi Data**:

- `trans_id`: Identifikasi untuk setiap transaksi.
- `account_id`: Identifikasi untuk setiap akun nasabah.
- `type`: Jenis transaksi sebagai debit (d) atau kredit (c).
- `amount`: Nilai transaksi dalam USD (\$).
- `transaction_date`: Tanggal dan waktu terjadinya transaksi.
- `channel`: Channel tempat transaksi dilakukan.
  - `c1`: Internet Banking
  - `c2`: Mobile Banking
  - `c3`: EDC
  - `c4`: ATM
  - `c5`: QR
- `city`: Kota tempat tinggal nasabah.
- `state_name`: Negara bagian tempat tinggal nasabah.

> Dataframe : tabel yang terdiri dari kolom-kolom (series)

Lakukan pengamatan 5 data teratas dari `transaction` menggunakan `.head()`

In [71]:
# code here
# df.head(n), n : jumlah baris yg ingin ditampilkan
transaction.head(3)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,01.01.17 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,01.01.17 00:32:09,c2,Atlanta,Georgia
352487,T02356985,A00007774,d,$1023.13,01.01.17 00:32:47,c2,Chicago,Illinois


## Tipe Data `pandas`

- Dataframe merupakan tabel/data tabular dua dimensi yaitu baris dan kolom.
- Dataframe terdiri dari beberapa **Series** (kolom).
- Dalam satu series harus memiliki tipe data yang sama.
- `pandas` akan menentukan tipe data dari masing-masing Series, tapi hasil dari pandas tidak selalu benar.

Berikut rangkuman tipe data `pandas`:

**Note: Fokus pada kolom `Pandas dtype` dan `Usage`**

| Pandas dtype  | Python type  | Usage                                        |
|---------------|--------------|----------------------------------------------|
| object        | str          | Text or mixed numeric and non-numeric values |
| int64         | int          | Integer numbers                              |
| float64       | float        | Floating point numbers                       |
| bool          | bool         | True/False values                            |
| datetime64[ns]| -            | Date and time values                         |
| timedelta[ns] | -            |  Differences between two datetimes           |
| category      | -            | Finite list of text values                   |

Referensi: [Overview of Pandas Data Types](https://pbpython.com/pandas_dtypes.html)

**Tipe data Pandas:**

- `int64`: Integer (bilangan bulat, tanpa koma)
- `float64`: Bilangan desimal (berkoma)
- `object`: Text (string)
- `category`: Kategorikal 
- `datetime64[ns]`: Data waktu

Karakteristik tipe data `category` :
- Dapat dikelompokkan menjadi beberapa kelompok (category)
- Nilainya berulang

----

Saat kita membaca data dengan `pd.read_csv()`, `pandas` akan mencoba menentukan tipe data dari setiap kolom. Lakukan investigasi awal untuk melihat struktur data terhadap object DataFrame dengan menggunakan method `.info()`.

In [72]:
# Cek tipe data
transaction.info()

<class 'pandas.core.frame.DataFrame'>
Index: 28020 entries, 352485 to 380504
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   trans_id          28020 non-null  object
 1   account_id        28020 non-null  object
 2   type              28020 non-null  object
 3   amount            28020 non-null  object
 4   transaction_date  28020 non-null  object
 5   channel           28020 non-null  object
 6   city              28020 non-null  object
 7   state_name        28020 non-null  object
dtypes: object(8)
memory usage: 1.9+ MB


**💡 NOTES**

Dengan menggunakan method `.info()`, kita dapat memeriksa **informasi** lengkap dari DataFrame kita:

- Dimensi data: jumlah baris dan kolom (`.shape`)
- Nama kolom (`.columns`)
- Tipe data setiap kolom (`.dtypes`)
- Penggunaan memori

In [73]:
transaction.head(2)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,01.01.17 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,01.01.17 00:32:09,c2,Atlanta,Georgia


*Kolom manakah yang memiliki format tipe data yang belum sesuai?*

> Jawaban:
- `amount` -> float
- `transaction_date` -> datetime
- `type`, `channel`, `city`, `state_name` -> category.

## Convert Data Types

### Convert Category

Untuk mengubah tipe data pada `pandas`, dapat menggunakan method `astype()`.

**Sintaks**
```python
df['column_name'] = df['column_name'].astype('new_data_types')
```

In [74]:
# code here
# cat_column = ['nilai1', 'nilai2']
cat_column = ['type', 'channel', 'city', 'state_name']

transaction[cat_column] = transaction[cat_column].astype('category')

In [75]:
transaction.info()

<class 'pandas.core.frame.DataFrame'>
Index: 28020 entries, 352485 to 380504
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype   
---  ------            --------------  -----   
 0   trans_id          28020 non-null  object  
 1   account_id        28020 non-null  object  
 2   type              28020 non-null  category
 3   amount            28020 non-null  object  
 4   transaction_date  28020 non-null  object  
 5   channel           28020 non-null  category
 6   city              28020 non-null  category
 7   state_name        28020 non-null  category
dtypes: category(4), object(4)
memory usage: 1.2+ MB


### Convert Datetime

Ada dua cara untuk mengubah menjadi tipe data `datetime64[ns]`:

- Method **`.astype()`**
- Method **`pd.to_datetime()`**

Misalkan kita memiliki kolom yang menyimpan informasi waktu harian di sekitar bulan Juni menggunakan format **mm-dd-yyyy** yang merupakan format penulisan tanggal di AS. Mari kita lihat apa yang akan terjadi ketika kita mengonversi tipe data `transaction_date` menjadi `datetime64[ns]` :

In [76]:
transaction_date = pd.Series(['30.05.19', '31.05.19', '01.06.19', '02.06.19', '03.06.19']) # Tanggal 30 Mei-3 Juni 2019
transaction_date

0    30.05.19
1    31.05.19
2    01.06.19
3    02.06.19
4    03.06.19
dtype: object

#### 1️⃣ Method **`.astype()`**

In [77]:
# code here
transaction_date.astype('datetime64[ns]')

0   2019-05-30
1   2019-05-31
2   2019-01-06
3   2019-02-06
4   2019-03-06
dtype: datetime64[ns]

#### 2️⃣ Fungsi **`pd.to_datetime()`**

`pd.to_datetime()` lebih fleksibel dibanding `astype('datetime[64ns]')` karena memiliki parameter-parameter seperti `dayfirst` dan `format`.

Syntax:

`pd.to_datetime(df['kolom_date'], dayfirst = False, format)`

- `dayfirst`: Jika urutan pertama dari date adalah tanggal
- `format`: Format datetime awal data, untuk mengenali letak hari, bulan, tahun dsb. Menggunakan format `strftime`

**🌐 Referensi** : https://strftime.org/

In [78]:
# code here
pd.to_datetime(transaction_date, dayfirst = True)

  pd.to_datetime(transaction_date, dayfirst = True)


0   2019-05-30
1   2019-05-31
2   2019-06-01
3   2019-06-02
4   2019-06-03
dtype: datetime64[ns]

In [79]:
# tanggal.bulan.tahun
# tanggalbulantahun = 300519
# tahun-bulan-tanggal = 19-05-05
pd.to_datetime(transaction_date, format = '%d.%m.%y')

0   2019-05-30
1   2019-05-31
2   2019-06-01
3   2019-06-02
4   2019-06-03
dtype: datetime64[ns]

Untuk format berbeda-beda (tapi urutannya sama) kita bisa gunakan `format = 'mixed'`

In [80]:
# pd.set_option(locale = 'id_ID')

**✏️ Notes:**

Kapan waktu yang tepat untuk menggunakan 2 cara tersebut?

- `.astype('datetime64[ns]')`: Jika data memiliki format **bulan-tanggal-tahun (mdy)**
- `pd.to_datetime()`: Jika formatnya tidak dalam **bulan-tanggal-tahun (mdy)**
___

Mari merubah kolom `transaction_date` ke bentuk datetime

In [81]:
transaction.head()

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,01.01.17 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,01.01.17 00:32:09,c2,Atlanta,Georgia
352487,T02356985,A00007774,d,$1023.13,01.01.17 00:32:47,c2,Chicago,Illinois
352488,T02009024,A00006806,d,$375.58,01.01.17 01:12:50,c1,New York City,New York
352489,T00472765,A00001609,d,$20.72,01.01.17 04:39:47,c2,Manchester,New Hampshire


Fungsi yang digunakan untuk merubah `transaction_date`?

> - fungsi: pd.to_datetime()

> - format: %d.%m.%y %X
> %X = %H:%M:S = 23:59:59 (waktu)

In [82]:
# change data type
# trasaction.transaction_date
transaction['transaction_date'] = pd.to_datetime(transaction['transaction_date'], format = '%d.%m.%y %X')

transaction.info()

<class 'pandas.core.frame.DataFrame'>
Index: 28020 entries, 352485 to 380504
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   trans_id          28020 non-null  object        
 1   account_id        28020 non-null  object        
 2   type              28020 non-null  category      
 3   amount            28020 non-null  object        
 4   transaction_date  28020 non-null  datetime64[ns]
 5   channel           28020 non-null  category      
 6   city              28020 non-null  category      
 7   state_name        28020 non-null  category      
dtypes: category(4), datetime64[ns](1), object(3)
memory usage: 1.2+ MB


## Conditional Subsetting

Kita dapat melakukan subsetting berdasarkan kondisi tertentu. Misalkan pada dataframe `transaction`, kita ingin mengambil beberapa data dengan kondisi sebagai berikut:

- Transaksi yang terjadi di supermarket: `transaction['format'] == 'supermarket'`
- Transaksi dengan produk yang harganya lebih dari sama dengan 200000: `transaction['unit_price'] >= 200000`

Syntax penulisan untuk conditional subsetting adalah:

```python
df[df['column_name'] <comparison_operator> <value>]
```

Contoh `comparison_operator` adalah seperti `==`, `!=`, `>`, `>=`, `<`, `<=`.

Operator Perbandingan:
- `<` - Lebih kecil dari (yaitu : a < b)
- `<=` - Lebih kecil atau sama dengan (yaitu : a <= b)
- `>` - Lebih besar dari (yaitu: a > b)
- `>=` - Lebih besar atau sama dengan (yaitu: a >= b)
- `==` - Sama dengan (yaitu: a == b)
- `!=` - Tidak Sama dengan (yaitu: a != b)

In [83]:
transaction.head(2)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,2017-01-01 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,2017-01-01 00:32:09,c2,Atlanta,Georgia


🔻 (1) Tampilkan transaksi dengan `type` Debit!

In [84]:
# code here
# kondisi: transaction['type'] == 'd'
# transaction[ kondisi ]
transaction[ transaction['type'] == 'd']

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,2017-01-01 00:29:41,c1,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,2017-01-01 00:32:09,c2,Atlanta,Georgia
352487,T02356985,A00007774,d,$1023.13,2017-01-01 00:32:47,c2,Chicago,Illinois
352488,T02009024,A00006806,d,$375.58,2017-01-01 01:12:50,c1,New York City,New York
352489,T00472765,A00001609,d,$20.72,2017-01-01 04:39:47,c2,Manchester,New Hampshire
...,...,...,...,...,...,...,...,...
380500,T01012202,A00003459,d,$0.63,2017-01-31 23:44:34,c5,Yonkers,New York
380501,T00725372,A00002482,d,$0.63,2017-01-31 23:52:26,c5,New York City,New York
380502,T01709574,A00005794,d,$0.63,2017-01-31 23:55:21,c2,Los Angeles,California
380503,T01263220,A00004320,d,$250.39,2017-01-31 23:56:47,c3,Fall River,Massachusetts


🔻 (2) Tampilkan transaksi yang memiliki `amount` lebih besar sama dengan $3000

In [85]:
# code here
# transaction[ transaction['amount'] > 3000 ]
# tidak bisa karena amount masih berbentuk object

In [86]:
# tidak bisa langsung diubah: harus menghapus $
# transaction['amount'].astype('float64')

*Notes: `amount` masih berbentuk object, sehingga harus dibersihkan agar dapat berbentuk numerik*

### Multiple Condition

Kita juga dapat menggunakan operator `&` (AND) dan `|` (OR) untuk melakukan subsetting lebih dari 1 kondisi. Misalnya kita ingin melihat data penjualan dari seorang pegawai bernama Andi yang jumlahnya lebih dari 5000, maka kita dapat menggunakan syntax:

```python
sales[ (sales['salesperson'] == 'Andi') & (sales['amount'] > 5000) ]
```

Untuk subsetting dengan kondisi lebih dari 1, setiap kondisi diletakkan **di dalam tanda kurung `()`** atau bisa **ditulis dengan variabel** seperti syntax berikut:

```python
kondisi_1 = sales['salesperson'] == 'Andi'
kondisi_2 = sales['amount'] > 5000

df[ kondisi_1 & kondisi_2 ]
```

**Poin:**
- Operator `&` (AND): harus semua kondisi terpenuhi dalam satu baris agar muncul
- Operator `|` (OR): salah satu kondisi saja sudah terpenuhi maka baris tsb muncul

🔻 (1) Tampilkan transaksi yang dilakukan melalui `QR` untuk waktu transaksi setelah `2017.01.28 19:00:00`

In [87]:
# code here
kondisi1 = transaction['channel'] == 'c5'
kondisi2 = transaction['transaction_date'] > '2017.01.28 19:00:00'

transaction[ (kondisi1) & (kondisi2) ]

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
376305,T01462994,A00004981,d,$88.07,2017-01-28 19:19:15,c5,Houston,Texas
376315,T01252369,A00004283,d,$112.24,2017-01-28 19:31:00,c5,Charleston,South Carolina
376319,T02364014,A00007795,d,$323.77,2017-01-28 19:38:55,c5,Phoenix,Arizona
376324,T00257989,A00000884,d,$7.77,2017-01-28 19:42:28,c5,Jacksonville,Florida
376358,T00375398,A00001277,d,$148.5,2017-01-28 20:14:53,c5,Des Moines,Iowa
...,...,...,...,...,...,...,...,...
380492,T03242429,A00010762,d,$0.63,2017-01-31 23:37:58,c5,New York City,New York
380498,T00428692,A00001456,d,$0.63,2017-01-31 23:43:56,c5,Boston,Massachusetts
380500,T01012202,A00003459,d,$0.63,2017-01-31 23:44:34,c5,Yonkers,New York
380501,T00725372,A00002482,d,$0.63,2017-01-31 23:52:26,c5,New York City,New York


# Data Cleansing

Jika diperhatikan, kolom `type` dan `channel` masih berbentuk kode, begitupun dengan `amount` masih belum berbentuk numerik karena masih terdapat symbol `$`. Tujuan selanjutnya, yaitu: 

- Merubah kode `type` dan `channel` menjadi nilai yang sebenarnya.
- Menghapus symbol `$` di `amount` dan merubah menjadi `float64`

Tahapan ini termasuk ke dalam pembersihan data atau biasa disebut Data Cleansing.

## Merubah Kode Type dan Channel

Mari kita lihat deskripsi dari `channel`:

`channel`: Channel tempat transaksi dilakukan.
  - `c1`: Internet Banking
  - `c2`: Mobile Banking
  - `c3`: EDC
  - `c4`: ATM
  - `c5`: QR

Tujuan: Merubah kode menjadi detail channel. Untuk melakukan hal ini, dapat menggunakan `.replace()`

Syntax:
```python
data['kolom'].replace({'nilai_lama1': 'nilai baru 1',
                        'nilai_lama2': 'nilai baru 2',
                        'nilai_laman': 'nilai baru n'})
```

In [88]:
# membuat dictionary
channel_revert = {'c1': 'Internet Banking', 
                  'c2': 'Mobile Banking',
                  'c3': 'EDC',
                  'c4': 'ATM',
                  'c5': 'QR'}

In [89]:
# merubah ke nilai baru
transaction['channel'] = transaction['channel'].replace(channel_revert)

transaction.head(3)

  transaction['channel'] = transaction['channel'].replace(channel_revert)


Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,d,$11.66,2017-01-01 00:29:41,Internet Banking,Norwalk,Connecticut
352486,T02879544,A00009528,d,$116.56,2017-01-01 00:32:09,Mobile Banking,Atlanta,Georgia
352487,T02356985,A00007774,d,$1023.13,2017-01-01 00:32:47,Mobile Banking,Chicago,Illinois


- .replace(): Kita isi manual
- df1.merge(df2, on='key'): Dari dataframe lain.

Silahkan lakukan perubahan untuk kolom `type` menggunakan `.replace()` dengan ketentuan berikut:

- `d` -> `Debit`
- `c` -> `Credit`

In [90]:
# code here
transaction['type'] = transaction['type'].replace({'d': 'Debit',
                                                  'c': 'Credit'})

transaction.head()

  transaction['type'] = transaction['type'].replace({'d': 'Debit',


Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,Debit,$11.66,2017-01-01 00:29:41,Internet Banking,Norwalk,Connecticut
352486,T02879544,A00009528,Debit,$116.56,2017-01-01 00:32:09,Mobile Banking,Atlanta,Georgia
352487,T02356985,A00007774,Debit,$1023.13,2017-01-01 00:32:47,Mobile Banking,Chicago,Illinois
352488,T02009024,A00006806,Debit,$375.58,2017-01-01 01:12:50,Internet Banking,New York City,New York
352489,T00472765,A00001609,Debit,$20.72,2017-01-01 04:39:47,Mobile Banking,Manchester,New Hampshire


## Menghapus $ dan Merubah Amount ke Float64

Mirip seperti `.replace()` untuk series, terdapat juga `.str.replace()` untuk mengganti beberapa karakter tertentu pada nilai.

Syntax: `df['nama_kolom'].str.replace('nilai_lama', 'nilai_baru')`



In [91]:
transaction_amount = pd.Series(['$21', '$94.12', '$12.65'])
transaction_amount

0       $21
1    $94.12
2    $12.65
dtype: object

In [92]:
# mengganti $ -> dollar
transaction_amount.str.replace('$', 'Dollar')

0       Dollar21
1    Dollar94.12
2    Dollar12.65
dtype: object

In [93]:
# menghilangkan $
transaction_amount.str.replace('$', '').astype('float64')

0    21.00
1    94.12
2    12.65
dtype: float64

In [94]:
# code here
transaction['amount'] = transaction['amount'].str.replace('$', '').astype('float64')

transaction.info()

<class 'pandas.core.frame.DataFrame'>
Index: 28020 entries, 352485 to 380504
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   trans_id          28020 non-null  object        
 1   account_id        28020 non-null  object        
 2   type              28020 non-null  category      
 3   amount            28020 non-null  float64       
 4   transaction_date  28020 non-null  datetime64[ns]
 5   channel           28020 non-null  category      
 6   city              28020 non-null  category      
 7   state_name        28020 non-null  category      
dtypes: category(4), datetime64[ns](1), float64(1), object(2)
memory usage: 1.2+ MB


## Membuat Fungsi Data Cleansing

Agar proses pembersihan data dapat digunakan kembali dengan mudah, mari kita masukkan ke dalam sebuah fungsi `clean_df` yang berisi semua tahapan permbersihan data dengan inputan data kotor (sebelum dibersihkan) dan menghasilkan dataframe yang telah bersih.

In [95]:
def clean_df(df):
    df['transaction_date'] = pd.to_datetime(df['transaction_date'], format = "%d.%m.%y %X")
    
    # channel
    channel_revert = {'c1': 'Internet Banking',
           'c2': 'Mobile Banking',
           'c3': 'EDC',
           'c4': 'ATM',
           'c5': 'QR'}
    df['channel'] = df['channel'].replace(channel_revert)

    # type
    type_revert = {'d': 'Debit',
               'c': 'Credit'}
    df['type'] = df['type'].replace(type_revert)

    # amount
    df['amount'] = df['amount'].str.replace('$', '').astype('float64')

    cat_column = ['type', 'channel', 'city', 'state_name']
    df[cat_column] = df[cat_column].astype('category')

    return df

Mari kita coba terapkan fungsi `clean_df` kita ke dataframe Februari 2017 (`2017-02-transactions.csv`)

In [96]:
# code here
trx17_2 = pd.read_csv('data_input_example/2017-02-transactions.csv')

trx17_2.head(3)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
380505,T00673531,A00002301,d,$103.61,01.02.17 04:02:32,c2,Virginia Beach,Virginia
380506,T00155812,A00000521,d,$440.33,01.02.17 04:41:10,c2,Boston,Massachusetts
380507,T00204579,A00000694,d,$120.88,01.02.17 06:37:23,c2,Danbury,Connecticut


Bersihkan `trx17_2` menggunakan fungsi `clean_df(df)` kemudian simpan ke dalam `trx_17_2_clean`.

In [97]:
# code here
trx17_2_clean = clean_df(trx17_2)

trx17_2_clean.info()

<class 'pandas.core.frame.DataFrame'>
Index: 14674 entries, 380505 to 395178
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   trans_id          14674 non-null  object        
 1   account_id        14674 non-null  object        
 2   type              14674 non-null  category      
 3   amount            14674 non-null  float64       
 4   transaction_date  14674 non-null  datetime64[ns]
 5   channel           14674 non-null  category      
 6   city              14674 non-null  category      
 7   state_name        14674 non-null  category      
dtypes: category(4), datetime64[ns](1), float64(1), object(2)
memory usage: 635.9+ KB


In [98]:
trx17_2_clean.head()

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
380505,T00673531,A00002301,Debit,103.61,2017-02-01 04:02:32,Mobile Banking,Virginia Beach,Virginia
380506,T00155812,A00000521,Debit,440.33,2017-02-01 04:41:10,Mobile Banking,Boston,Massachusetts
380507,T00204579,A00000694,Debit,120.88,2017-02-01 06:37:23,Mobile Banking,Danbury,Connecticut
380508,T00464540,A00001580,Debit,353.99,2017-02-01 06:43:26,ATM,Lawrence,Massachusetts
380509,T01197483,A00004099,Debit,38.85,2017-02-01 07:32:10,EDC,Houston,Texas


# Memasukkan Data ke Database

## Pengenalan SQLite

SQLite adalah salah satu basis data yang paling populer dan mudah digunakan. Bersifat open-source tanpa konfigurasi tambahan dan tidak memerlukan instlasi.

*Why SQLite:*
- SQLite bersifat open-source, tidak memerlukan lisensi apapun setelah instalasi.
- SQLite *serverless*, tidak memerlukan proses / sistem yang berbeda untuk beroperasi.
- SQLIte dapat digunakan lintas platform/OS seperti Windows, Linux, MacOS dsb.

Alasan-alasan ini, kita menggunakan SQLite dalam materi DSS Airflow karena mudah dikonfigurasi dan tidak memerlukan instalasi apapun. 

*Note: Kita akan mempelajari tujuan dan flow utama menghubungkan database dengan airflow, jika nantinya menggunakan DBMS lain seperti mysql, postgre atau DBMS lainnya, yang perlu dilakukan tinggal melakukan konfigurasi tambahan ke python sesuai masing-masing DBMS yang diperlukan.*

## Menghubungkan SQLite di Python dan Airflow

Untuk membuat database SQLite dapat dilakukan di python dengan menggunakan library bawaan `sqlite3` (tidak perlu instalasi).

Langkah-langkah untuk membuat database di python:

- Impor library `sqlite3`
- Buat koneksi python ke database SQLite dengan
  ```python
  conn = sqlite3.connect(“path database”)
  ```

Jika database sudah ada, maka python akan terhubung dengan database yang sudah ada.

Contoh untuk membuat database SQLite baru di Python

In [99]:
import sqlite3

In [100]:
db_filepath = 'db/trial.db'
conn = sqlite3.connect(db_filepath)

Kode di atas akan membuat database bernama `trial.db` (jika sebelumnya belum ada) di dalam folder `db`.

## Menambahkan Dataframe ke Database

`pandas` telah menyediakan metode untuk menambahkan dataframe secara langsung ke dalam database. Untuk memasukkan dataframe ke dalam database bisa menggunakan `df.to_sql()`. Input yang dibutuhkan adalah nama tabel target dan dataframe itu sendiri, input lainnya dapat disesuaikan dengan kebutuhan.

```python
df.to_sql(name, con, if_exist, index=True)
```

**Parameter:**

- `nama`: Nama tabel SQL
- `con`: Koneksi ke database
- `if_exist`: Yang dilakukan jika tabel sudah ada.
  - `fail`: Menampilkan ValueError
  - `replace`: Menghapus tabel sebelum memasukkan nilai baru
  - `append`: Menambahkan nilai baru ke tabel yang sudah ada.
- `index`: (True/False) Menulis indeks Dataframe sebagai kolom.

Sebagai contoh, kita mempunyai dataframe di bawah dan ingin ditambahkan ke table `users` di dalam database `trial.db`.

In [101]:
df = pd.DataFrame({'name' : ['User 1', 'User 2', 'User 3']})
df

Unnamed: 0,name
0,User 1
1,User 2
2,User 3


In [102]:
# insert into users table
df.to_sql(name='users', con=conn, if_exists='append')

3

Proses ini akan menampilkan jumlah baris yang berhasil ditambahkan.

## Melihat Isi Table

Terdapat 2 cara untuk menampilkan data kita dalam tabel database.

1. Menggunakan `pandas`: Menggunakan pandas untuk memanggil semua data dari tabel tertentu
2. Menggunakan SQLite Viewer: Menggunakan ekstensi `SQLite` dari VSCode untuk melihat data kita dalam database.

### 1. Menggunakan `Pandas`

Untuk mengambil (biasa disebut `fetch`) data dari database, kita dapat menggunakan objek `conn` yang telah kita buat untuk menghubungkan python dengan database.

Method untuk mengambil data menggunakan pandas dari database menggunakan  `pd.read_sql_query(sql, con)` dengan memasukkan query dan objek koneksi sebagai input.

Syntax: `pd.read_sql_query(sql, con)`

**Parameter:** Parameter
- `sql`: Kueri SQL
- `con`: Objek koneksi

Mari kita ambil semua data dalam tabel `users` di basis data `trial.db`.

In [103]:
pd.read_sql_query(sql = "SELECT * FROM USERS",
                con = conn)

Unnamed: 0,index,name
0,0,User 1
1,1,User 2
2,2,User 3
3,0,User 1
4,1,User 2
5,2,User 3


#### 2. SQLite Viewer

SQLite Viewer adalah ekstensi di VSCode yang memungkinkan untuk membuka dan melihat isi dari database SQLite. Cara penggunaannya sebagai berikut:

1. Install Ekstensi SQLite Viewer
   Pertama, carai "SQLite Viewer" di tab Extensions kemudian clik "install"
   ![](assets/sqlite_viewer.png)

2. Cari dan double klik db yang ingin dibuka, dalam hal ini kita ingin buka `db/trial.db`
   
3. Akan terbuka window baru yang menunjukkan isi dari database.
   ![](assets/trialdb.png)

## Menambahkan Kolom Transaksi ke Database

Setelah kita mengetahui cara menambahkan data ke database. Selanjutnya kita akan menambahkan data `transaction` kita ke dalam SQLite.

Pertama, mari kita melihat data `transaction` kita.

In [104]:
transaction.head()

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
352485,T00630999,A00002154,Debit,11.66,2017-01-01 00:29:41,Internet Banking,Norwalk,Connecticut
352486,T02879544,A00009528,Debit,116.56,2017-01-01 00:32:09,Mobile Banking,Atlanta,Georgia
352487,T02356985,A00007774,Debit,1023.13,2017-01-01 00:32:47,Mobile Banking,Chicago,Illinois
352488,T02009024,A00006806,Debit,375.58,2017-01-01 01:12:50,Internet Banking,New York City,New York
352489,T00472765,A00001609,Debit,20.72,2017-01-01 04:39:47,Mobile Banking,Manchester,New Hampshire


Selanjutnya, kita perlu membuat database bernama `transactions.db` di dalam folder `db` (`db/transactions.db`).

In [105]:
database = 'db/transactions.db'
conn = sqlite3.connect(database)

Pada kasus ini, kita ingin menambahkan `transaction` ke dalam tabel `transactions` (pakai s) di `db/airflow.db`.

Untuk melakukan ini, kita akan menggunakan lagi `.to_sql()`. Dengan ketentuan:

- nama table = 'transactions'
- if_exists = 'append'
- index = False

In [106]:
# code here
transaction.to_sql(name = 'transactions',
                   if_exists= 'append',
                   con = conn,
                   index = False)

28020

Mari melihat data yang telah kita masukkan ke dalam tabel `transactions`.

In [107]:
# code here
pd.read_sql_query("SELECT * FROM TRANSACTIONS", conn)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
0,T00630999,A00002154,Debit,11.66,2017-01-01 00:29:41,Internet Banking,Norwalk,Connecticut
1,T02879544,A00009528,Debit,116.56,2017-01-01 00:32:09,Mobile Banking,Atlanta,Georgia
2,T02356985,A00007774,Debit,1023.13,2017-01-01 00:32:47,Mobile Banking,Chicago,Illinois
3,T02009024,A00006806,Debit,375.58,2017-01-01 01:12:50,Internet Banking,New York City,New York
4,T00472765,A00001609,Debit,20.72,2017-01-01 04:39:47,Mobile Banking,Manchester,New Hampshire
...,...,...,...,...,...,...,...,...
70709,T01012202,A00003459,Debit,0.63,2017-01-31 23:44:34,QR,Yonkers,New York
70710,T00725372,A00002482,Debit,0.63,2017-01-31 23:52:26,QR,New York City,New York
70711,T01709574,A00005794,Debit,0.63,2017-01-31 23:55:21,Mobile Banking,Los Angeles,California
70712,T01263220,A00004320,Debit,250.39,2017-01-31 23:56:47,EDC,Fall River,Massachusetts


Selanjutnya, mari kita buatkan fungsi untuk memudahkan dataframe-dataframe lainnya dimasukkan ke dalam SQLite.

In [108]:
def df_to_db(df):
    database = 'db/transactions.db'
    conn = sqlite3.connect(database)

    df.to_sql(name = 'transactions',
              con = conn,
              if_exists = 'append',
              index = False)

Silahkan tambahkan `trx17_2_clean` ke dalam `db/transactions.db` menggunakan fungsi `df_to_db`.

In [109]:
# lakukan jg untuk trx17_2_clean
df_to_db(trx17_2_clean)

In [110]:
pd.read_sql_query("SELECT * FROM transactions", conn)

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
0,T00630999,A00002154,Debit,11.66,2017-01-01 00:29:41,Internet Banking,Norwalk,Connecticut
1,T02879544,A00009528,Debit,116.56,2017-01-01 00:32:09,Mobile Banking,Atlanta,Georgia
2,T02356985,A00007774,Debit,1023.13,2017-01-01 00:32:47,Mobile Banking,Chicago,Illinois
3,T02009024,A00006806,Debit,375.58,2017-01-01 01:12:50,Internet Banking,New York City,New York
4,T00472765,A00001609,Debit,20.72,2017-01-01 04:39:47,Mobile Banking,Manchester,New Hampshire
...,...,...,...,...,...,...,...,...
85383,T00064957,A00000216,Debit,0.63,2017-02-28 23:39:54,QR,Bridgeport,Connecticut
85384,T00165488,A00000553,Debit,0.63,2017-02-28 23:43:12,Mobile Banking,Honolulu,Hawaii
85385,T00043348,A00000145,Debit,0.63,2017-02-28 23:46:35,QR,Des Moines,Iowa
85386,T01113171,A00003802,Debit,0.63,2017-02-28 23:49:35,QR,Portland,Maine


# Iterasi Keseluruhan File

Jika kita melihat isi dari `data_input_example/`. Maka masih terdapat banyak file csv yang belum terbaca, dibersihkan dan dimasukkan ke dalam database. Akan sangat memakan waktu jika kita membaca satu-satu file ini. Maka dari itu kita akan melakukan automasi untuk membaca file ini di dalam folder `data_input_example/` secara sekaligus.

## Glob - Mendapatkan keseluruhan nama file

Untuk membaca keseluruhan file di dalam suatu folder, kita butuh untuk mendapatkan keseluruhan nama file di dalam folder tersebut. Kita dapat menggunakan `glob.glob('path')` untuk melakukan hal tersebut:

Syntax:
```python
glob.glob('path folder/*.csv')
```

*Notes:*

- `*.csv`: Membaca keseluruhan nama file yang berekstensi `.csv`
- **Output:** Seluruh list nama file di dalam folder

In [111]:
import glob

files = glob.glob('data_input_example/*.csv')

files

['data_input_example\\2017-01-transactions.csv',
 'data_input_example\\2017-02-transactions.csv',
 'data_input_example\\2017-03-transactions.csv',
 'data_input_example\\2017-04-transactions.csv',
 'data_input_example\\2017-05-transactions.csv',
 'data_input_example\\2017-06-transactions.csv',
 'data_input_example\\2017-07-transactions.csv',
 'data_input_example\\2017-08-transactions.csv',
 'data_input_example\\2017-09-transactions.csv',
 'data_input_example\\2017-10-transactions.csv',
 'data_input_example\\2017-11-transactions.csv',
 'data_input_example\\2017-12-transactions.csv']

Jika kita perhatikan, maka `glob.glob()` akan mengumpulkan keseluruhan nama file berekstensi `csv` di dalam sebuah list. Mari kita coba untuk membaca dataframe dengan `pd.read_csv` untuk file pertama `files[0]`

In [112]:
# membaca files[0] sebagai dataframe
pd.read_csv(files[2])

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
395179,T00948378,A00003232,d,$30.22,01.03.17 02:31:57,c3,Jackson,Mississippi
395180,T00209834,A00000712,d,$611.42,01.03.17 04:06:46,c4,Philadelphia,Pennsylvania
395181,T00319785,A00001092,d,$185.63,01.03.17 07:01:40,c3,Manchester,New Hampshire
395182,T01437124,A00004894,c,$276.29,01.03.17 07:04:55,c3,Las Vegas,Nevada
395183,T00528344,A00001802,d,$34.54,01.03.17 07:17:04,c4,Rochester,New York
...,...,...,...,...,...,...,...,...
411289,T00493586,A00001683,d,$379.9,31.03.17 23:53:02,c1,Syracuse,New York
411290,T00323195,A00001104,d,$341.04,31.03.17 23:53:54,c4,Charleston,South Carolina
411291,T00944603,A00003221,d,$0.63,31.03.17 23:54:27,c5,Indianapolis,Indiana
411292,T02797211,A00009263,d,$211.53,31.03.17 23:54:51,c3,Cranston,Rhode Island


Jika kita lihat, kita telah berhasil untuk membaca file pertama. Mari kita iterasi keseluruhan nama file menggunakan `for loop`.

In [113]:
for file in files:
    print(f"file: {file}")

file: data_input_example\2017-01-transactions.csv
file: data_input_example\2017-02-transactions.csv
file: data_input_example\2017-03-transactions.csv
file: data_input_example\2017-04-transactions.csv
file: data_input_example\2017-05-transactions.csv
file: data_input_example\2017-06-transactions.csv
file: data_input_example\2017-07-transactions.csv
file: data_input_example\2017-08-transactions.csv
file: data_input_example\2017-09-transactions.csv
file: data_input_example\2017-10-transactions.csv
file: data_input_example\2017-11-transactions.csv
file: data_input_example\2017-12-transactions.csv


## Membaca Keseluruhan Dataframe

Untuk menyimpan dataframe yang terbaca, kita dapat menyimpannnya ke dalam list `df_list`. `df_list` pertama adalah dataframe pertama, begitupun kedua dan seterusnya.

In [114]:
# membuat list kosong
df_list = []

for file in files:
    # membaca dataframe
    trx = pd.read_csv(file)
    # membersihkan dataframe
    trx_clean = clean_df(trx)
    
    # memasukkan dataframe bersih ke dalam df_list
    df_list.append(trx_clean)

In [115]:
# melihat shape dan juga periode bulan dari setiap dataframe di dalam `df_list`
for df in df_list:
    print(df.shape)
    print(df['transaction_date'].dt.to_period('M').unique()[0])

(28020, 8)
2017-01
(14674, 8)
2017-02
(16115, 8)
2017-03
(16424, 8)
2017-04
(16852, 8)
2017-05
(18945, 8)
2017-06
(17291, 8)
2017-07
(17819, 8)
2017-08
(18234, 8)
2017-09
(18649, 8)
2017-10
(18609, 8)
2017-11
(22740, 8)
2017-12


## Melakukan Filtering

Jika diperhatikan, dataframe di dalam `df_list` tetap memasukkan periode `2017-01` dan `2017-02` walaupun dataframe tersebut telah dimasukkan ke dalam `SQLite`. 

Tujuan kita adalah hanya memasukkan dataframe yang belum ada di database agar tidak terdapat duplikasi di dalam database. Untuk melakukan hal tersebut secara otomatis, kita akan melakukan tahapan berikut:

- Mengambil waktu (`transaction_date`) paling terupdate dari database `transactions.db`, masukkan ke dalam objek `last_timestamp`.
- Filter dataframe dengan mengambil dataframe yang waktunya lebih terbaru dari `last_timestamp`.

In [116]:
# 1. Mengambil waktu paling terupdate dari database
last_update = pd.read_sql_query("""
                  SELECT transaction_date 
                  FROM transactions
                  ORDER BY transaction_date DESC
                  LIMIT 1
                  """, con = conn)

last_timestamp = last_update['transaction_date'].to_string(index=False)

In [117]:
last_timestamp

'2017-02-28 23:54:37'

In [121]:
# melakukan filtering untuk setiap database
df_try = df_list[2]

# melakukan conditional subsetting
df_try[df_try['transaction_date'] > last_timestamp]

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
395179,T00948378,A00003232,Debit,30.22,2017-03-01 02:31:57,EDC,Jackson,Mississippi
395180,T00209834,A00000712,Debit,611.42,2017-03-01 04:06:46,ATM,Philadelphia,Pennsylvania
395181,T00319785,A00001092,Debit,185.63,2017-03-01 07:01:40,EDC,Manchester,New Hampshire
395182,T01437124,A00004894,Credit,276.29,2017-03-01 07:04:55,EDC,Las Vegas,Nevada
395183,T00528344,A00001802,Debit,34.54,2017-03-01 07:17:04,ATM,Rochester,New York
...,...,...,...,...,...,...,...,...
411289,T00493586,A00001683,Debit,379.90,2017-03-31 23:53:02,Internet Banking,Syracuse,New York
411290,T00323195,A00001104,Debit,341.04,2017-03-31 23:53:54,ATM,Charleston,South Carolina
411291,T00944603,A00003221,Debit,0.63,2017-03-31 23:54:27,QR,Indianapolis,Indiana
411292,T02797211,A00009263,Debit,211.53,2017-03-31 23:54:51,EDC,Cranston,Rhode Island


Mari kita tambahkan tahapan filtering ke dalam iterasi pembacaan file kita, dan menggunakan fungsi `clean_df()` untuk membersihkan dataframe yang sebelumnya telah kita buat.

In [122]:
df_list = []

for file in files:
    # mengambil transaksi terkini dari database
    last_update = pd.read_sql_query("""
                  SELECT transaction_date 
                  FROM transactions
                  ORDER BY transaction_date DESC
                  LIMIT 1
                  """, con = conn)
    last_timestamp = last_update['transaction_date'].to_string(index=False)

    # membaca dataframe
    trx = pd.read_csv(file)
    # membersihkan dataframe
    trx_clean = clean_df(trx)

    # filtering dataframe
    trx_clean_update = trx_clean[trx_clean['transaction_date'] > last_timestamp]
    
    # jika dataframe terisi, maka masukkan ke dalam df_list
    if trx_clean_update.shape[0] != 0:
        df_list.append(trx_clean_update)

In [123]:
for df in df_list:
    print(df.shape)
    print(df['transaction_date'].dt.to_period('M').unique()[0])

(16115, 8)
2017-03
(16424, 8)
2017-04
(16852, 8)
2017-05
(18945, 8)
2017-06
(17291, 8)
2017-07
(17819, 8)
2017-08
(18234, 8)
2017-09
(18649, 8)
2017-10
(18609, 8)
2017-11
(22740, 8)
2017-12


Terlihat bahwa setelah dilakukan filtering. `df_list` hanya menyimpan dataframe-dataframe yang belum dimasukkan ke dalam database. Dataframe-dataframe inilah yang nantinya akan dimasukkan ke dalam dataframe.

Sebelum memasukkan ke dalam dataframe, mari kita membuat fungsi `fetch_clean` yang bertugas untuk membacara keseluruhan file `.csv` yang belum dimasukkan ke dalam database.

In [124]:
def fetch_clean():
    files = glob.glob("data_input_example/*.csv")

    for file in files:
        # mengambil transaksi terkini dari database
        last_update = pd.read_sql_query("""
                    SELECT transaction_date 
                    FROM transactions
                    ORDER BY transaction_date DESC
                    LIMIT 1
                    """, con = conn)
        last_timestamp = last_update['transaction_date'].to_string(index=False)

        # membaca dataframe
        trx = pd.read_csv(file)
        # membersihkan dataframe
        trx_clean = clean_df(trx)

        # filtering dataframe
        trx_clean_update = trx_clean[trx_clean['transaction_date'] > last_timestamp]
        
        # jika dataframe terisi, maka masukkan ke dalam df_list
        if trx_clean_update.shape[0] != 0:
            df_list.append(trx_clean_update)
    
    return df_list

Selanjutnya, kita akan mengupdate fungsi `df_to_db` agar dapat menerima list yang berisi dataframe (bukan hanya 1 dataframe).

In [128]:
def df_to_db(df_list):
    database = 'db/transactions.db'
    conn = sqlite3.connect(database)

    # iterasi dataframe
    for df in df_list:
        df.to_sql(name = 'transactions',
                con = conn,
                if_exists = 'append',
                index = False)
        
        periode = df['transaction_date'].dt.to_period('M').unique()[0]
        print(f'Berhasil Menambahkan: {periode}')

## Memasukkan Keseluruhan Data ke Database

Untuk memasukkan keseluruhan data ke database, kita akan melakukan tahapan berikut:
- Memanggil fungsi `fetch_clean()` untuk membaca keseluruhan dataframe di dalam folder `data_input_example`. 
- Memanggil fungsi `df_to_db()` untuk memasukkan keseluruhan dataframe tersebut ke dalam SQLite.

In [126]:
# code here
dfs_read = fetch_clean()

dfs_read[0].head()

Unnamed: 0,trans_id,account_id,type,amount,transaction_date,channel,city,state_name
395179,T00948378,A00003232,Debit,30.22,2017-03-01 02:31:57,EDC,Jackson,Mississippi
395180,T00209834,A00000712,Debit,611.42,2017-03-01 04:06:46,ATM,Philadelphia,Pennsylvania
395181,T00319785,A00001092,Debit,185.63,2017-03-01 07:01:40,EDC,Manchester,New Hampshire
395182,T01437124,A00004894,Credit,276.29,2017-03-01 07:04:55,EDC,Las Vegas,Nevada
395183,T00528344,A00001802,Debit,34.54,2017-03-01 07:17:04,ATM,Rochester,New York


In [130]:
files

['data_input_example\\2017-01-transactions.csv',
 'data_input_example\\2017-02-transactions.csv',
 'data_input_example\\2017-03-transactions.csv',
 'data_input_example\\2017-04-transactions.csv',
 'data_input_example\\2017-05-transactions.csv',
 'data_input_example\\2017-06-transactions.csv',
 'data_input_example\\2017-07-transactions.csv',
 'data_input_example\\2017-08-transactions.csv',
 'data_input_example\\2017-09-transactions.csv',
 'data_input_example\\2017-10-transactions.csv',
 'data_input_example\\2017-11-transactions.csv',
 'data_input_example\\2017-12-transactions.csv']

In [129]:
# memasukkan ke database (df_to_db)
df_to_db(dfs_read)

Berhasil Menambahkan: 2017-03
Berhasil Menambahkan: 2017-04
Berhasil Menambahkan: 2017-05
Berhasil Menambahkan: 2017-06
Berhasil Menambahkan: 2017-07
Berhasil Menambahkan: 2017-08
Berhasil Menambahkan: 2017-09
Berhasil Menambahkan: 2017-10
Berhasil Menambahkan: 2017-11
Berhasil Menambahkan: 2017-12
Berhasil Menambahkan: 2017-03
Berhasil Menambahkan: 2017-04
Berhasil Menambahkan: 2017-05
Berhasil Menambahkan: 2017-06
Berhasil Menambahkan: 2017-07
Berhasil Menambahkan: 2017-08
Berhasil Menambahkan: 2017-09
Berhasil Menambahkan: 2017-10
Berhasil Menambahkan: 2017-11
Berhasil Menambahkan: 2017-12


# Pengolahan Data (Report Sederhana)

## Cross Tabulation

Kita dapat menggunakan fungsi `crosstab()` yang telah disediakan oleh `pandas` untuk menghitung frekuensi dan melakukan agregasi pada data. Syntax yang digunakan untuk menggunakan fungsi `crosstab()` adalah:

```python
pd.crosstab(index=x,
            columns=y,
            values=numerical_columns,
            aggfunc=aggregate_function)
```

dimana:
- index: Kolom yang akan dijadikan index baris (axis 0)
- columns: Kolom yang akan dijadikan index kolom (axis 1)
- values: Kolom yang ingin dihitung
- aggfunc: Fungsi agregasi (default: count)

Fungsi agregasi (`aggfunc`) yang sering digunakan:
- `'sum'`
- `'min'`
- `'max'`
- `'mean'`
- `'median'`
- `'count'`
- `'std'`

🔻 Mari kita tinjau banyaknya frekuensi transaksi pada setiap channel untuk 2017-01

In [131]:
# code here
pd.crosstab(index = transaction['channel'],
            columns = 'Jumlah Transaksi')

col_0,Jumlah Transaksi
channel,Unnamed: 1_level_1
Internet Banking,1680
Mobile Banking,9484
EDC,7401
ATM,6560
QR,2895


✏️ **Note**

Mengurutkan nilai pada sebuah kolom dapat menggunakan method `sort_values()`

Syntax: `df.sort_values(by, ascending)`:

- `by`: nama kolom
- `ascending=True`: mengurutkan nilai dari terkecil ke terbesar

Mari kita urutkan dan simpan ke dalam objek `freq_trx`

In [None]:
# code here
freq_trx = pd.crosstab(index = transaction['channel'],
                       columns = 'Jumlah Transaksi').sort_values(by = 'Jumlah Transaksi', ascending=False)
freq_trx

🔻 Mari kita tinjau jumlah nilai (amount) transaksi pada setiap channel untuk 2017-01 dan diurutkan dari terbesar ke terkecil dan simpan ke dalam objek `total_trx`

In [137]:
# code here
total_trx = pd.crosstab(index = transaction['channel'],
            columns = 'Total Transaksi',
            values = transaction['amount'],
            aggfunc = 'sum').sort_values(by = 'Total Transaksi', ascending=False)
total_trx

col_0,Total Transaksi
channel,Unnamed: 1_level_1
EDC,2319774.88
Mobile Banking,2012773.98
ATM,1299456.12
Internet Banking,359853.06
QR,176788.19


## Save to Excel

Untuk menyimpan ke dalam excel, kita dapat menggunakan method `.to_excel()`

Syntax:
`df.to_excel(excel_writer, sheet_name)`

- `excel_writer`: Objek writer excel atau path
- `sheet_nname`: Nama sheet

Agar dapat menyimpan beberapa sheet di dalam 1 file excel yang sama, kita membutuhkan bantuan `pd.ExcelWeriter()`

Syntax:
```python
with pd.ExcelWriter('nama_file.xlsx') as writer:
    # save to excel
    df1.to_excel(writer, sheet_name)
    df2.to_excel(writer, sheet_name)
    ...
    dfn.to_excel(writer, sheet_name)
```

*Note: `.to_excel()` membutuhkan library `openpyxl`, silahkan install menggunakan `pip install openpyxl` (aktifkan environemnt terlebih dahulu)*

Mari kita simpan ke dalam `report/2017-01.xlsx` dengan ketentuan berikut:
- dataframe `freq_trx`, nama sheet = `Frequency`
- dataframe `total_trx`, nama sheet = `Total Amount`

In [138]:
# code here
with pd.ExcelWriter('report/2017-01.xlsx') as writer:
    freq_trx.to_excel(writer, sheet_name='Frequency')
    total_trx.to_excel(writer, sheet_name = 'Total Amount')

## Fungsi Generate Report

Untuk memudahkan generate report, mari kita masukkan ke dalam fungsi `report_generator(df_list)` yang menerima inputan list dataframe.

In [139]:
def report_generator(df_list):
    for df in df_list:
        # Mengambil periode bulan, untuk nama file
        periode = df['transaction_date'].dt.to_period('M').unique()[0]

        # menghitung frequency
        freq_trx = pd.crosstab(index = df['channel'],
                       columns = 'Jumlah Transaksi').sort_values(by = 'Jumlah Transaksi', ascending=False)
        
        # menghitung total amount
        total_trx = pd.crosstab(index = df['channel'],
                                columns = 'Total Transaksi',
                                values = df['amount'],
                                aggfunc = 'sum').sort_values(by = 'Total Transaksi', ascending=False)
        
        # menyimpan ke dalam file excel
        with pd.ExcelWriter(f'report/{periode}.xlsx') as writer:
            freq_trx.to_excel(writer, sheet_name = 'Frequency')
            total_trx.to_excel(writer, sheet_name = 'Total Amount')
            print(f"Berhasil membuat report: report/{periode}.xlsx")

In [140]:
# menggunakan report_generator untuk dfs_read
report_generator(dfs_read)

Berhasil membuat report: report/2017-03.xlsx
Berhasil membuat report: report/2017-04.xlsx
Berhasil membuat report: report/2017-05.xlsx
Berhasil membuat report: report/2017-06.xlsx
Berhasil membuat report: report/2017-07.xlsx
Berhasil membuat report: report/2017-08.xlsx
Berhasil membuat report: report/2017-09.xlsx
Berhasil membuat report: report/2017-10.xlsx
Berhasil membuat report: report/2017-11.xlsx
Berhasil membuat report: report/2017-12.xlsx
Berhasil membuat report: report/2017-03.xlsx
Berhasil membuat report: report/2017-04.xlsx
Berhasil membuat report: report/2017-05.xlsx
Berhasil membuat report: report/2017-06.xlsx
Berhasil membuat report: report/2017-07.xlsx
Berhasil membuat report: report/2017-08.xlsx
Berhasil membuat report: report/2017-09.xlsx
Berhasil membuat report: report/2017-10.xlsx
Berhasil membuat report: report/2017-11.xlsx
Berhasil membuat report: report/2017-12.xlsx


In [141]:
report_generator([trx17_2_clean])

Berhasil membuat report: report/2017-02.xlsx


# Next Step: Scheduling with Airflow

Pada notebook ini, kita telah membuat fungsi-fungsi untuk membaca data raw, mengolahnya, memasukkan ke dalam database dan membuat report. Selanjutnya adalah untuk melakukan `scheduling` task ini secara otomatis menggunakan Airflow. 

Kita akan membuat python script (.py) yang akan membantu kita untuk melakukan `scheduling` menggunakan airflow. Materi membuat scheduling ini akan kita pelajari di notebook kedua, yaitu: `2_DSS_Airflow.ipynb`

![](assets/MainFlow.png)