# In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data Processing Pipeline for Village Cinemas F&B Analytics.

This script consolidates, cleans, and transforms raw transaction and session
data into analysis-ready feature matrices. It serves as the primary data
engineering step for both the training and test datasets.

The pipeline executes the following main stages:
1.  **Inventory Transactions Cleanup:**
    -   Loads and combines all raw `Inventory Transaction Data*.xlsx` files.
    -   Normalizes headers, filters for relevant F&B classes, and parses timestamps.
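    -   Imputes missing or non-positive prices hierarchically: the same product's
        observed unit price, the same product ignoring "NO ICE"/"NO SUGAR", the
        SNACK - CHIPS median, the item-class median, then the global median.
    -   Outputs a consolidated `inventory_transactions_clean.xlsx`, plus one
        `inventory_transactions_clean_<year>.xlsx` per calendar year.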

2.  **One-Hot Encoding (OHE) of Transactions:**
    -   Takes the cleaned transaction data.
    -   Creates a wide-format matrix of quantity-weighted, one-hot encoded item
        classes and product names, summed per transaction timestamp, with a
        leading `total_price_aud` revenue column.
    -   Outputs the intermediate file `ohe_trx_item_class_product.xlsx`.

3.  **Movie Sessions Cleanup:**
    -   Loads and cleans raw `Movie_sessions*.xlsx` files.
    -   Applies business rules, such as filtering by genre and session time.
    -   Engineers features like duration categories and time-of-day slots.
    -   Outputs a consolidated `movie_sessions_clean.xlsx`.

4.  **Hourly Session Expansion:**
    -   Expands session data to represent every hour a session is active, including
      pre-session buffer times for early arrivals.
    -   This creates a richer, hour-level view of cinema activity.
    -   Outputs `ohe_movie_sessions_hourly_expanded.xlsx`.

5.  **Final Merge:**
    -   Combines the OHE transaction data with the expanded hourly session data using a
      left join on the timestamp.
    -   This produces the final `train_dataset.xlsx` or `test_dataset.xlsx`, which is
      the primary input for the downstream recommendation models.

To run, place raw data files in the `input/` subdirectory and execute this script.
All outputs will be saved to the `output/` subdirectory.
"""
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
import re
from typing import Final, Dict, Tuple, List
import warnings

import pandas as pd

# =============================================================================
# --- 1. CONFIGURATION AND CONSTANTS ---
# =============================================================================

# --- Path Configuration ---
# All paths are relative to the current working directory.
BASE_DIR: Final[Path] = Path(".")
# Raw workbooks are read from 'input/'; every generated file is written to 'output/'.
INPUT_DIR: Final[Path] = BASE_DIR / "input"
OUTPUT_DIR: Final[Path] = BASE_DIR / "output"

# --- Regex Patterns for File Discovery ---
# Both patterns are matched case-insensitively with .fullmatch() against bare
# file names when the stage functions scan INPUT_DIR.
# Inventory examples: "Inventory Transaction Data Jan 2024.xlsx",
# "Inventory Transaction Data 2023.xlsx", "Inventory Transaction Data Mar 2024 v2.1.xlsx".
WORKBOOK_RX_INV: Final[re.Pattern[str]] = re.compile(
    r"Inventory Transaction Data "
    r"(?:"
    r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{4}|\d{4}"
    r")"
    r"(?:\s+v\d+(?:\.\d+)?)?"
    r"\.xlsx$",
    re.I
)
# Session examples: "Movie_sessions.xlsx", "Movie_sessions_Jan2024.xlsx",
# "Movie_sessions_Jan 2024 v3.xlsx".
SOURCE_RX_SESS: Final[re.Pattern[str]] = re.compile(
    r"Movie_sessions"
    r"(?:_(?:(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s?\d{4}))?"
    r"(?:\s+v\d+(?:\.\d+)?)?"
    r"\.xlsx$",
    re.I
)

# --- Business Logic Constants ---
# Item classes kept by the inventory cleaner (compared against the upper-cased,
# whitespace-normalised "Item Class"); every other transaction row is dropped.
ALLOWED_CLASSES: Final[set[str]] = {
    "SNACK - CHIPS", "FOOD - VJUNIOR", "ICE CREAMS - OTHER", "ICE CREAMS - CHOC TO",
    "DRINKS - EXTRA LARGE", "DRINKS - LARGE", "DRINKS - MEDIUM", "DRINKS - SMALL",
    "DRINKS - NO ICE", "DRINKS", "POPCORN",
}
# Session genres (compared upper-cased and stripped) removed from the session data.
EXCLUDE_GENRES: Final[set[str]] = {"GAMING", "TO BE ADVISED"}

# Time-of-day slots as half-open [start, end) windows in minutes after midnight.
# The first window containing a session's start time names its slot; sessions
# matching none are labelled "out_of_range" and dropped.
# NOTE(review): session timestamps are built from an integer "Session Hour", so
# their minute is always 0 and windows containing no whole hour (evening_2,
# evening_4..evening_6, night_2, night_3, night_5) can never be selected —
# confirm whether finer session times were intended.
SLOT_WINDOWS: Final[Dict[str, Tuple[int, int]]] = {
    "morning": (9*60, 11*60), "early_noon": (11*60, 13*60), "noon": (13*60, 15*60),
    "late_noon": (15*60, 17*60), "evening_1": (17*60, 17*60 + 30), "evening_2": (17*60 + 30, 18*60),
    "evening_3": (18*60, 18*60 + 15), "evening_4": (18*60 + 15, 18*60 + 30),
    "evening_5": (18*60 + 30, 18*60 + 45), "evening_6": (18*60 + 45, 19*60),
    "night_1": (19*60, 19*60 + 15), "night_2": (19*60 + 15, 19*60 + 30),
    "night_3": (19*60 + 30, 20*60), "night_4": (20*60, 20*60 + 30),
    "night_5": (20*60 + 30, 21*60), "night_6": (21*60, 22*60),
}
# Duration buckets in minutes (inclusive upper bounds): <= 120 "short",
# <= 160 "medium", anything longer "long".
SHORT_MAX_SESS: Final[int] = 120
MEDIUM_MAX_SESS: Final[int] = 160

# Hours added before a session starts / after it ends when sessions are
# expanded to hourly rows (the pre-buffer captures early arrivals).
PRE_BUFFER_HRS: Final[int] = 1
POST_BUFFER_HRS: Final[int] = 0

# Session columns that are one-hot encoded in the hourly expansion stage.
CAT_COLS_SESS: Final[List[str]] = [
    "language", "genre", "rating", "slot", "duration_category"
]

# =============================================================================
# --- 2. HELPER SUB-FUNCTIONS ---
# =============================================================================

def _locate_header_row(raw: pd.DataFrame, start_col_name: str) -> int | None:
    """Return the index label of the first row of ``raw`` containing a header cell.

    A row qualifies when any of its cells, converted with ``str()`` and
    lower-cased, starts with ``start_col_name`` lower-cased, so the comparison is
    case-insensitive on both sides. Returns ``None`` when no row qualifies.
    """
    needle = start_col_name.lower()
    for idx, row in raw.iterrows():
        if any(str(cell).lower().startswith(needle) for cell in row):
            return idx
    return None


def _find_header_row(xl_path: Path, sheet_name: str, start_col_name: str) -> int:
    """Locate the header row of ``sheet_name`` in ``xl_path``.

    The sheet is read without a header (every cell as a string). The function
    returns the 0-based row number of the first row that has a cell starting
    with ``start_col_name``, ignoring case. The result is meant to be passed
    as ``header=`` to ``pd.read_excel``.

    Raises:
        ValueError: if no row contains such a cell.
    """
    raw = pd.read_excel(xl_path, sheet_name=sheet_name, header=None, dtype=str)
    found = _locate_header_row(raw, start_col_name)
    if found is None:
        raise ValueError(f"{xl_path.name}: header row with '{start_col_name}' not found")
    return found

_NO_FLAG_RX: Final[re.Pattern[str]] = re.compile(r"\bNO\s+(ICE|SUGAR)\b", re.I)
_WHITESPACE_RX: Final[re.Pattern[str]] = re.compile(r"\s+")


def _remove_no_flag_inv(text: str) -> str:
    """Return ``text`` with every "NO ICE" / "NO SUGAR" flag removed.

    The result is used as a fallback key when imputing a product's unit price.
    Matching is case-insensitive, and non-string input is converted with
    ``str()`` first. Every whitespace run left behind is collapsed to one space
    before trimming, so adjacent flags still give a clean name:

    >>> _remove_no_flag_inv("COKE NO ICE NO SUGAR LARGE")
    'COKE LARGE'
    """
    without_flags = _NO_FLAG_RX.sub("", str(text))
    # A single str.replace("  ", " ") pass would leave doubled spaces behind
    # when several flags sit next to each other; collapse all runs instead.
    return _WHITESPACE_RX.sub(" ", without_flags).strip()

def _slot_from_timestamp_sess(ts: pd.Timestamp) -> str:
    """Name the SLOT_WINDOWS slot containing ``ts``'s time of day.

    Windows are half-open ``[start, end)`` ranges in minutes after midnight and
    are checked in dictionary order; the first hit wins. Returns
    ``"out_of_range"`` when no window contains the time.
    """
    minute_of_day = 60 * ts.hour + ts.minute
    return next(
        (name for name, (lo, hi) in SLOT_WINDOWS.items() if lo <= minute_of_day < hi),
        "out_of_range",
    )

def _duration_bucket_sess(minutes: int) -> str:
    """Classify a session length in minutes as "short", "medium" or "long".

    Uses inclusive upper bounds: ``<= SHORT_MAX_SESS`` is short,
    ``<= MEDIUM_MAX_SESS`` is medium, and anything else (including NaN) is long.
    """
    for limit, label in ((SHORT_MAX_SESS, "short"), (MEDIUM_MAX_SESS, "medium")):
        if minutes <= limit:
            return label
    return "long"

def _parse_duration_sess(duration_text: str) -> int:
    """Return the first run of digits in ``duration_text`` as an int, or 0 if none.

    The input is converted with ``str()``, so numbers and ``None`` are accepted.
    """
    digits = re.search(r"(\d+)", str(duration_text))
    if digits is None:
        return 0
    return int(digits.group(1))

def _build_ohe_matrix(df: pd.DataFrame, cat_cols: list[str], *, keep_prefix: bool = False) -> pd.DataFrame:
    """Return quantity-weighted one-hot counts of ``cat_cols`` summed per ``timestamp``.

    Each row adds its ``quantity`` to the indicator column of every category
    value it carries. Column naming depends on ``keep_prefix``:

    - ``False``: columns are named by the bare category value.
    - ``True``: pandas' default ``<column>_<value>`` naming is used.

    The result is indexed by ``timestamp`` and cast to ``uint32``.

    Raises:
        KeyError: if ``df`` has no ``quantity`` column.
    """
    if "quantity" not in df.columns:
        raise KeyError("Missing 'quantity' column.")

    naming = {} if keep_prefix else {"prefix": "", "prefix_sep": ""}
    indicators = pd.get_dummies(df[cat_cols], columns=cat_cols, dtype="uint32", **naming)
    quantities = df["quantity"].values
    weighted = indicators.mul(quantities, axis=0)
    per_timestamp = weighted.groupby(df["timestamp"]).sum()
    return per_timestamp.astype("uint32")

# =============================================================================
# --- 3. CORE DATA PROCESSING FUNCTIONS ---
# =============================================================================

def _impute_unit_prices(df: pd.DataFrame) -> pd.Series:
    """Return a per-row unit price with missing or non-positive values imputed.

    ``df`` must provide:

    - numeric ``price_aud`` and ``quantity`` (>= 1) columns;
    - the ``VISTA Item`` and ``Item Class`` text columns.

    The raw unit price is ``price_aud / quantity``. Each step below only touches
    rows whose value is still NaN or <= 0 after the previous steps:

    1. the first positive unit price seen for the same ``VISTA Item``;
    2. the same lookup after removing "NO ICE"/"NO SUGAR" from the item name;
    3. for ``SNACK - CHIPS`` rows (any letter case), the median positive unit
       price of all chips rows;
    4. the median positive unit price of the row's ``Item Class``;
    5. the median of all remaining unit prices.

    A row stays NaN only if every step finds nothing, e.g. when ``df`` has no
    positive price at all.
    """
    unit_price = df["price_aud"] / df["quantity"]

    def missing() -> pd.Series:
        return unit_price.isna() | (unit_price <= 0)

    # 1) Exact product match; drop_duplicates keeps the FIRST positive price per item.
    positive = unit_price > 0
    item_price = (
        df.loc[positive, ["VISTA Item"]]
        .assign(unit_price=unit_price[positive])
        .drop_duplicates("VISTA Item")
        .set_index("VISTA Item")["unit_price"]
    )
    todo = missing()
    if todo.any():
        unit_price.loc[todo] = df.loc[todo, "VISTA Item"].map(item_price)

    # 2) Same lookup, ignoring "NO ICE" / "NO SUGAR" flags in the item name.
    todo = missing()
    if todo.any():
        unit_price.loc[todo] = df.loc[todo, "VISTA Item"].apply(_remove_no_flag_inv).map(item_price)

    # 3) Chips median. Compare upper-cased, like the ALLOWED_CLASSES filter, so a
    #    differently-cased "Snack - Chips" label is not silently skipped.
    is_chips = df["Item Class"].str.upper() == "SNACK - CHIPS"
    chips_todo = missing() & is_chips
    if chips_todo.any():
        unit_price.loc[chips_todo] = unit_price[is_chips & (unit_price > 0)].median()

    # 4) Item-class median over rows that currently hold a positive price.
    todo = missing()
    if todo.any():
        valid = unit_price > 0
        class_median = unit_price[valid].groupby(df.loc[valid, "Item Class"]).median()
        unit_price.loc[todo] = df.loc[todo, "Item Class"].map(class_median)

    # 5) Global median as the last resort.
    todo = missing()
    if todo.any():
        unit_price.loc[todo] = unit_price.median(skipna=True)

    return unit_price


def _load_and_clean_file_inv(xl_path: Path) -> pd.DataFrame:
    """Return a cleaned DataFrame for one source inventory workbook.

    Processing steps for the "Inventory Trans" sheet (header row located via
    ``_find_header_row``):

    - drop rows whose Transaction Date cell mentions "result";
    - map the quantity/price headers to ``quantity`` / ``price_aud``;
    - keep only ALLOWED_CLASSES items;
    - build an hourly ``timestamp`` from Transaction Date + Transaction Hour;
    - recompute ``price_aud`` as imputed unit price x quantity, rounded to 2 dp.

    Returns:
        Columns timestamp, item_class, product_name, quantity, price_aud with a
        fresh RangeIndex.
    """
    print(f"→ Cleaning inventory file: {xl_path.name}")
    header_row = _find_header_row(xl_path, "Inventory Trans", "transaction date")
    df_raw = pd.read_excel(xl_path, sheet_name="Inventory Trans", header=header_row, dtype=str)
    # Drop rows whose date cell mentions "result" (presumably report subtotal rows — confirm).
    df = df_raw[~df_raw["Transaction Date"].str.contains("result", case=False, na=False)].copy()

    # Accept either header spelling: "No of Items"/"EA" for quantity and
    # "Sell Price"/"AUD" for price.
    rename_map = {}
    for col in df.columns:
        key = str(col).lower()
        if "no of items" in key or key.strip() == "ea":
            rename_map[col] = "quantity"
        elif "sell price" in key or key.strip() == "aud":
            rename_map[col] = "price_aud"
    df.rename(columns=rename_map, inplace=True)
    df.drop(columns=[c for c in df.columns if str(c).startswith('Unnamed')], errors="ignore", inplace=True)

    df["Transaction Date"] = pd.to_datetime(df["Transaction Date"], dayfirst=True, errors="coerce")
    df["Transaction Hour"] = pd.to_numeric(df["Transaction Hour"], errors="coerce")
    df.dropna(subset=["Transaction Date", "Transaction Hour"], inplace=True)
    df["Transaction Hour"] = df["Transaction Hour"].astype(int)

    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].str.strip()
    df["Item Class"] = df["Item Class"].str.replace(r"\s+", " ", regex=True)
    df = df[df["Item Class"].str.upper().isin(ALLOWED_CLASSES)].copy()

    # Missing, zero or negative quantities are counted as a single item.
    df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(1).clip(lower=1).astype(int)
    df["price_aud"] = pd.to_numeric(df["price_aud"], errors="coerce")

    df["timestamp"] = df["Transaction Date"].dt.normalize() + pd.to_timedelta(df["Transaction Hour"], unit="h")

    unit_price = _impute_unit_prices(df)
    df["price_aud"] = (unit_price * df["quantity"]).round(2)

    return df.rename(columns={"Item Class": "item_class", "VISTA Item": "product_name"})[
        ["timestamp", "item_class", "product_name", "quantity", "price_aud"]
    ].reset_index(drop=True)

def _clean_one_workbook_sess(xl_path: Path) -> pd.DataFrame:
    """Load, clean, and return a standardized DataFrame for one session workbook.

    Processing steps for "Sheet1" (header row located via ``_find_header_row``):

    - drop the optional "Film" column;
    - drop rows missing date, hour, genre, admits or duration, and rows whose
      genre is in EXCLUDE_GENRES;
    - strip surrounding whitespace from the language/genre/rating text, so
      values differing only by padding become one category downstream;
    - build an hourly ``timestamp`` from Session Date + integer Session Hour;
    - parse ``duration_min`` and drop sessions whose duration is 960 minutes;
    - derive ``duration_category`` and ``slot``, dropping sessions outside
      every slot window;
    - coerce ``admits`` to int, with non-numeric values becoming 0.

    Returns:
        Columns timestamp, language, genre, rating, admits, duration_min,
        duration_category, slot, sorted by timestamp with a fresh RangeIndex.
    """
    print(f"→ Cleaning session file: {xl_path.name}")
    header_row = _find_header_row(xl_path, "Sheet1", "session date")
    df = pd.read_excel(xl_path, sheet_name="Sheet1", header=header_row, dtype=str)
    df = df.drop(columns={"Film"}, errors="ignore")
    df["Session Date"] = pd.to_datetime(df["Session Date"], dayfirst=True, errors="coerce")
    df["Session Hour"] = pd.to_numeric(df["Session Hour"], errors="coerce")
    df = df.dropna(subset=["Session Date", "Session Hour", "Genre", "Total Admits", "Duration"])
    df["Session Hour"] = df["Session Hour"].astype(int)

    # Normalise category text once so the genre filter and the later one-hot
    # encoding see exactly the same values.
    for col in ("Session Audio Language", "Genre", "Censor Rating"):
        df[col] = df[col].str.strip()

    df = df[~df["Genre"].str.upper().isin(EXCLUDE_GENRES)].copy()
    df["timestamp"] = df["Session Date"].dt.normalize() + pd.to_timedelta(df["Session Hour"], unit="h")
    df["duration_min"] = df["Duration"].apply(_parse_duration_sess)
    # NOTE(review): 960 minutes looks like a placeholder duration in the export — confirm.
    df = df[df["duration_min"] != 960].copy()
    df["duration_category"] = df["duration_min"].apply(_duration_bucket_sess)
    df["slot"] = df["timestamp"].apply(_slot_from_timestamp_sess)
    df = df[df["slot"] != "out_of_range"]
    df = df.rename(columns={"Session Audio Language": "language", "Genre": "genre", "Censor Rating": "rating", "Total Admits": "admits"})
    df["admits"] = pd.to_numeric(df["admits"], errors='coerce').fillna(0).astype(int)
    return df[["timestamp", "language", "genre", "rating", "admits", "duration_min", "duration_category", "slot"]].sort_values("timestamp").reset_index(drop=True)


# =============================================================================
# --- 4. PIPELINE STAGE FUNCTIONS ---
# =============================================================================

def clean_inventory_transactions(input_dir: Path, output_dir: Path) -> Path | None:
    """Stage 1: merge and clean every raw inventory workbook found in ``input_dir``.

    Workbooks are selected by full-matching their file names against
    ``WORKBOOK_RX_INV`` and are processed in sorted order. Outputs:

    - the combined, timestamp-sorted table as ``inventory_transactions_clean.xlsx``;
    - one ``inventory_transactions_clean_<year>.xlsx`` per calendar year.

    Returns:
        Path of the combined workbook, or ``None`` when no source workbook exists.
    """
    print("--- Stage 1: Cleaning Inventory Transactions ---")
    workbooks = [path for path in input_dir.iterdir() if WORKBOOK_RX_INV.fullmatch(path.name)]
    if not workbooks:
        print(f"📭 No source inventory workbooks found in '{input_dir}/'. Skipping.")
        return None
    workbooks.sort()

    master = pd.concat(
        [_load_and_clean_file_inv(path) for path in workbooks], ignore_index=True
    ).sort_values("timestamp").reset_index(drop=True)
    print(f"\n✅ Cleaned inventory rows total: {len(master):,}")

    combined_path = output_dir / "inventory_transactions_clean.xlsx"
    master.to_excel(combined_path, index=False)
    print(f"  • {combined_path.name} written")

    years = master["timestamp"].dt.year
    for year, rows in master.groupby(years, sort=True):
        yearly_path = output_dir / f"inventory_transactions_clean_{year}.xlsx"
        rows.to_excel(yearly_path, index=False)
        print(f"  • {yearly_path.name}  ({len(rows):,} rows)")

    return combined_path


def one_hot_encode_transactions(input_path: Path | None, output_dir: Path) -> Path | None:
    """Stage 2: build the hourly one-hot transaction matrix from the cleaned inventory file.

    Item classes and product names are upper-cased and stripped, one-hot
    encoded with their column prefixes (``item_class_*`` / ``product_name_*``),
    weighted by ``quantity`` and summed per ``timestamp``. A leading
    ``total_price_aud`` column holds the summed ``price_aud`` per timestamp.

    Args:
        input_path: Cleaned transactions workbook (Stage 1 output), or ``None``.
        output_dir: Directory that receives ``ohe_trx_item_class_product.xlsx``.

    Returns:
        Path of the written workbook, or ``None`` when ``input_path`` is missing
        and the stage is skipped.
    """
    print("\n--- Stage 2: One-Hot Encoding Transactions ---")
    if not input_path or not input_path.exists():
        print(f"⚠️ Source file for OHE not found at '{input_path}'. Skipping OHE.")
        return None

    transactions = pd.read_excel(input_path, parse_dates=["timestamp"])
    transactions["item_class"] = transactions["item_class"].str.upper().str.strip()
    transactions["product_name"] = transactions["product_name"].str.upper().str.strip()

    hourly_revenue = transactions.groupby("timestamp")["price_aud"].sum().round(2).rename("total_price_aud")

    ohe_combined = _build_ohe_matrix(transactions, ["item_class", "product_name"], keep_prefix=True)
    # Both frames are indexed by timestamp, so insert() aligns revenue row-for-row.
    ohe_combined.insert(0, "total_price_aud", hourly_revenue)

    # Only the combined item-class + product matrix is needed for the final merge.
    path = output_dir / "ohe_trx_item_class_product.xlsx"
    ohe_combined.to_excel(path)
    print(f"  • {path.name} written")
    return path


def clean_movie_sessions(input_dir: Path, output_dir: Path) -> Path | None:
    """Stage 3: merge and clean every raw movie-session workbook in ``input_dir``.

    Workbooks are selected by full-matching their file names against
    ``SOURCE_RX_SESS`` and are processed in sorted order. Outputs:

    - the combined, timestamp-sorted sessions as ``movie_sessions_clean.xlsx``;
    - one ``movie_sessions_clean_<year>.xlsx`` per calendar year.

    Returns:
        Path of the combined workbook, or ``None`` when no source workbook exists.
    """
    print("\n--- Stage 3: Cleaning Movie Sessions ---")
    workbooks = [path for path in input_dir.iterdir() if SOURCE_RX_SESS.fullmatch(path.name)]
    if not workbooks:
        print(f"📭 No source movie session workbooks found in '{input_dir}/'. Skipping.")
        return None
    workbooks.sort()

    combined = pd.concat(
        [_clean_one_workbook_sess(path) for path in workbooks], ignore_index=True
    ).sort_values("timestamp")
    print(f"\n✅ Total cleaned sessions: {len(combined):,}")

    combined_path = output_dir / "movie_sessions_clean.xlsx"
    combined.to_excel(combined_path, index=False)
    print(f"• {combined_path.name} written")

    years = combined["timestamp"].dt.year
    for year, rows in combined.groupby(years, sort=True):
        yearly_path = output_dir / f"movie_sessions_clean_{year}.xlsx"
        rows.to_excel(yearly_path, index=False)
        print(f"• {yearly_path.name} ({len(rows):,} rows)")

    return combined_path


def _expand_sessions_hourly(sessions: pd.DataFrame, pre_buffer_hrs: int, post_buffer_hrs: int) -> pd.DataFrame:
    """Return one row per (session, hour) step of each session's site window.

    ``sessions`` needs a datetime ``timestamp`` (show start) and a numeric
    ``duration_min``. A session's site window is
    ``[timestamp - pre_buffer_hrs, timestamp + duration_min + post_buffer_hrs)``.
    The window is walked in 1-hour steps from its start, so a session yields
    ``ceil(window / 1h)`` rows, or 0 for an empty window.

    In every output row ``timestamp`` is replaced by the step's hour, and two
    0/1 flags are appended:

    - ``in_site``: the hour lies in the site window;
    - ``in_show``: the hour lies in ``[show start, show end)``.

    All other session columns are copied unchanged, in their original order.
    An empty ``sessions`` frame yields an empty result.
    """
    one_hour = pd.Timedelta(hours=1)
    frame = sessions.reset_index(drop=True)
    show_start = frame["timestamp"]
    show_end = show_start + pd.to_timedelta(frame["duration_min"], unit="m")
    frame = frame.assign(
        _show_start=show_start,
        _show_end=show_end,
        _site_start=show_start - pd.Timedelta(hours=pre_buffer_hrs),
        _site_end=show_end + pd.Timedelta(hours=post_buffer_hrs),
    )

    # ceil(span / 1h) == -floor(-span / 1h); clip guards windows that end before they start.
    span = frame["_site_end"] - frame["_site_start"]
    n_hours = (-((-span) // one_hour)).clip(lower=0).astype(int)

    # Repeat each session once per hour step and number the copies 0, 1, 2, ...
    expanded = frame.loc[frame.index.repeat(n_hours.to_numpy())]
    hour_offset = expanded.groupby(level=0).cumcount().to_numpy()
    expanded = expanded.reset_index(drop=True)
    hour = expanded["_site_start"] + pd.to_timedelta(hour_offset, unit="h").to_numpy()

    expanded["timestamp"] = hour
    expanded["in_site"] = ((expanded["_site_start"] <= hour) & (hour < expanded["_site_end"])).astype(int)
    expanded["in_show"] = ((expanded["_show_start"] <= hour) & (hour < expanded["_show_end"])).astype(int)
    return expanded.drop(columns=["_show_start", "_show_end", "_site_start", "_site_end"])


def expand_and_ohe_sessions(input_path: Path | None, output_dir: Path) -> Path | None:
    """Stage 4: expand sessions to hourly rows and aggregate one-hot features per hour.

    Each session is expanded over its site window: PRE_BUFFER_HRS before the
    start through POST_BUFFER_HRS after the end. The CAT_COLS_SESS columns are
    one-hot encoded, counting only hours in which the session is screening.
    The result is summed per timestamp and written to
    ``ohe_movie_sessions_hourly_expanded.xlsx`` with these leading columns:

    - ``total_admits``: admits of all sessions whose site window covers the hour;
    - ``avg_duration_min``: admit-weighted mean duration of sessions screening
      in the hour, or 0 when nobody is in a show.

    Returns:
        Path of the written workbook, or ``None`` when ``input_path`` is missing.
    """
    print("\n--- Stage 4: Expanding & OHE Sessions ---")
    if not input_path or not input_path.exists():
        print(f"⚠️ Source file for session expansion not found at '{input_path}'. Skipping.")
        return None

    sessions = pd.read_excel(input_path, parse_dates=["timestamp"])
    # Vectorised expansion instead of iterrows() + Series.copy() per hour.
    expanded = _expand_sessions_hourly(sessions, PRE_BUFFER_HRS, POST_BUFFER_HRS)

    ohe = pd.get_dummies(expanded, columns=CAT_COLS_SESS, prefix=CAT_COLS_SESS, prefix_sep="_", dtype="uint8")
    cat_dummies = [c for c in ohe.columns if any(c.startswith(f"{cat}_") for cat in CAT_COLS_SESS)]
    # Category counts only reflect hours in which the session is actually screening.
    ohe[cat_dummies] = ohe[cat_dummies].mul(ohe["in_show"], axis=0)

    ohe["admits_in_site"] = ohe["admits"] * ohe["in_site"]
    ohe["admits_in_show"] = ohe["admits"] * ohe["in_show"]
    ohe["dur_x_admits"] = ohe["duration_min"] * ohe["admits"] * ohe["in_show"]

    agg_map = dict.fromkeys(cat_dummies, "sum")
    agg_map.update({
        "admits_in_site": "sum",
        "admits_in_show": "sum",
        "dur_x_admits": "sum",
    })

    hourly = ohe.groupby("timestamp", as_index=False).agg(agg_map)
    hourly["total_admits"] = hourly["admits_in_site"]
    # Treat a zero denominator as "no data" (-> 0) so x/0 can never produce inf.
    shown = hourly["admits_in_show"].where(hourly["admits_in_show"] != 0)
    hourly["avg_duration_min"] = hourly["dur_x_admits"].div(shown).round(1).fillna(0)

    hourly = hourly.drop(columns=["admits_in_site", "admits_in_show", "dur_x_admits", "in_site", "in_show"], errors="ignore")

    front = ["timestamp", "total_admits", "avg_duration_min"]
    hourly = hourly[front + [c for c in hourly.columns if c not in front]]

    output_path = output_dir / "ohe_movie_sessions_hourly_expanded.xlsx"
    hourly.to_excel(output_path, index=False)
    print(f"✓ Saved hourly-expanded sessions → {output_path.name}  ({len(hourly)} rows)")
    return output_path


def merge_final_dataset(trx_ohe_path: Path, sess_ohe_path: Path, output_dir: Path, output_filename: str) -> None:
    """Stage 5: left-join the hourly transaction matrix with the hourly session features.

    Every transaction timestamp is kept, and session columns are empty for
    hours without session data. The columns timestamp, total_admits,
    total_price_aud and avg_duration_min (those present) are moved to the
    front, and the result is written to ``output_dir / output_filename``.

    If either path is ``None`` or does not exist, a warning is printed and
    nothing is written.

    Raises:
        ValueError: if either input contains a repeated timestamp.
    """
    print("\n--- Stage 5: Merging to Final Dataset ---")
    if not trx_ohe_path or not trx_ohe_path.exists():
        print(f"⚠️ Transaction OHE file not found in '{output_dir}/'. Skipping merge.")
        return
    if not sess_ohe_path or not sess_ohe_path.exists():
        print(f"⚠️ Session OHE file not found in '{output_dir}/'. Skipping merge.")
        return

    # The transaction matrix was saved with its timestamp index as the first column.
    trx_df = (
        pd.read_excel(trx_ohe_path, index_col=0, parse_dates=[0])
        .reset_index()
        .rename(columns={"index": "timestamp"})
    )
    sess_df = pd.read_excel(sess_ohe_path, parse_dates=["timestamp"])

    if trx_df["timestamp"].duplicated().any():
        raise ValueError("Transactions file has duplicate timestamps.")
    if sess_df["timestamp"].duplicated().any():
        raise ValueError("Sessions file has duplicate timestamps.")

    merged = trx_df.merge(sess_df, how="left", on="timestamp", validate="one_to_one")
    print(f"✓ Merged table size: {len(merged):,} rows × {merged.shape[1]} columns")

    preferred = ("timestamp", "total_admits", "total_price_aud", "avg_duration_min")
    present = [name for name in preferred if name in merged.columns]
    remainder = [name for name in merged.columns if name not in present]
    merged = merged[present + remainder]

    destination = output_dir / output_filename
    merged.to_excel(destination, index=False)
    print(f"• Saved final dataset to: {destination}")


# =============================================================================
# --- 5. MAIN PIPELINE WORKFLOW ---
# =============================================================================

def run_pipeline(output_filename: str) -> None:
    """
    Executes the full data processing pipeline from raw files to the final dataset.

    Creates INPUT_DIR/OUTPUT_DIR if needed, then runs Stages 1-5 in order. Stages
    whose inputs are missing are skipped; the final merge only runs when both the
    transaction matrix and the hourly session file were produced in this run.

    Args:
        output_filename: The name of the final merged dataset file
                         (e.g., "train_dataset.xlsx" or "test_dataset.xlsx").
    """
    print(f"\n--- Starting Data Cleanup Pipeline for '{output_filename}' ---\n")

    # Ensure input/output dirs exist
    INPUT_DIR.mkdir(exist_ok=True)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Stage 1
    cleaned_inventory_path = clean_inventory_transactions(INPUT_DIR, OUTPUT_DIR)

    # Stage 2
    one_hot_encode_transactions(cleaned_inventory_path, OUTPUT_DIR)

    # Stage 3
    cleaned_sessions_path = clean_movie_sessions(INPUT_DIR, OUTPUT_DIR)

    # Stage 4
    expanded_sessions_path = expand_and_ohe_sessions(cleaned_sessions_path, OUTPUT_DIR)

    # Stage 5. Stage 2 only writes the matrix when Stage 1 produced a cleaned
    # file; otherwise a stale matrix left in OUTPUT_DIR by an earlier run would
    # silently be merged, so pass None and let the merge skip.
    trx_ohe_final_path = (
        OUTPUT_DIR / "ohe_trx_item_class_product.xlsx"
        if cleaned_inventory_path is not None
        else None
    )
    merge_final_dataset(trx_ohe_final_path, expanded_sessions_path, OUTPUT_DIR, output_filename)

    if trx_ohe_final_path is not None and expanded_sessions_path is not None:
        print(f"\n🎉 Pipeline complete. Final file '{output_filename}' is in '{OUTPUT_DIR}/'.")
    else:
        print(f"\n⚠️ Pipeline finished without producing '{output_filename}' (missing inputs).")

# =============================================================================
# --- SCRIPT ENTRY POINT ---
# =============================================================================

if __name__ == '__main__':
    # Choose the final file name from the working directory's name: the first
    # marker found ("train", then "test") decides it; otherwise fall back to a
    # generic name and warn. Raw files are read from 'input/' and all outputs
    # are written to 'output/'.
    folder = BASE_DIR.resolve().name.lower()
    name_by_marker = (("train", "train_dataset.xlsx"), ("test", "test_dataset.xlsx"))
    final_output_file = next((fname for marker, fname in name_by_marker if marker in folder), None)

    if final_output_file is None:
        final_output_file = "processed_dataset.xlsx"
        print(f"⚠️ Warning: Could not determine if this is for train/test based on folder name. Defaulting output to '{final_output_file}'.")

    run_pipeline(output_filename=final_output_file)