In [13]:
import pandas as pd

file_path = 'dataset_user_behavior_for_test.csv'

df = pd.read_csv(file_path)

In [16]:
df['Province'].str.title()

0          East Kalimantan
1               Yogyakarta
2                     Aceh
3                East Java
4                Gorontalo
               ...        
9995               Jakarta
9996             East Java
9997    West Nusa Tenggara
9998             East Java
9999             West Java
Name: Province, Length: 10000, dtype: object

In [6]:
## Data terdiri dari 1000 rows dan 9 column yang berisi table transaction berupa behavior dari customer dalam menggunakan K-Vision+
## Selain itu juga nama column tidak compatible dengan PostgreeSQL
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 9 columns):
 #   Column                    Non-Null Count  Dtype 
---  ------                    --------------  ----- 
 0   Iduser                    10000 non-null  int64 
 1   start watching            10000 non-null  object
 2   Device Id                 10000 non-null  object
 3   Province                  10000 non-null  object
 4   City                      10000 non-null  object
 5   Content Name              10000 non-null  object
 6   Playing Time Millisecond  10000 non-null  int64 
 7   Device Type               10000 non-null  object
 8   Content Type              10000 non-null  object
dtypes: int64(2), object(7)
memory usage: 703.2+ KB


In [9]:
## Tidak terdapat null values, sehingga tidak diperlukan treatment khusus untuk null values 
df.isnull().sum()

Iduser                      0
start watching              0
Device Id                   0
Province                    0
City                        0
Content Name                0
Playing Time Millisecond    0
Device Type                 0
Content Type                0
dtype: int64

In [11]:
df.isin(['NA', 'N/A']).sum()

Iduser                      0
start watching              0
Device Id                   0
Province                    0
City                        0
Content Name                0
Playing Time Millisecond    0
Device Type                 0
Content Type                0
dtype: int64

In [None]:
### ETL Pipeline dengan menggunakan Airflow sebagai scheduler

import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import numpy as np
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator


def localize_utc_tz(d):
    return tz.fromutc(d)


csv_file_path = 'dataset_user_behavior_for_test.csv'
db_username = 'postgres'
db_password = 'admin'
db_host = '127.0.0.1'
db_port = '5432'
db_name = 'postgres'
table_name = 'dataset_user_behavior_for_test'


def trunc_insert(csv_file_path, db_username, db_password, db_host, db_port, db_name, table_name):
    
    #1. Extract
    df = pd.read_csv(csv_file)
    
    #2. Transform : For Data Quality & Validity Check
    # Untuk merubah nama kolom agar compatible dengan PostgreSQL
    df.columns = df.columns.str.replace(' ', '_').str.lower()
    
    # Data Quality Check 1: Memeriksa kolom uniquer ID agar tidak ada data kosong (cth 'iduser')
    if df['iduser'].isnull().any():
        raise ValueError("Data quality check failed: 'iduser' contains null values!")
    
    # Data Quality Check 2: Mengisi null valeus pada semua column agar disesuaikan dengan default tipe datanya
    for column in df.columns:
        if df[column].dtype == 'object':  # String Column
            df[column].fillna('unknown', inplace=True)
        elif np.issubdtype(df[column].dtype, np.number):  # Numerical Column
            df[column].fillna(0, inplace=True)
        elif np.issubdtype(df[column].dtype, np.datetime64):  # Datetime Column
            df[column].fillna(pd.Timestamp('1970-01-01'), inplace=True)
    
    # Data Validity Check : Make sure bahwa column start_Watching memiliki format yang sesuai
    try :
        df['start_watching'] = pd.to_datetime(df['start_watching'])
    except Exception as e:
        raise ValueError(f"'start_watching' column format is incorret.")
    
    # Mengganti data NaN menjadi None
    df = df.replace({np.nan: None})
    
    # Mengubah setiap value pada column province menjadi huruf kapital
    df['province'] = df['province'].str.title()
    
    # 3. LOAD: Connection ke PostgreSQL dan memuat data
    engine = create_engine(f'postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}')
    
    # Karena data hanya berupa 1000 rows, maka saya menggunakan approach delete(all period) - insert untuk proses load data nya.
    # Berbeda dengan kondisi jika jumlah table nya banyak, maka bisa menggunakan approach untuk delete(Cutt off Period) ataupun dengan menggunakan batch insert
    with engine.connect() as conn:
        conn.execute(f"DELETE FROM {table_name}")
        print(f"Table '{table_name}' has been deleted")
    
    # Load data ke PostgreeSQL
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
    print(f"Data loaded sccessfully into '{table_name}'")
    
    return "ETL Process Success !!"

