
# Advanced ETL Pipeline - Production Ready with Database and Cloud Simulation

Notebook ini merupakan lanjutan dari pipeline ETL modular sebelumnya. Di sini kita mensimulasikan ETL yang lebih mendekati kebutuhan produksi dengan elemen-elemen seperti:
- Struktur modul Python untuk skalabilitas
- Integrasi ke database (SQLite sebagai pengganti Postgres/MSSQL untuk simulasi)
- Simulasi upload ke cloud (GCS/S3) menggunakan file lokal
- Logging untuk monitoring
- Config file menggunakan `.env`

Struktur Folder Produksi:
```
project/
├── etl/
│   ├── extract.py
│   ├── transform.py
│   ├── load.py
├── config/
│   ├── settings.py (.env)
├── logs/
│   └── etl.log
├── main.py
├── raw_data.csv
└── README.md
```


In [None]:

import pandas as pd

df = pd.DataFrame({
    "customer_id": [201, 202, 203],
    "name": ["Dian", "Eka", "Fajar"],
    "join_date": ["2022-03-01", "2021-07-15", "2023-01-10"],
    "purchase_amount": [150.0, 220.5, 330.75]
})
df.to_csv("raw_data.csv", index=False)


In [None]:

# config/settings.py - menggunakan dotenv
import os
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///customer.db")
INPUT_PATH = os.getenv("INPUT_PATH", "raw_data.csv")
LOG_PATH = os.getenv("LOG_PATH", "etl.log")


In [None]:

# etl/extract.py
import pandas as pd
import logging

def extract_from_csv(path: str) -> pd.DataFrame:
    try:
        df = pd.read_csv(path)
        logging.info("Extracted data from CSV")
        return df
    except Exception as e:
        logging.error(f"Extract failed: {e}")
        return pd.DataFrame()


In [None]:

# etl/transform.py
import pandas as pd
import logging

def transform_customer_data(df: pd.DataFrame) -> pd.DataFrame:
    try:
        df['join_date'] = pd.to_datetime(df['join_date'])
        df['name'] = df['name'].str.title()
        df['purchase_amount'] = df['purchase_amount'].astype(float)
        df['days_since_join'] = (pd.Timestamp.now() - df['join_date']).dt.days
        logging.info("Transformed data successfully")
        return df
    except Exception as e:
        logging.error(f"Transformation failed: {e}")
        return df


In [None]:

# etl/load.py
import logging
from sqlalchemy import create_engine

def load_to_database(df, db_url, table_name="customers"):
    try:
        engine = create_engine(db_url)
        df.to_sql(table_name, con=engine, if_exists='replace', index=False)
        logging.info(f"Loaded data to database table '{table_name}'")
    except Exception as e:
        logging.error(f"Failed to load to database: {e}")


In [None]:

# logging setup
import logging

logging.basicConfig(filename='etl.log',
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    level=logging.INFO)


In [None]:

# main.py - Menjalankan seluruh pipeline

from config.settings import INPUT_PATH, DATABASE_URL
from etl.extract import extract_from_csv
from etl.transform import transform_customer_data
from etl.load import load_to_database

def run_etl():
    df_raw = extract_from_csv(INPUT_PATH)
    df_clean = transform_customer_data(df_raw)
    load_to_database(df_clean, DATABASE_URL)

run_etl()


In [None]:

# Simulasi ETL pipeline langsung di notebook
from config.settings import INPUT_PATH, DATABASE_URL
from etl.extract import extract_from_csv
from etl.transform import transform_customer_data
from etl.load import load_to_database

df = extract_from_csv(INPUT_PATH)
df_clean = transform_customer_data(df)
load_to_database(df_clean, DATABASE_URL)

print("✅ ETL pipeline sukses dijalankan dan dimuat ke database")
