In [1]:
import os
from contextlib import closing
from pathlib import Path

import pandas as pd
import pyodbc

# --- DATABASE SETTINGS ---
# The SQL Server host name is machine-specific: set the OSHE_SQL_SERVER environment
# variable to point the notebook at another instance instead of editing this cell.
# Without it, the original development host is used.
DB_CONFIG = {
    'server': os.environ.get('OSHE_SQL_SERVER', 'DESKTOP-PG3CATF'),
    'database': 'OSHE_Analytics',
    'driver': 'ODBC Driver 18 for SQL Server'
}

# Table names are defined once here so every cell references the same objects.
STAGING_TABLE = 'staging.accidents_raw'
FINAL_TABLE = 'dbo.Accidents_Clustered_Final'

# --- DIRECTORY SETUP ---
# pathlib keeps path handling OS-independent. When the notebook is started from a
# folder named 'python', the project root is taken to be its parent directory.
BASE_DIR = Path.cwd().parent if Path.cwd().name == 'python' else Path.cwd()
DATA_DIR = BASE_DIR / 'data'
STAGING_CSV = DATA_DIR / 'staging' / 'accidents_staging.csv'

# --- CONNECTION HELPER ---
def get_conn(db_name='master', autocommit=False):
    """Open a new pyodbc connection to the SQL Server instance in ``DB_CONFIG``.

    Uses Windows integrated authentication (``Trusted_Connection=yes``) and
    ``TrustServerCertificate=yes``, i.e. the server certificate is not validated.

    Args:
        db_name: Database to connect to. Defaults to ``'master'``.
        autocommit: Forwarded to ``pyodbc.connect``. Pass True for statements that
            must not run inside a transaction (e.g. CREATE DATABASE). Defaults to
            False, which is pyodbc's own default, so existing callers are unaffected.

    Returns:
        An open ``pyodbc.Connection``. Note that pyodbc's ``with conn:`` block only
        commits (or rolls back on error) when it exits; it does NOT close the
        connection. Wrap the result in ``contextlib.closing`` or call ``close()``.
    """
    conn_str = (
        f"DRIVER={{{DB_CONFIG['driver']}}};"
        f"SERVER={DB_CONFIG['server']};"
        f"DATABASE={db_name};"
        f"Trusted_Connection=yes;TrustServerCertificate=yes;"
    )
    return pyodbc.connect(conn_str, autocommit=autocommit)

# Echo the resolved configuration so the run log shows which root and table are in use.
print("Configuration Loaded.")
print("Project Root:", BASE_DIR)
print("Target Table:", STAGING_TABLE)

Configuration Loaded.
Project Root: e:\MyProject-GitHub\OSHE-Interview-Project
Target Table: staging.accidents_raw


In [2]:
print("Checking SQL Infrastructure...")

try:
    # closing() guarantees the connection is released; pyodbc's own context
    # manager only commits/rolls back and leaves the connection open.
    with closing(get_conn()) as conn:
        cursor = conn.cursor()

        # Identify the server we are connected to and list its databases.
        cursor.execute("SELECT @@SERVERNAME, @@VERSION")
        server, version = cursor.fetchone()

        cursor.execute("SELECT name FROM sys.databases")
        db_list = [row[0] for row in cursor.fetchall()]

    print(f"[Server]: {server}")
    print("[Status]: Connected")
    print(f"[DB Count]: {len(db_list)} databases detected")
    print("Connectivity test passed.")

except Exception as e:
    print(f"Connection error: {e}")
    # Re-raise so "Run All" stops here instead of continuing without a database.
    raise

Checking SQL Infrastructure...
[Server]: DESKTOP-PG3CATF
[Status]: Connected
[DB Count]: 5 databases detected
Connectivity test passed.


In [3]:
print("Initializing Database Objects...")

target_db = DB_CONFIG['database']
# Bracket-quote the identifier (doubling any ']') so the name cannot break the DDL.
quoted_db = "[" + target_db.replace("]", "]]") + "]"

try:
    # 1. Database creation (master context).
    #    Autocommit is required: CREATE DATABASE is not allowed inside an explicit transaction.
    with closing(get_conn('master')) as conn:
        conn.autocommit = True
        cursor = conn.cursor()

        # Create the database only if it does not exist yet (name passed as a parameter).
        cursor.execute("SELECT 1 FROM sys.databases WHERE name = ?", target_db)
        if cursor.fetchone() is None:
            cursor.execute(f"CREATE DATABASE {quoted_db}")

    # 2. Schema provisioning (target DB context).
    with closing(get_conn(target_db)) as conn:
        conn.autocommit = True
        cursor = conn.cursor()

        # A separate 'staging' schema keeps raw landed data apart from cleaned tables.
        # CREATE SCHEMA must be the only statement in its batch, hence EXEC(...).
        cursor.execute("IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'staging') "
                       "EXEC('CREATE SCHEMA staging')")

    print(f"Database '{DB_CONFIG['database']}' and schema 'staging' are verified/ready.")

except Exception as e:
    print(f"Provisioning failed: {e}")
    # Re-raise so later cells do not run against a missing database/schema.
    raise

Initializing Database Objects...
Database 'OSHE_Analytics' and schema 'staging' are verified/ready.


In [4]:
print("Step 4: Provisioning Staging Infrastructure")

# Fail fast: without the source file there is no header to derive the staging schema from.
if not STAGING_CSV.exists():
    print(f"✗ Error: Data source missing at {STAGING_CSV}")
    raise FileNotFoundError(STAGING_CSV)

