In [2]:
import pandas as pd
import os
from datetime import datetime
import string
import random
import itertools
import uuid
from typing import List, Dict, Optional

def generate_matrix_code() -> str:
    """Return a random 7-character matrix code: 'SCO' plus 4 uppercase ASCII letters."""
    alphabet = string.ascii_uppercase
    suffix = [random.choice(alphabet) for _ in range(4)]
    return "SCO" + "".join(suffix)

def px_escape(text: str) -> str:
    """Return *text* with every double quote doubled, as PX string literals require.

    Non-string input is not escaped; it is simply converted with ``str()``.
    """
    if not isinstance(text, str):
        return str(text)
    return text.replace('"', '""')

def define_dimensions() -> Dict[str, Dict[str, List[str]]]:
    """Define the fixed dimension values and their codes.

    Returns
    -------
    dict
        Maps each dimension name ("DateCode", "Accident Status",
        "Type of Fire") to a dict with two parallel lists:
        ``"values"`` (the labels as they appear in the data) and
        ``"codes"`` (1-based sequential codes; two-digit zero-padded for
        "DateCode", unpadded for the others).
    """
    # Financial years 2009/2010 .. 2022/2023. Generated rather than typed out
    # so a mistyped label (previously "20020/2021") cannot silently make a
    # whole year of data unmatchable.
    date_values = [f"{year}/{year + 1}" for year in range(2009, 2023)]
    accident_values = ["Accidental", "All", "Not Accidental"]
    fire_values = [
        "All", "Chimney Fire", "Dwelling Fire", "Other Building Fire",
        "Other Primary Fire", "Outdoor Fire", "Refuse Fire", "Vehicle Fire"
    ]

    # Codes are derived from the value lists so the two can never drift apart.
    return {
        "DateCode": {
            "values": date_values,
            "codes": [f"{i:02d}" for i in range(1, len(date_values) + 1)]
        },
        "Accident Status": {
            "values": accident_values,
            "codes": [str(i) for i in range(1, len(accident_values) + 1)]
        },
        "Type of Fire": {
            "values": fire_values,
            "codes": [str(i) for i in range(1, len(fire_values) + 1)]
        }
    }

