In [None]:
import pandas as pd
import os
import numpy as np

# ==========================================
# CONFIGURATION & MAPPINGS
# ==========================================

# UPDATE!!!: Local Directory Paths

DATA_DIR = r'data/raw'
OUTPUT_DIR = r'data/processed'
IMG_DIR = r'data/visuals'
OUTPUT_FILE = '01_Data_Prep_Master.csv'

# hierarchy: check regions first to catch "Exports to China" before sector logic applies
GEO_MAP = {
    'ASEAN': ['asean', 'southeast asia', 'brunei', 'cambodia', 'indonesia', 'lao', 'malaysia', 'myanmar', 'philippines', 'thailand', 'vietnam'],
    'European Union': ['european union', 'eu-27', 'belgium', 'france', 'germany', 'greece', 'italy', 'netherlands', 'spain', 'sweden', 'ireland', 'denmark', 'finland', 'portugal', 'luxembourg', 'cyprus'],
    'Greater China': ['china', 'hong kong', 'taiwan', 'macau'],
    'North America': ['usa', 'united states', 'america', 'canada', 'mexico'],
    'South Asia': ['india', 'bangladesh', 'pakistan', 'sri lanka', 'south asia'],
    'East Asia': ['japan', 'korea', 'north asia', 'east asia'],
    'Oceania': ['australia', 'new zealand', 'oceania', 'papua new guinea', 'marshall islands'],
    'Middle East': ['uae', 'united arab emirates', 'saudi', 'qatar', 'kuwait', 'israel', 'iran', 'west asia', 'turkiye', 'turkey'],
    'Europe (Non-EU)': ['united kingdom', 'uk', 'switzerland', 'norway', 'russia', 'europe'],
    'Africa': ['africa', 'south africa', 'egypt', 'nigeria', 'mauritius', 'liberia'],
    'South America': ['brazil', 'chile', 'peru', 'panama', 'argentina'],
    'Offshore / Islands': ['bermuda', 'cayman', 'virgin islands']
}

# priority mapping for SSIC 2020 alignment
SECTOR_MAP = [
    ('Agriculture & Primary Ind.', ['agriculture', 'fishing', 'quarrying']),
    ('Information & Communications', ['info', 'comm', 'tele', 'computer', 'software', 'media', 'audio-visual', 'publishing']),
    ('Wholesale & Retail Trade', ['wholesale', 'retail', 'trade', 'shopping']),
    ('Real Estate', ['real estate', 'ownership', 'property', 'leasing']),
    ('Transportation & Storage', ['transport', 'storage', 'freight', 'logistics', 'postal', 'courier', 'shipping', 'air', 'sea', 'travel', 'passenger']),
    ('Manufacturing', ['manufact', 'precision', 'chemical', 'biomedical']),
    ('Construction', ['construct']),
    ('Finance & Insurance', ['financ', 'insur', 'bank', 'fisim', 'fund']),
    ('Professional Services', ['profession', 'legal', 'account', 'consult', 'architect', 'engineer', 'business', 'headquarters']),
    ('Admin & Support Services', ['admin', 'support', 'employment agency', 'security']),
    ('Education & Health', ['education', 'health', 'medical', 'social', 'hospital']),
    ('Arts & Recreation', ['arts', 'entertainment', 'recreation', 'sightseeing']),
    ('Accommodation & Food Services', ['accom', 'food', 'beverage', 'hotel', 'restaurant']),
    ('Public Administration', ['public admin', 'government', 'defence']),
    ('Other Services', ['other services', 'personal services', 'repair', 'maintenance']),
    # Macros kept for context, separated in downstream modeling
    ('Macro: GDP', ['gdp', 'gross domestic']),
    ('Macro: Labor', ['employment', 'jobseekers', 'placements']),
    ('Macro: Tourism', ['tourism', 'visitor']),
    ('Macro: Trade Flow', ['export', 'import'])
]

# keywords that indicate header junk or aggregates I want to drop to avoid double counting
NOISE_KEYWORDS = [
    'data are', 'refer to', 'contact', 'generated', 'note:', 'nil', 'not available',
    'definitions', 'values are shown', 'numbers may not add', 'notation:', 'notes',
    'place of residence', 'unknown', 'male', 'female', 'not elsewhere classified',
    'not elsewhere specified', 'services producing', 'goods producing', 'value added',
    'taxes on', 'basic price', 'market prices'
]

# ==========================================
# HELPER FUNCTIONS
# ==========================================

def parse_universal_date(date_str):
    """Parses mixed frequencies (Quarterly, Monthly, Annual) into standard Timestamps."""
    try:
        date_str = str(date_str).strip()

        # Handle Quarterly: "2023 1Q" -> 2023-03-31
        if 'Q' in date_str:
            parts = date_str.split()
            if len(parts) == 2:
                year, quarter = int(parts[0]), int(parts[1][0])
                month = quarter * 3
                return pd.Timestamp(year=year, month=month, day=1) + pd.offsets.MonthEnd(0)

        # Handle Monthly: "2023 Jan"
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        if any(m in date_str for m in months):
            return pd.to_datetime(date_str) + pd.offsets.MonthEnd(0)

        # Handle Annual: "2023" -> 2023-12-31
        # forcing annual data to Dec 31 avoids ambiguous offsets when merging with quarterly data
        if date_str.isdigit() and len(date_str) == 4:
            return pd.Timestamp(year=int(date_str), month=12, day=31)

        return pd.NaT
    except:
        return pd.NaT