default_args = {
    'owner': 'Dicky',
    'depends_on_past': False,
    'email': ['dicky02ard@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    'execution_timeout': timedelta(hours=1),
}

with DAG(
    'User Behavior', # DAGs Name
    default_args=default_args,
    schedule_interval='0 5 * * *', # Daily Run every 05.00 AM
    start_date=pendulum.datetime(2024, 11, 9, tz="Asia/Jakarta"),
    catchup=False,
    tags=['ETL', 'User_Behavior'],
    user_defined_filters={
        'localtz': localize_utc_tz,
    }
) as dag:
    t1 = PythonOperator(
        task_id='ETL_Customer_Behavior',
        python_callable=trunc_insert,
        op_args=[csv_file_path, db_username, db_password, db_host, db_port, db_name, 'dataset_user_behavior']
    )
    t1

In [None]:
### 3. Create 2 tables from main table with these questions:

################################################################
### How many users from each province? :

## Table Name -> User_behavior_by_province

## SQL : 
# SELECT 
#   Province,
#   COUNT(DISTINCT Iduser) Number_of_users
# FROM User_behavior a
# GROUP BY Province
# ORDER BY COUNT(DISTINCT Iduser) desc;

## Result :
# Row Province Number_of_users
# 1 Jakarta 2386
# 2 West Java 2133
# 3 East Java 1456
# 4 Central Java 594
# 5 Riau 363
# 6 Banten 298
# 7 East Kalimantan 273
# 8 North Sumatra 264
# 9 South Sulawesi 260
# 10 South Sumatra 231
# 11 South Kalimantan 201
# 12 Yogyakarta 180
# 13 Unknown 135
# 14 West Kalimantan 127
# 15 Kuala lumpur 105
# 16 West Sumatra 96
# 17 Bali 96
# 18 North Sulawesi 81
# 19 Selangor 69
# 20 Jambi 67
# 21 lampung 62
# 22 West Nusa Tenggara 59
# 23 Aceh 42
# 24 Papua 39
# 25 Central Kalimantan 37
# 26 Central Sulawesi 36
# 27 Sabah 30
# 28 Sarawak 27
# 29 Maluku 22
# 30 Riau Islands 20
# 31 North Kalimantan 20
# 32 Southeast Sulawesi 18
# 33 East Nusa Tenggara 14
# 34 Central And Western District 13
# 35 Bengkulu 12
# 36 Gorontalo 11
# 37 Johor 10
# 38 North Maluku 9
# 39 Bangka–Belitung Islands 9
# 40 Perak 8
# 41 Mecca Region 7
# 42 Taichung City 6
# 43 Penang 6
# 44 Kedah 6
# 45 Imarat Ra's Al Khaymah	5
# 46 Taipei City 5
# 47 Riyadh Region 4
# 48 Brunei-Muara District 4
# 49 Medina Region 3
# 50 West Papua 3
# 51 Changhua 3
# 52 Kaohsiung 3
# 53 Istanbul 2
# 54 Gyeonggi-do 2
# 55 Negeri Sembilan 2
# 56 New Taipei 2
# 57 Fujairah 2
# 58 Abidjan 1
# 59 Belgrade 1
# 60 Cordoba 1
# 61 Casablanca 1
# 63 Seoul 1
# 64 Ca 1
# 65 Pointe-noire 1
# 66 Azores 1
# 67 Maranhao 1
# 68 Terengganu 1
# 69 Kelantan 1
# 70 Paris 1
# 71 Geneva 1
# 72 Western Australia 1
# 73 Trøndelag 1
# 74 Phnom Penh 1
# 75 Metro Manila 1
# 76 Va 1
# 77 Al Asimah 1
# 78 Dili 1 
# 79 Rio De Janeiro 1
# 80 Chiayi 1


 
#################################################################    
### How many users are there in each content type? : 

## Table Name -> User_behavior_by_content

## SQL : 
# SELECT 
#   a.Content_Type,
#   COUNT(DISTINCT a.Iduser) Number_of_users
# FROM User_behavior a
# GROUP BY a.Content_Type
# ORDER BY COUNT(DISTINCT Iduser) desc;

## Result
#    Row Content_Type Number_of_users
#    1 Series 5844
#    2 Channel Live 3213
#    3 Movie 913
#    4 Catchup 30