def tidy_to_pxstat(
    input_file: str,
    output_file: Optional[str] = None,
    stub_cols: Optional[List[str]] = None,
    heading_cols: Optional[List[str]] = None,
    title: Optional[str] = None,
    subject_area: Optional[str] = None,
    matrix_code: Optional[str] = None,
    decimals: Optional[int] = 0,
    value_col: str = "Value",
    source: str = "Scottish Fire and Rescue Service",
    agg_method: str = "sum"
) -> str:
    """
    Convert Tidy format CSV to monolingual PxStat format with complete matrix.

    Every combination of the stub and heading dimension values is written to
    the DATA section; combinations with no (or only missing) source values are
    written as "..". Rows whose labels are not part of a dimension's value
    list are left out of the file and reported in a warning.

    Parameters:
    -----------
    input_file : str
        Path to input CSV file.
    output_file : str, optional
        Path to output PX file (default: input_name + ".px").
    stub_cols : list, optional
        Columns to use as stub dimensions (rows, default: ["DateCode"]).
    heading_cols : list, optional
        Columns to use as heading dimensions (columns, default: ["FeatureCode", "Accident Status", "Type of Fire"]).
        Dimensions not returned by define_dimensions() take their values from
        the data (sorted) and get three-digit sequential codes.
    title : str, optional
        Title for the PX dataset.
    subject_area : str, optional
        Subject area for the dataset.
    matrix_code : str, optional
        Matrix code for the dataset (default: generated as SCO+4 random letters).
    decimals : int, optional
        Number of decimals to use (default: 0 for count data; None is treated as 0).
    value_col : str, optional
        Column name containing the values (default: "Value").
    source : str, optional
        Data source (default: "Scottish Fire and Rescue Service").
    agg_method : str, optional
        Aggregation method for duplicates ("sum" or "mean", default: "sum").

    Returns:
    --------
    str
        Path of the PX file that was written.

    Raises:
    -------
    ValueError
        If the value column or any stub/heading column is missing from the CSV.
    """
    print(f"Loading data from {input_file}...")
    try:
        # Load CSV with nullable dtypes
        df = pd.read_csv(input_file, low_memory=False, dtype_backend="numpy_nullable")

        # Resolve defaults BEFORE validating, so defaulted columns are checked too
        stub_cols = stub_cols or ["DateCode"]
        heading_cols = heading_cols or ["FeatureCode", "Accident Status", "Type of Fire"]
        group_cols = stub_cols + heading_cols
        decimals = 0 if decimals is None else decimals

        # Validate required columns
        required_cols = [value_col] + group_cols
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Dimension labels are compared as strings; 'category' keeps groupby cheap
        for col in group_cols:
            df[col] = df[col].astype(str).astype('category')

        # Simplify to needed columns
        df_simple = df[group_cols + [value_col]].copy()

        # Make values numeric
        df_simple[value_col] = pd.to_numeric(df_simple[value_col], errors="coerce")
        missing_values = int(df_simple[value_col].isna().sum())
        if missing_values:
            print(f"Warning: {missing_values} non-numeric values in {value_col} set to NaN.")

        # Aggregate duplicates. df_simple holds only the group columns plus the
        # value column, so a frame-level sum touches the value column alone.
        grouped = df_simple.groupby(group_cols, as_index=False, observed=True)
        if agg_method == "sum":
            # min_count=1 keeps an all-missing group missing instead of
            # publishing it as a genuine zero.
            df_agg = grouped.sum(min_count=1)
        else:
            df_agg = grouped.agg({value_col: agg_method})

        # Define dimensions: predefined ones first, the rest derived from the data
        dimensions = define_dimensions()
        for col in group_cols:
            if col not in dimensions:
                observed_values = sorted(df[col].dropna().unique())
                dimensions[col] = {
                    "values": observed_values,
                    "codes": [f"{i:03d}" for i in range(1, len(observed_values) + 1)]
                }

        # Generate all possible combinations. Stub dimensions come first, so the
        # last heading dimension varies fastest.
        dim_values = [dimensions[col]["values"] for col in group_cols]
        all_combinations = list(itertools.product(*dim_values))
        print(f"Expected data points: {len(all_combinations)}")

        # Map each observed dimension combination to its aggregated value
        data_map = dict(zip(
            zip(*(df_agg[col] for col in group_cols)),
            df_agg[value_col]
        ))

        # Report data that cannot be placed in the matrix instead of dropping it silently
        allowed = [set(values) for values in dim_values]
        dropped = sum(
            1 for key in data_map
            if any(part not in valid for part, valid in zip(key, allowed))
        )
        if dropped:
            print(f"Warning: {dropped} aggregated rows have labels outside the defined dimensions and were omitted.")

        def format_cell(value) -> str:
            """Render one DATA cell; missing values become the PX symbol '..'."""
            if pd.isna(value):
                return ".."
            if decimals == 0:
                # Round rather than truncate (matters for e.g. agg_method="mean")
                return str(int(round(value)))
            return str(round(value, decimals))

        data_values = [format_cell(data_map.get(combo, pd.NA)) for combo in all_combinations]

        # Metadata
        creation_date = datetime.now().strftime("%Y%m%d %H:%M")
        title = title or f"Fire Incidents from {os.path.basename(input_file)}"
        subject_area = subject_area or "Incidents"
        matrix_code = matrix_code or generate_matrix_code()

        # Use the first non-missing unit; fall back to "Fires" when there is none
        units = "Fires"
        if "Units" in df.columns:
            known_units = df["Units"].dropna()
            if not known_units.empty:
                units = known_units.iloc[0]
        units = px_escape(units)

        def quoted_list(items) -> str:
            """Render items as a PX list: "a","b","c" (each item quoted once)."""
            return ",".join(f'"{px_escape(str(item))}"' for item in items)

        by_clause = ", ".join(px_escape(col) for col in group_cols)
        # NOTE(review): the header declares CHARSET="UTF-16" to match the file
        # encoding used below; confirm the consuming PX reader accepts this value.
        header_lines = [
            'CHARSET="UTF-16";',
            'AXIS-VERSION="2013";',
            f'CREATION-DATE="{creation_date}";',
            f'MATRIX="{matrix_code}";',
            f'DECIMALS={decimals};',
            f'SUBJECT-AREA="{px_escape(subject_area)}";',
            f'SUBJECT-CODE="{matrix_code[:4]}";',
            f'CONTENTS="{px_escape(title)}";',
            f'TITLE="{px_escape(title)} - by {by_clause}";',
            f'UNITS="{units}";',
            # quoted_list already quotes each name; wrapping it in another pair
            # of quotes produced invalid output such as STUB=""DateCode"";
            f'STUB={quoted_list(stub_cols)};',
            f'HEADING={quoted_list(heading_cols)};',
            f'SOURCE="{px_escape(source)}";',
        ]
        header = "\n".join(header_lines) + "\n"

        # VALUES and CODES blocks
        def px_values_and_codes(name: str, dim: Dict) -> str:
            """Render the VALUES(...) and CODES(...) entries for one dimension."""
            safe_name = px_escape(name)
            return (
                f'VALUES("{safe_name}")={quoted_list(dim["values"])};\n'
                f'CODES("{safe_name}")={quoted_list(dim["codes"])};\n'
            )

        meta_parts = "".join(px_values_and_codes(col, dimensions[col]) for col in group_cols)

        # Output filename
        output_file = output_file or os.path.splitext(input_file)[0] + ".px"

        # Write to file
        print(f"Writing {len(data_values)} data points to {output_file}...")
        with open(output_file, "w", encoding="utf-16") as f:
            f.write(header)
            f.write(meta_parts)
            f.write("DATA=\n")
            # Write data in chunks of 1000 values per line
            chunk_size = 1000
            for i in range(0, len(data_values), chunk_size):
                f.write(" ".join(data_values[i:i+chunk_size]) + "\n")
            f.write(";")

        print(f"✅ PX file saved as: {output_file}")
        return output_file

    except Exception as e:
        # Top-level boundary: report the failure, then let the caller see it
        print(f"❌ Error processing file: {str(e)}")
        raise