try:
    # 1. Schema discovery: a single row is enough to read the header.
    sample_df = pd.read_csv(STAGING_CSV, nrows=1)

    # 2. Column-name normalisation (' ', '-', '.' -> '_'; parentheses dropped).
    #    Must stay identical to the rule used by the ingestion step.
    clean_columns = [c.replace(' ', '_').replace('-', '_').replace('.', '_').replace('(', '').replace(')', '')
                     for c in sample_df.columns]

    # Distinct raw headers can collapse to the same name (e.g. 'Wind Speed' vs 'Wind-Speed')
    # or clash with the technical columns added below; CREATE TABLE would reject either.
    all_names = [*clean_columns, 'STG_ID', 'LOAD_TIMESTAMP']
    clashes = sorted({name for name in all_names if all_names.count(name) > 1})
    if clashes:
        raise ValueError(f"Column names collide after normalisation: {clashes}")

    # 3. DDL generation ("safe landing"): every source column is NVARCHAR(MAX) so the raw
    #    load cannot fail on type or length. ']' is doubled to keep bracket quoting valid.
    col_defs = [f"[{name.replace(']', ']]')}] NVARCHAR(MAX)" for name in clean_columns]

    create_sql = f"""
    IF OBJECT_ID('{STAGING_TABLE}', 'U') IS NOT NULL DROP TABLE {STAGING_TABLE};
    CREATE TABLE {STAGING_TABLE} (
        [STG_ID] INT IDENTITY(1,1) PRIMARY KEY,
        {', '.join(col_defs)},
        [LOAD_TIMESTAMP] DATETIME DEFAULT GETDATE()
    )"""

    # Run the DDL in autocommit mode against the project database; closing() then
    # releases the connection (pyodbc's `with conn:` would leave it open).
    with closing(get_conn(DB_CONFIG['database'])) as conn:
        conn.autocommit = True
        conn.cursor().execute(create_sql)

    print(f"Staging table '{STAGING_TABLE}' created successfully.")

except Exception as e:
    print(f"Failed to create staging infrastructure: {e}")
    # Re-raise so the ingestion step does not run against a missing/stale table.
    raise

Step 4: Provisioning Staging Infrastructure
Staging table 'staging.accidents_raw' created successfully.


In [5]:
print("Step 5: Executing Data Ingestion Pipeline")

BATCH_SIZE = 5_000        # rows sent per executemany() call and committed together
PROGRESS_EVERY = 50_000   # progress-report interval in rows (a multiple of BATCH_SIZE)

try:
    # 1. Load every column as text. The staging table is all NVARCHAR(MAX), and dtype=str keeps
    #    source values verbatim instead of letting pandas coerce them (an integer column with
    #    gaps would otherwise become float and land as '123.0', and gaps as 'nan').
    df = pd.read_csv(STAGING_CSV, dtype=str)

    # Align column names with the staging DDL (same normalisation rule as the provisioning step).
    df.columns = [c.replace(' ', '_').replace('-', '_').replace('.', '_').replace('(', '').replace(')', '')
                  for c in df.columns]

    # Missing cells -> None so pyodbc binds them as SQL NULL. Casting to object first makes sure
    # None (not NaN) is stored; genuine 'None' text values in the data are left untouched.
    df = df.astype(object).where(df.notna(), None)

    # 2. Batched SQL ingestion. closing() releases the connection at the end; pyodbc's own
    #    `with conn:` only commits/rolls back and leaves the connection open.
    with closing(get_conn(DB_CONFIG['database'])) as conn:
        cursor = conn.cursor()

        # Fresh load: wipe previous contents and commit immediately, so the table is emptied
        # even when the CSV has no data rows.
        cursor.execute(f"TRUNCATE TABLE {STAGING_TABLE}")
        conn.commit()

        # ']' is doubled so each bracket-quoted identifier stays valid.
        cols_joined = ", ".join(f"[{c.replace(']', ']]')}]" for c in df.columns)
        placeholders = ", ".join(["?"] * len(df.columns))
        insert_sql = f"INSERT INTO {STAGING_TABLE} ({cols_joined}) VALUES ({placeholders})"

        total_rows = len(df)
        print(f"Ingesting {total_rows:,} records...")

        # 3. Execute in batches, committing each one so a transaction holds at most BATCH_SIZE rows.
        for start in range(0, total_rows, BATCH_SIZE):
            batch_rows = list(df.iloc[start:start + BATCH_SIZE].itertuples(index=False, name=None))
            cursor.executemany(insert_sql, batch_rows)
            conn.commit()

            loaded = min(start + BATCH_SIZE, total_rows)
            if loaded % PROGRESS_EVERY == 0 or loaded == total_rows:
                print(f"  Progress: {loaded:,} / {total_rows:,}")

        # Post-load invariant: the table must hold exactly the rows read from the CSV.
        cursor.execute(f"SELECT COUNT(*) FROM {STAGING_TABLE}")
        table_rows = cursor.fetchone()[0]
        if table_rows != total_rows:
            raise RuntimeError(f"Row count mismatch: CSV has {total_rows:,} rows, "
                               f"{STAGING_TABLE} has {table_rows:,}")

    print(f"Ingestion completed. Data is now available in {STAGING_TABLE}.")

except Exception as e:
    print(f"Ingestion failed: {e}")
    # Re-raise so downstream cells do not run on a partial or empty staging table.
    raise

Step 5: Executing Data Ingestion Pipeline
Ingesting 270,921 records...
  Progress: 50,000 / 270,921
  Progress: 100,000 / 270,921
  Progress: 150,000 / 270,921
  Progress: 200,000 / 270,921
  Progress: 250,000 / 270,921
  Progress: 270,921 / 270,921
Ingestion completed. Data is now available in staging.accidents_raw.
