In [4]:
# -----------------------------
# Data Cleaning Workflow
# -----------------------------

import pandas as pd
import os
from datetime import datetime

# Directory layout: the project root is the parent of the current working
# directory; raw inputs and cleaned outputs live under <root>/data/.
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
RAW_DATA_PATH, CLEANED_DATA_PATH = (
    os.path.join(PROJECT_ROOT, "data", subdir) for subdir in ("raw", "cleaned")
)
os.makedirs(CLEANED_DATA_PATH, exist_ok=True)

# The provenance log sits next to the cleaned outputs. Opening it in "w" mode
# truncates any previous log, so each run starts from a fresh file.
log_file_path = os.path.join(CLEANED_DATA_PATH, "cleaned_log.txt")
with open(log_file_path, "w") as f:
    f.write(f"Provenance log started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

def log_step(description):
    """Append one timestamped entry for ``description`` to the provenance log.

    The entry is written to the file at the module-level ``log_file_path`` as
    ``"<YYYY-MM-DD HH:MM:SS> - <description>"`` followed by a newline.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(log_file_path, "a") as log:
        log.write(f"{stamp} - {description}\n")


# CSV files to read from RAW_DATA_PATH; the file name doubles as the key
# under which the loaded table is stored in `datasets`.
files = [
    "campaign_desc.csv",
    "campaign_table.csv",
    "coupon.csv",
    "coupon_redempt.csv",
    "causal_data.csv",
    "product.csv",
    "transaction_data.csv",
    "hh_demographic.csv",
]

# -----------------------------
# Load Raw Data
# -----------------------------
# Read every file with pandas' default CSV settings and record its shape in
# the provenance log.
datasets = {}
for f in files:
    path = os.path.join(RAW_DATA_PATH, f)
    datasets[f] = df = pd.read_csv(path)
    log_step(f"Loaded {f}, shape: {df.shape}")

# -----------------------------
# Generic Cleaning Function
# -----------------------------
# String values that are treated as "missing" in addition to real NaN cells.
missing_placeholders = ["Unknown", "None", "NONE", "None/Unknown", "", "U", "NaN"]


def _strip_if_str(value):
    """Return ``value`` without surrounding whitespace if it is a string."""
    return value.strip() if isinstance(value, str) else value


def clean_dataframe(df, df_name):
    """Deduplicate, normalise and audit missing values in ``df``.

    Steps, each reported through ``log_step``:

    1. Drop fully duplicated rows.
    2. Strip surrounding whitespace from string values in object columns.
    3. For every column, count the cells that are NaN or whose stripped
       string form is listed in ``missing_placeholders``. In object columns
       those cells are replaced with the literal ``'Unknown'``.

    Args:
        df: DataFrame to clean. It is modified in place.
        df_name: Label used for this table in the log messages.

    Returns:
        The same DataFrame object, after cleaning.
    """
    n_rows = len(df)

    # Remove duplicates; the percentage is relative to the original row count.
    dup_count = df.duplicated().sum()
    if dup_count > 0:
        df.drop_duplicates(inplace=True)
        dup_per = (dup_count / n_rows) * 100
        log_step(f"Removed {dup_count} duplicate rows ({dup_per:.2f}%) from {df_name}.")
    else:
        log_step(f"No duplicates found in {df_name}.")

    # Missing values are counted on the de-duplicated table, so their
    # percentages must use the row count that remains after the drop.
    n_rows = len(df)

    # Standardize string columns. Only real strings are stripped: the ``.str``
    # accessor would turn every non-string value of a mixed column into NaN.
    for col in df.select_dtypes(include='object').columns:
        df[col] = df[col].map(_strip_if_str)

    # Identify missing or unknown values and fill them in object columns.
    cols_with_missing = {}
    for col in df.columns:
        na_mask = df[col].isna()
        series_as_str = df[col].astype(str).str.strip()
        # NaN/None cells are excluded from the placeholder match so that a
        # cell such as None (string form "None") is not counted twice.
        placeholder_mask = series_as_str.isin(missing_placeholders) & ~na_mask
        missing_mask = na_mask | placeholder_mask
        total_missing = missing_mask.sum()
        if total_missing > 0:
            missing_per = (total_missing / n_rows) * 100
            cols_with_missing[col] = {'missing': total_missing, 'perc': missing_per}
            # Fill everything that was counted (placeholders AND NaN) with
            # 'Unknown'; non-object columns are reported but left untouched.
            if df[col].dtype == 'object':
                df[col] = df[col].mask(missing_mask, 'Unknown')

    for col, counts in cols_with_missing.items():
        log_step(f"{df_name}: {col} -> Missing={counts['missing']}, Missing Per={counts['perc']}")

    return df

# -----------------------------
# Apply Cleaning
# -----------------------------
# Run the generic cleaner over every loaded table and store the result back
# under the same key. Iterating over a snapshot of the items keeps the loop
# independent of the dict being updated.
for fname, df in list(datasets.items()):
    datasets[fname] = clean_dataframe(df, fname)

# -----------------------------
# Filter invalid transactions (non-positive quantity or sales)
# -----------------------------
if "transaction_data.csv" in datasets:
    transactions = datasets["transaction_data.csv"]
    initial_count = len(transactions)
    # Keep only rows where both the quantity and the sales value are
    # strictly positive; everything else is treated as invalid.
    transactions = transactions.loc[
        transactions['QUANTITY'].gt(0) & transactions['SALES_VALUE'].gt(0)
    ]
    log_step(f"Filtered {initial_count - len(transactions)} invalid transactions with non-positive quantity or sales value.")
    datasets["transaction_data.csv"] = transactions

# -----------------------------
# Save Cleaned Datasets
# -----------------------------
# Write each table to CLEANED_DATA_PATH as "<name>_cleaned.csv" (without the
# DataFrame index) and record the output path in the provenance log.
for fname, df in datasets.items():
    cleaned_file = os.path.join(
        CLEANED_DATA_PATH, fname.replace('.csv', '') + "_cleaned.csv"
    )
    df.to_csv(cleaned_file, index=False)
    log_step(f"Saved cleaned dataset: {cleaned_file}")
