## **Mount Google Drive**

In [1]:
from google.colab import drive
drive.mount("/content/drive")

import os

Mounted at /content/drive


# **SETUP**

In [2]:
import pandas as pd
import json
import os
import time
import sqlite3
import logging
from datetime import datetime
from google.colab import drive

## **Path Input/Output**

In [3]:
BASE_DIR = "/content/drive/MyDrive/bigdata_final_project"
RAW_DIR = os.path.join(BASE_DIR, "raw")
WAREHOUSE_DIR = os.path.join(BASE_DIR, "warehouse")
LOG_FILE = os.path.join(BASE_DIR, "etl_pipeline.log")

os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

SRC1 = "/content/drive/MyDrive/bigdata_final_project/raw/raw_taxi_data.csv"
SRC2 = "/content/drive/MyDrive/bigdata_final_project/raw/raw_weather_data.json"

# **EXTRACT**

In [4]:
# --- CELL 2: EXTRACT ---

def extract_taxi_data(filepath):
    """Membaca data mentah Taxi CSV"""
    print(f"üìÇ Membaca Taxi Data dari: {filepath}...")
    start_time = time.time()

    try:
        df = pd.read_csv(filepath)
        # Log Metadata
        metadata = {
            "source": "NYC Taxi CSV",
            "rows": len(df),
            "columns": list(df.columns),
            "file_size_mb": os.path.getsize(filepath) / (1024 * 1024)
        }
        logging.info(f"EXTRACT SUCCESS: {metadata}")
        print(f"‚úÖ Taxi Data Loaded: {len(df)} baris in {round(time.time()-start_time, 2)}s")
        return df
    except Exception as e:
        logging.error(f"EXTRACT FAILED (Taxi): {e}")
        print(f"‚ùå Error reading Taxi data: {e}")
        return None

def extract_weather_data(filepath):
    """Membaca data mentah Weather JSON"""
    print(f"üìÇ Membaca Weather Data dari: {filepath}...")
    start_time = time.time()

    try:
        with open(filepath, 'r') as f:
            data = json.load(f)

        # Parse JSON (Ambil bagian 'hourly')
        df = pd.DataFrame(data['hourly'])

        # Ambil hanya kolom penting
        cols_needed = ['time', 'temperature_2m', 'rain']
        df = df[cols_needed] if all(c in df.columns for c in cols_needed) else df

        logging.info(f"EXTRACT SUCCESS (Weather): {len(df)} rows")
        print(f"‚úÖ Weather Data Loaded: {len(df)} baris in {round(time.time()-start_time, 2)}s")
        return df
    except Exception as e:
        logging.error(f"EXTRACT FAILED (Weather): {e}")
        print(f"‚ùå Error reading Weather data: {e}")
        return None

# Eksekusi Extract
raw_taxi_path = os.path.join(RAW_DIR, "raw_taxi_data.csv")
raw_weather_path = os.path.join(RAW_DIR, "raw_weather_data.json")

df_taxi_raw = extract_taxi_data(raw_taxi_path)
df_weather_raw = extract_weather_data(raw_weather_path)

üìÇ Membaca Taxi Data dari: /content/drive/MyDrive/bigdata_final_project/raw/raw_taxi_data.csv...
‚úÖ Taxi Data Loaded: 150000 baris in 1.02s
üìÇ Membaca Weather Data dari: /content/drive/MyDrive/bigdata_final_project/raw/raw_weather_data.json...
‚úÖ Weather Data Loaded: 744 baris in 0.59s


# **CLEANING**

In [5]:
# --- CELL 3: CLEANING ---