if __name__ == "__main__":
    # Settings for converting the "Fire - Type of Incident" CSV into a PX file
    CONFIG = dict(
        input_file="Fire - Type of Incident.csv",
        output_file="Fire - Type of Incident.px",
        value_col="Value",
        stub_cols=["DateCode"],
        heading_cols=["FeatureCode", "Accident Status", "Type of Fire"],
        title="Fire - Type of Incident",
        subject_area="Incidents",
        matrix_code=generate_matrix_code(),
        decimals=0,  # counts are whole numbers
        source="Scottish Fire and Rescue Service",
        agg_method="sum",
    )

    # Echo the settings, then run the conversion
    print("Running conversion with the following configuration:")
    for option, setting in CONFIG.items():
        print(f"{option}: {setting}")
    tidy_to_pxstat(**CONFIG)

Running conversion with the following configuration:
input_file: Fire - Type of Incident.csv
output_file: Fire - Type of Incident.px
value_col: Value
stub_cols: ['DateCode']
heading_cols: ['FeatureCode', 'Accident Status', 'Type of Fire']
title: Fire - Type of Incident
subject_area: Incidents
matrix_code: SCOJQYF
decimals: 0
source: Scottish Fire and Rescue Service
agg_method: sum
Loading data from Fire - Type of Incident.csv...
Expected data points: 130368
Writing 130368 data points to Fire - Type of Incident.px...
✅ PX file saved as: Fire - Type of Incident.px