def ingest_dos_csv(file_path, filename):
    """Detects variable headers and melts wide-format DOS csvs."""
    # scan first 20 lines for metadata anchor "Data Series"
    header_row = 0
    with open(file_path, 'r', encoding='ISO-8859-1') as f:
        lines = f.readlines()
        for i, line in enumerate(lines[:20]):
            if "Data Series" in line or "Variables" in line:
                header_row = i
                break

    df = pd.read_csv(file_path, encoding='ISO-8859-1', header=header_row, on_bad_lines='skip')
    df = df.rename(columns={df.columns[0]: 'raw_variable'})
    df = df.dropna(subset=['raw_variable'])

    # melt (N, T) -> (N*T, 1) for database ingestion
    id_vars = ['raw_variable']
    value_vars = [c for c in df.columns if c not in id_vars]
    df_long = df.melt(id_vars=id_vars, value_vars=value_vars, var_name='raw_date', value_name='value')
    df_long['source_file'] = filename
    return df_long

def get_region(raw_name):
    name = str(raw_name).lower()
    for region, keywords in GEO_MAP.items():
        if any(k in name for k in keywords):
            return region
    return "Singapore / Total"

def get_sector(raw_name):
    name = str(raw_name).strip()
    name_lower = name.lower()

    # noise filtering
    if any(k in name_lower for k in NOISE_KEYWORDS) or len(name) > 100:
        return "REMOVE"

    for target, keywords in SECTOR_MAP:
        if any(k in name_lower for k in keywords):
            return target

    # fallback: if it's a country name, tag as trade flow
    if get_region(name) != "Singapore / Total":
        return "Cross-Border / Trade Partner"

    return "Other Industries"

# ==========================================
# EXECUTION PIPELINE
# ==========================================

# 1. Manual Data Injection (IMDA)
# unstable source format necessitates hardcoding this critical digital adoption data
imda_raw = {
    'raw_variable': (
        ['Business Usage of Computers'] * 6 +
        ['Business Usage of Internet'] * 6 +
        ['Business Usage of Cloud Computing Services'] * 6 +
        ['Business Usage of E-Payment'] * 6 +
        ['Business Usage of Data Analytics'] * 6 +
        ['Business Usage of Artificial Intelligence'] * 6
    ),
    'year_str': ['2019', '2020', '2021', '2022', '2023', '2024'] * 6,
    'value': [
        0.92, 0.91, 0.92, 0.94, 0.94, 0.94, # Computers
        0.95, 0.93, 0.93, 0.96, 0.96, 0.97, # Internet
        0.22, 0.20, 0.27, 0.27, 0.31, 0.36, # Cloud
        0.81, 0.85, 0.87, 0.93, 0.94, 0.94, # E-Payment
        0.11, 0.10, 0.13, 0.13, 0.14, 0.17, # Data Analytics
        0.04, 0.04, 0.03, 0.04, 0.04, 0.15  # AI
    ]
}
df_imda = pd.DataFrame(imda_raw)
df_imda['source_file'] = 'Manual_Input_IMDA_Usage'
df_imda['date_obj'] = pd.to_datetime(df_imda['year_str'] + '-12-31')
df_imda = df_imda.drop(columns=['year_str'])

# 2. Universal Ingestion Loop
dfs_to_concat = [df_imda]

for root, dirs, files in os.walk(DATA_DIR):
    for file in files:
        if file.lower().endswith('.csv') and 'ERI_' not in file:
            try:
                full_path = os.path.join(root, file)
                df_part = ingest_dos_csv(full_path, file)

                # parsing dates immediately allows early filtering of garbage rows
                df_part['date_obj'] = df_part['raw_date'].apply(parse_universal_date)
                valid_rows = df_part.dropna(subset=['date_obj'])

                if not valid_rows.empty:
                    dfs_to_concat.append(valid_rows)
            except Exception as e:
                # I'm silencing errors here to allow the batch to proceed;
                # bad files are usually just metadata/readme files
                pass

# 3. Consolidation & Cleaning
df_master = pd.concat(dfs_to_concat, ignore_index=True)
df_master['value'] = pd.to_numeric(df_master['value'], errors='coerce')

# apply taxonomies
df_master['clean_sector'] = df_master['raw_variable'].apply(get_sector)
df_master['clean_region'] = df_master['raw_variable'].apply(get_region)

# strict filtering
df_master = df_master[df_master['clean_sector'] != "REMOVE"].copy()
df_master = df_master.dropna(subset=['value'])

# 4. Final Formatting
df_master['year'] = df_master['date_obj'].dt.year
df_master['quarter'] = df_master['date_obj'].dt.quarter
df_master['frequency'] = 'quarterly' # default granularity

# standardize text
df_master['clean_sector'] = df_master['clean_sector'].str.strip().str.title()
df_master['clean_region'] = df_master['clean_region'].str.strip().str.title()

# final schema enforcement
cols_order = ['clean_sector', 'clean_region', 'year', 'quarter', 'frequency', 'raw_variable', 'value', 'source_file']
df_master = df_master[cols_order].sort_values(by=['clean_sector', 'clean_region', 'year', 'quarter'])

# 5. Save Artifact
# UPDATED: Save to the Output folder
output_path = os.path.join(OUTPUT_DIR, OUTPUT_FILE)
df_master.to_csv(output_path, index=False)
print(f"Saved artifact to Local Drive: {output_path}")
print(f"Final Shape: {df_master.shape}")