def clean_taxi_data(df):
    """Membersihkan data Taxi"""
    df = df.copy()

    # 1. Standardisasi Nama Kolom (Snake Case)
    # Mapping nama kolom sesuai data asli Anda ke standar database
    rename_map = {
        'pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id'
    }
    df.rename(columns=rename_map, inplace=True)
    df.columns = [c.lower() for c in df.columns] # Pastikan lowercase semua

    # 2. Konversi Tipe Data Waktu
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])

    # 3. Hapus Duplikat & Null Penting
    initial_rows = len(df)
    df.drop_duplicates(inplace=True)
    df.dropna(subset=['pickup_datetime', 'dropoff_datetime'], inplace=True)

    print(f"üßπ Taxi Cleaned: {initial_rows} -> {len(df)} baris (Dibuang: {initial_rows - len(df)})")
    return df

def clean_weather_data(df):
    """Membersihkan data Weather"""
    df = df.copy()

    # 1. Rename kolom agar konsisten
    df.rename(columns={
        'temperature_2m': 'temperature',
        'rain': 'rainfall_mm'
    }, inplace=True)

    # 2. Konversi Waktu
    df['time'] = pd.to_datetime(df['time'])

    # 3. Handle Missing Values (Forward Fill untuk data cuaca)
    df.fillna(method='ffill', inplace=True)

    print(f"üßπ Weather Cleaned: {len(df)} baris")
    return df

# Eksekusi Cleaning
df_taxi_clean = clean_taxi_data(df_taxi_raw)
df_weather_clean = clean_weather_data(df_weather_raw)

üßπ Taxi Cleaned: 150000 -> 150000 baris (Dibuang: 0)
üßπ Weather Cleaned: 744 baris


  df.fillna(method='ffill', inplace=True)


# **TRANSFORM**

In [6]:
# --- CELL 4: TRANSFORM & ENRICHMENT ---

def transform_data(df_taxi, df_weather):
    print("‚öôÔ∏è Memulai Transformasi Data...")

    # 1. Siapkan Key untuk Join (Bulatkan waktu pickup ke jam terdekat)
    df_taxi['join_key'] = df_taxi['pickup_datetime'].dt.floor('H')

    # 2. MERGE (Join Data Taxi + Cuaca)
    df_merged = pd.merge(
        df_taxi,
        df_weather,
        left_on='join_key',
        right_on='time',
        how='left'
    )

    # --- 5 FITUR BARU ---

    # Fitur 1: Durasi Perjalanan (Menit)
    df_merged['trip_duration_minutes'] = (df_merged['dropoff_datetime'] - df_merged['pickup_datetime']).dt.total_seconds() / 60
    df_merged['trip_duration_minutes'] = df_merged['trip_duration_minutes'].round(2)

    # Fitur 2: Kecepatan Rata-rata (MPH)
    # Rumus: Jarak / (Durasi Jam). Hindari bagi nol.
    df_merged['average_speed_mph'] = df_merged.apply(
        lambda x: x['trip_distance'] / (x['trip_duration_minutes']/60) if x['trip_duration_minutes'] > 0 else 0,
        axis=1
    ).round(2)

    # Fitur 3: Waktu (Time of Day)
    def get_time_of_day(h):
        if 6 <= h < 12: return 'Pagi'
        elif 12 <= h < 18: return 'Siang'
        elif 18 <= h < 24: return 'Malam'
        else: return 'Dini Hari'

    df_merged['time_of_day'] = df_merged['pickup_datetime'].dt.hour.apply(get_time_of_day)

    # Fitur 4: Is Weekend (1 = Ya, 0 = Tidak)
    # 5=Sabtu, 6=Minggu
    df_merged['is_weekend'] = df_merged['pickup_datetime'].dt.dayofweek.apply(lambda x: 1 if x >= 5 else 0)

    # Fitur 5: Cost per Mile
    df_merged['cost_per_mile'] = df_merged.apply(
        lambda x: x['total_amount'] / x['trip_distance'] if x['trip_distance'] > 0 else 0,
        axis=1
    ).round(2)

    # --- VALIDASI & FILTER AKHIR ---
    # Buang data anomali (Durasi negatif atau 0)
    df_final = df_merged[df_merged['trip_duration_minutes'] > 0].copy()

    # Pilih kolom final untuk Warehouse
    final_cols = [
        'pickup_datetime', 'dropoff_datetime', 'passenger_count',
        'trip_distance', 'pu_location_id', 'do_location_id', 'total_amount',
        'temperature', 'rainfall_mm',
        'trip_duration_minutes', 'average_speed_mph', 'time_of_day', 'is_weekend', 'cost_per_mile'
    ]

    # Pastikan hanya kolom yang ada yang diambil
    df_final = df_final[[c for c in final_cols if c in df_final.columns]]

    print(f"‚úÖ Transformasi Selesai. Data Final: {df_final.shape}")
    return df_final

# Eksekusi Transform
df_fact_etl = transform_data(df_taxi_clean, df_weather_clean)

# Cek hasil 5 baris teratas
df_fact_etl.head()

‚öôÔ∏è Memulai Transformasi Data...


  df_taxi['join_key'] = df_taxi['pickup_datetime'].dt.floor('H')


‚úÖ Transformasi Selesai. Data Final: (149921, 14)


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pu_location_id,do_location_id,total_amount,temperature,rainfall_mm,trip_duration_minutes,average_speed_mph,time_of_day,is_weekend,cost_per_mile
0,2025-01-18 20:53:30,2025-01-18 21:00:47,1.0,0.97,238,166,13.32,4.3,0.0,7.28,7.99,Malam,1,13.73
1,2025-01-25 11:12:51,2025-01-25 11:17:57,1.0,0.6,50,48,10.55,-4.4,0.0,5.1,7.06,Pagi,1,17.58
2,2025-01-21 15:09:31,2025-01-21 15:19:02,1.0,0.8,236,237,16.65,-8.5,0.0,9.52,5.04,Siang,0,20.81
3,2025-01-11 22:25:45,2025-01-11 22:34:22,2.0,1.93,231,68,20.37,-1.7,0.0,8.62,13.43,Malam,1,10.55
4,2025-01-04 23:37:07,2025-01-04 23:45:58,,4.44,137,88,35.12,-2.9,0.0,8.85,30.1,Malam,1,7.91


# **LOAD**

In [7]:
# --- CELL 5: LOAD TO WAREHOUSE (ANTI-LOCK VERSION) ---

def load_to_sqlite(df, db_path):
    print(f"üíæ Menyimpan data ke SQLite: {db_path}...")

    # PERBAIKAN: Tambahkan timeout=30 (Menunggu maksimal 30 detik jika DB sibuk)
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()

    try:
        # 1. Buat Tabel Dimensi
        unique_locations = pd.DataFrame(df['pu_location_id'].unique(), columns=['location_id']).dropna()
        unique_locations.to_sql('dim_location', conn, if_exists='replace', index=False)

        # 2. Buat Tabel Fakta
        df.to_sql('fact_trips_etl', conn, if_exists='replace', index_label='trip_id')

        # 3. Verifikasi
        cursor.execute("SELECT count(*) FROM fact_trips_etl")
        count = cursor.fetchone()[0]

        print(f"‚úÖ LOAD SUKSES! Tabel 'fact_trips_etl' berisi {count} baris.")
        logging.info(f"ETL PIPELINE COMPLETED. Loaded {count} rows.")

    except Exception as e:
        print(f"‚ùå Error saat Load: {e}")
        # Hint untuk user jika masih locked
        if "locked" in str(e):
            print("üí° TIPS: Jika masih 'locked', lakukan 'Runtime > Restart Session' di menu atas.")
        logging.error(f"LOAD FAILED: {e}")
    finally:
        # Pastikan koneksi selalu ditutup
        conn.close()

# Eksekusi Load
db_file = os.path.join(WAREHOUSE_DIR, "warehouse.db")
load_to_sqlite(df_fact_etl, db_file)

üíæ Menyimpan data ke SQLite: /content/drive/MyDrive/bigdata_final_project/warehouse/warehouse.db...
‚úÖ LOAD SUKSES! Tabel 'fact_trips_etl' berisi 149921 baris.
