In [2]:
import pandas as pd
import numpy as np
import holidays
import datetime
import warnings
import re
import traceback
from pathlib import Path

# Silence warning categories that would otherwise clutter the run log.
for _category in (UserWarning, FutureWarning, pd.errors.PerformanceWarning):
    warnings.simplefilter(action='ignore', category=_category)


print("Script started...")

# %% ----- Configuration -----
# Anchor all paths at the folder containing this script; when __file__ is not
# defined (e.g. an interactive session) fall back to the working directory.
if "__file__" in locals():
    SCRIPT_DIR = Path(__file__).parent
else:
    SCRIPT_DIR = Path.cwd()

# Base directory that the entries of FILE_NAMES are joined onto.
DATA_DIR = SCRIPT_DIR
# To keep the inputs in a sub-folder instead:
# DATA_DIR = SCRIPT_DIR / "Data"
# if not DATA_DIR.exists():
#    raise FileNotFoundError(f"Could not find the data directory: {DATA_DIR.resolve()}")

# Input files. Each value is appended to DATA_DIR; note the '../Data/' prefix,
# which points at a 'Data' folder next to DATA_DIR rather than inside it.
FILE_NAMES = {
    'transfers': '../Data/simulated_car_transfer.xlsx',
    'cpi': '../Data/CPI.xlsx',
    'sales': '../Data/Försäljning Personbil.csv',
    'email': '../Data/EMMAUtskick.csv',
    'marketing': '../Data/MMM cleaned 2503.csv',
    'fuel': '../Data/Bränslepriser.xlsx',
}

# Inclusive analysis window.
START_DATE = pd.Timestamp('2022-01-01')
END_DATE = pd.Timestamp('2025-03-24')

# Canonical region names; rows whose cleaned region is not listed are dropped.
VALID_REGIONS = [
    'Stockholm',
    'Göteborg och Bohuslän',
    'Skåne',
    'Uppsala',
    'Östgöta',
    'Södermanland',
    'Halland',
    'Kalmar',
    'Kronoberg',
    'Blekinge',
    'Gotland',
    'Värmland',
    'Dalarna',
    'Gävleborg',
    'Västernorrland',
    'Jämtland',
    'Västerbotten',
    'Norrbotten',
    'Jönköping',
    'Älvsborg',
    'Skaraborg',
    'Bergslagen',
    'Göinge',
]
# Same names as a set, for O(1) membership tests.
VALID_REGIONS_SET = {*VALID_REGIONS}

# Standard column name -> raw header spellings that should be renamed to it.
# Matching is done case-insensitively by standardize_columns.
STD_COL_MAPPINGS = {
    'Date': ['date', 'Date', 'DATE', 'datum', 'day'],
    'Region': ['region', 'Region', 'REGION', 'Bolag', 'bolag', 'län'],
    'Sales': ['avtal', 'Avtal', 'sales', 'försäljning'],
    'Cost': ['cost', 'Cost', 'spend', 'Spend', 'utgift'],
    'Channel': ['channel_grouping', 'Channel_Grouping', 'channel', 'kanal'],
    # Raw marketing impressions carry this name only until the per-channel pivot.
    'Marketing_Impressions': ['impressions', 'Impressions', 'visningar'],
    'Ownership_Transfers': ['transfers', 'ownership_transfers', 'överföringar'],
    'Gas_Price': ['bensin', 'Bensin', 'fuel_price', 'Fuel_Price', 'bränslepris'],
    'CPI': ['cpi', 'CPI', 'kpi'],
    'Email_Impressions': ['email_opens', 'opens', 'eimpressions', 'eImpressions'],
}

# Inclusive (start, end) date strings used to build the Is_Ramadan flag.
RAMADAN_PERIODS = [
    ('2022-04-02', '2022-05-01'),
    ('2023-03-22', '2023-04-20'),
    ('2024-03-10', '2024-04-09'),
    ('2025-02-28', '2025-03-29'),
]

print(f"Configuration Loaded. Data Dir: {DATA_DIR.resolve()}")

# %% ----- Utility Functions (Adapted from your example) -----

# Lower-cased alternative spellings and related area names mapped to the
# canonical region name. Kept at module level so the table is built once
# instead of on every call (clean_region is applied row by row).
_REGION_ALIASES = {
    'alvsborg': 'Älvsborg', 'älvsborg': 'Älvsborg',
    'bergslagen': 'Bergslagen', 'västmanlands': 'Bergslagen', 'örebro': 'Bergslagen',
    'blekinge': 'Blekinge',
    'dalarna': 'Dalarna', 'dalarnas': 'Dalarna',
    'gavleborg': 'Gävleborg', 'gävleborg': 'Gävleborg', 'gävleborgs': 'Gävleborg',
    'goinge': 'Göinge', 'göinge': 'Göinge',
    'goinge-kristianstad': 'Göinge', 'göinge-kristianstad': 'Göinge',
    'goteborg': 'Göteborg och Bohuslän', 'göteborg': 'Göteborg och Bohuslän',
    'goteborg och bohus': 'Göteborg och Bohuslän', 'göteborg och bohus': 'Göteborg och Bohuslän',
    'göteborg & bohus': 'Göteborg och Bohuslän',
    'goteborg-och-bohuslan': 'Göteborg och Bohuslän', 'göteborg och bohuslän': 'Göteborg och Bohuslän',
    'västra götalands': 'Göteborg och Bohuslän',
    'gotland': 'Gotland', 'gotlands': 'Gotland',
    'halland': 'Halland', 'hallands': 'Halland',
    'jamtland': 'Jämtland', 'jämtland': 'Jämtland', 'jämtlands': 'Jämtland',
    'jonkoping': 'Jönköping', 'jönköping': 'Jönköping', 'jönköpings': 'Jönköping',
    'kalmar': 'Kalmar',
    'kronoberg': 'Kronoberg', 'kronobergs': 'Kronoberg',
    'norrbotten': 'Norrbotten', 'norrbottens': 'Norrbotten',
    'ostgota': 'Östgöta', 'östgöta': 'Östgöta',
    'skaraborg': 'Skaraborg',
    'skane': 'Skåne', 'skåne': 'Skåne',
    'sodermanland': 'Södermanland', 'södermanland': 'Södermanland',
    'södermanlands': 'Södermanland', 'sörmland': 'Södermanland',
    'stockholm': 'Stockholm', 'stockholms': 'Stockholm',
    'uppsala': 'Uppsala',
    'varmland': 'Värmland', 'värmland': 'Värmland', 'värmlands': 'Värmland',
    'vasterbotten': 'Västerbotten', 'västerbotten': 'Västerbotten', 'västerbottens': 'Västerbotten',
    'vasternorrland': 'Västernorrland', 'västernorrland': 'Västernorrland',
    'västernorrlands': 'Västernorrland',
}

# Lower-cased labels that are treated as "no specific region".
_NON_REGIONAL_LABELS = frozenset([
    'nationell', 'nationwide', 'all', 'övrig', 'ovrigt', 'unknown', 'okänt',
    'kluster', 'sak', 'is_national_row',
])


def clean_region(region):
    """Map a raw region label to a name from VALID_REGIONS, or 'Övrigt'.

    The label is stripped and lower-cased, a trailing ' län' / ' lan' suffix
    is removed, and any hyphenated label containing 'göinge' is collapsed to
    'göinge'. The result is looked up in the alias table first and then
    compared case-insensitively with the canonical names.

    Args:
        region: Raw cell value; may be missing (None / NaN).

    Returns:
        The canonical region name, or 'Övrigt' for missing values, national /
        unknown labels and anything that cannot be matched.
    """
    if pd.isna(region):
        return 'Övrigt'

    region_lower = str(region).strip().lower()
    if region_lower in _NON_REGIONAL_LABELS:
        return 'Övrigt'

    # Both suffixes are four characters long, so one slice handles either.
    if region_lower.endswith((' län', ' lan')):
        region_lower = region_lower[:-4].strip()
    if '-' in region_lower and 'göinge' in region_lower:
        region_lower = 'göinge'

    alias_target = _REGION_ALIASES.get(region_lower)
    if alias_target is not None:
        # Guard against alias targets that are not (or no longer) valid regions.
        return alias_target if alias_target in VALID_REGIONS_SET else 'Övrigt'

    for valid_region in VALID_REGIONS:
        if region_lower == valid_region.lower():
            return valid_region
    return 'Övrigt'

def safe_to_numeric(series, fill_value=0.0):
    """Convert a Series to numeric, tolerating comma decimals and spaced digits.

    A Series that already has a numeric dtype only has its missing values
    filled. Anything else is converted to text, stripped of all whitespace
    (so thousands separators such as '1 234,5', including non-breaking
    spaces, no longer make the value unparseable), has decimal commas turned
    into decimal points and is then parsed.

    Args:
        series: pandas Series to convert.
        fill_value: Replacement for missing or unparseable values.

    Returns:
        A numeric Series of the same length with NaNs replaced by
        ``fill_value``.
    """
    if pd.api.types.is_numeric_dtype(series):
        return series.fillna(fill_value)

    as_text = series.astype(str)
    # \s matches Unicode whitespace, which covers the non-breaking spaces that
    # spreadsheet exports commonly use between digit groups.
    as_text = as_text.str.replace(r'\s+', '', regex=True)
    as_text = as_text.str.replace(',', '.', regex=False)
    return pd.to_numeric(as_text, errors='coerce').fillna(fill_value)

def standardize_columns(df, column_mapping):
    """Rename raw column headers to their standard names.

    Headers are compared case-insensitively and with surrounding whitespace
    ignored. Each standard name is assigned at most once: if the frame
    already has a column with that exact name, or an earlier column has
    already been renamed to it, later candidates keep their original header
    so the result never gains duplicate column labels from the renaming.

    Args:
        df: DataFrame whose columns should be renamed.
        column_mapping: Dict of standard name -> list of accepted raw names.

    Returns:
        A renamed copy of ``df`` (as returned by ``DataFrame.rename``).
    """
    raw_to_standard = {
        raw.lower().strip(): std_name
        for std_name, raw_names in column_mapping.items()
        for raw in raw_names
    }

    taken = set(df.columns)  # labels that exist now or will exist after renaming
    rename_dict = {}
    for col in df.columns:
        target = raw_to_standard.get(str(col).lower().strip())
        if target is None or col == target:
            continue
        if target in taken:
            # Renaming would create a second column with the same label.
            print(f"Warning: column '{col}' also maps to '{target}', which already exists; leaving it unrenamed.")
            continue
        rename_dict[col] = target
        taken.add(target)
    return df.rename(columns=rename_dict)

def get_swedish_holidays(start_year, end_year):
    """Return the dates of named Swedish holidays for an inclusive year range.

    Entries from ``holidays.SE`` whose name consists only of a generic weekday
    label ('Söndag…' / 'Lördag…') are skipped. A named holiday that happens
    to fall on such a day is kept: combined names are split on ';' and ','
    and the date is kept when any component is not a weekday label, so the
    result does not depend on the order in which the library joins names.

    Args:
        start_year: First year to include.
        end_year: Last year to include.

    Returns:
        A sorted ``pd.DatetimeIndex`` of holiday dates. An empty index is
        returned when the range is empty or when the lookup fails for any
        reason (a warning is printed in that case).
    """
    weekday_prefixes = ('Söndag', 'Lördag')
    try:
        valid_years = list(range(start_year, end_year + 1))
        if not valid_years:
            return pd.DatetimeIndex([])

        print(f"Fetching Swedish holidays for years: {valid_years}")

        se_holidays = holidays.SE(years=valid_years)

        holiday_dates = []
        for date, name in se_holidays.items():
            # NOTE(review): presumably the library joins several names for one
            # date with ', ' or '; ' depending on its version - confirm.
            parts = [part.strip() for part in str(name or '').replace(',', ';').split(';')]
            if any(part and not part.startswith(weekday_prefixes) for part in parts):
                holiday_dates.append(date)

        holiday_dates = pd.DatetimeIndex(sorted(holiday_dates))
        print(f"Found {len(holiday_dates)} official holiday dates.")
        return holiday_dates
    except Exception as e:
        # Best effort by design: the caller treats an empty index as "no holidays".
        print(f"Warning: Could not fetch Swedish holidays: {e}. Returning empty list.")
        return pd.DatetimeIndex([])


# --- Primary Data Processing Function ---
def load_and_process_file(file_key, file_info, base_dates_df):
    """Load one source file and reduce it to a national daily DataFrame.

    Pipeline: read the file, standardize column names, parse and filter
    dates to START_DATE..END_DATE, optionally clean and filter regions,
    convert metric columns to numeric, then either
      * return one value per date (keys 'cpi' and 'fuel'),
      * sum the metrics per date, or
      * for key 'marketing', sum per date and channel and pivot to one
        '<channel>_Impressions' / '<channel>_Cost' column per channel.

    Args:
        file_key: Source identifier; 'cpi', 'fuel' and 'marketing' select
            the special handling described above.
        file_info: Dict with 'name' (path appended to DATA_DIR) and 'type'
            ('excel' or 'csv'); optional 'encoding' and 'sep' for CSV files,
            'is_regional', 'metrics' and 'grouping_cols'.
        base_dates_df: Not used by this function; kept so existing callers
            keep working.

    Returns:
        The processed DataFrame, or None when the file is missing, empty,
        lacks required columns, has no rows left after filtering, or an
        unexpected error occurs (the problem is printed).
    """
    file_path = DATA_DIR / file_info['name']
    source_name = file_key.capitalize()
    print(f"\n--- Processing {source_name} ({file_info['name']}) ---")

    is_indicator = file_key in ('cpi', 'fuel')
    metrics = file_info.get('metrics', [])
    try:
        # ----- Load -----
        file_type = file_info['type']
        if file_type == 'excel':
            df = pd.read_excel(file_path)
        elif file_type == 'csv':
            df = pd.read_csv(
                file_path,
                encoding=file_info.get('encoding'),
                sep=file_info.get('sep', ','),
            )
        else:
            print(f"ERROR ({source_name}): Unknown file type.")
            return None
        print(f"Info ({source_name}): Loaded {len(df)} rows.")
        if df.empty:
            return None

        # ----- Standardize column names -----
        df = standardize_columns(df, STD_COL_MAPPINGS)
        if 'Date' not in df.columns:
            print(f"ERROR ({source_name}): 'Date' column not found after standardization.")
            return None

        # ----- Dates: parse, drop unparseable rows, strip time, filter window -----
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
        df = df.dropna(subset=['Date'])
        df['Date'] = df['Date'].dt.normalize()
        in_window = (df['Date'] >= START_DATE) & (df['Date'] <= END_DATE)
        df = df[in_window].copy()  # copy so later column assignments hit a real frame
        if df.empty:
            print(f"Warning ({source_name}): No data within date range.")
            return None
        print(f"Info ({source_name}): {len(df)} rows remain after date cleaning/filtering.")

        # ----- Regions (only for sources flagged as regional) -----
        is_regional = 'Region' in df.columns and file_info.get('is_regional', False)
        if is_regional:
            print(f"Info ({source_name}): Cleaning regions...")
            df['Region'] = df['Region'].apply(clean_region)
            df = df[df['Region'].isin(VALID_REGIONS_SET)].copy()
            print(f"Info ({source_name}): {len(df)} rows remain after region cleaning/filtering.")
            if df.empty:
                print(f"Warning ({source_name}): No data remains after filtering for valid regions.")
                return None

        # ----- Metrics -----
        # Summed metrics treat missing values as 0. Indicator values must stay
        # NaN when missing or unparseable, otherwise the dropna() below keeps
        # them as genuine zero readings.
        metric_fill = float('nan') if is_indicator else 0.0
        for metric_col in metrics:
            if metric_col in df.columns:
                df[metric_col] = safe_to_numeric(df[metric_col], fill_value=metric_fill)
            else:
                print(f"Warning ({source_name}): Expected metric column '{metric_col}' not found.")

        group_by_cols = ['Date'] + file_info.get('grouping_cols', [])

        # ----- National indicators: one value per date, no summing -----
        if is_indicator:
            metric = 'CPI' if file_key == 'cpi' else 'Gas_Price'
            if metric not in df.columns:
                print(f"Warning ({source_name}): Expected indicator column '{metric}' not found.")
                return None
            processed_df = (
                df[['Date', metric]]
                .dropna()
                .drop_duplicates('Date', keep='last')
                .sort_values('Date')
            )
            print(f"Info ({source_name}): Processed as national indicator. {len(processed_df)} unique date entries.")
            return processed_df

        # ----- Aggregate regional/channel rows to national daily totals -----
        agg_cols_present = [col for col in metrics if col in df.columns]
        if not agg_cols_present:
            print(f"ERROR ({source_name}): None of the expected metric columns {metrics} found for aggregation.")
            return None

        # groupby discards rows whose key is NaN, so a missing channel has to
        # be labelled before grouping or its impressions and cost are lost.
        if 'Channel' in group_by_cols and 'Channel' in df.columns:
            df['Channel'] = df['Channel'].fillna('Unknown').astype(str)

        print(f"Info ({source_name}): Aggregating {agg_cols_present} by {group_by_cols}...")
        grouped_df = df.groupby(group_by_cols, as_index=False)[agg_cols_present].sum()

        if file_key != 'marketing':
            # The daily national aggregate is already the final shape.
            processed_df = grouped_df
        else:
            # ----- Pivot marketing data to one column per channel and metric -----
            print(f"Info ({source_name}): Pivoting marketing data...")
            if 'Channel' not in grouped_df.columns:
                print(f"ERROR ({source_name}): 'Channel' column needed for pivoting not found.")
                return None
            if not all(m in grouped_df.columns for m in ['Marketing_Impressions', 'Cost']):
                print(f"ERROR ({source_name}): 'Marketing_Impressions' or 'Cost' column missing before pivot.")
                return None

            grouped_df['Channel'] = grouped_df['Channel'].fillna('Unknown').astype(str)

            try:
                # Each (Date, Channel) pair is unique after the groupby, so the
                # default pivot aggregation leaves the summed values unchanged.
                pivot_df = grouped_df.pivot_table(
                    index='Date',
                    columns='Channel',
                    values=['Marketing_Impressions', 'Cost'],
                    fill_value=0,
                )
                # Flatten (metric, channel) to '<channel>_<suffix>', e.g.
                # ('Marketing_Impressions', 'SEM') -> 'SEM_Impressions' and
                # ('Cost', 'Display') -> 'Display_Cost'.
                suffix_for = {'Marketing_Impressions': 'Impressions'}
                pivot_df.columns = [
                    f'{channel}_{suffix_for.get(metric, metric)}'
                    for metric, channel in pivot_df.columns
                ]
                processed_df = pivot_df.reset_index()
                print(f"Info ({source_name}): Pivoting successful. Columns: {processed_df.columns.tolist()}")
            except Exception as pivot_error:
                print(f"ERROR ({source_name}): Failed to pivot marketing data: {pivot_error}")
                traceback.print_exc()
                return None

        print(f"Info ({source_name}): Processing complete. Result has {len(processed_df)} rows.")
        return processed_df

    except FileNotFoundError:
        print(f"ERROR ({source_name}): File not found at {file_path}")
        return None
    except Exception as e:
        print(f"ERROR ({source_name}): Unexpected error: {e}")
        traceback.print_exc()
        return None


# %% ----- Load and Process All Files -----
# Loading recipe per source. The key order is the processing order and, in
# turn, the order in which columns are later merged onto the daily frame.
FILES_INFO = {
    'transfers': {
        'name': FILE_NAMES['transfers'],
        'type': 'excel',
        'is_regional': True,
        'metrics': ['Ownership_Transfers'],
    },
    'cpi': {
        'name': FILE_NAMES['cpi'],
        'type': 'excel',
        'is_regional': False,
        'metrics': ['CPI'],
    },
    'sales': {
        'name': FILE_NAMES['sales'],
        'type': 'csv',
        'is_regional': True,
        'encoding': 'utf-8',
        'sep': ';',
        'metrics': ['Sales'],
    },
    'email': {
        'name': FILE_NAMES['email'],
        'type': 'csv',
        'is_regional': True,
        'encoding': 'utf-8',
        'sep': ';',
        'metrics': ['Email_Impressions'],
    },
    'marketing': {
        'name': FILE_NAMES['marketing'],
        'type': 'csv',
        'is_regional': True,
        'encoding': 'ISO-8859-1',
        'sep': ';',
        # Summed per date and channel first, then pivoted per channel.
        'metrics': ['Marketing_Impressions', 'Cost'],
        'grouping_cols': ['Channel'],
    },
    'fuel': {
        'name': FILE_NAMES['fuel'],
        'type': 'excel',
        'is_regional': False,
        'metrics': ['Gas_Price'],
    },
}

print("\n--- Initializing Base Daily DataFrame ---")
# One row per calendar day in the analysis window; everything else is merged onto it.
daily_dates = pd.date_range(start=START_DATE, end=END_DATE, freq='D')
national_daily_df = pd.DataFrame({'Date': daily_dates})
national_daily_df = national_daily_df.sort_values('Date')

# Keep only the sources that produced a result.
processed_data = {}
for key, info in FILES_INFO.items():
    processed = load_and_process_file(key, info, national_daily_df)
    if processed is None:
        continue
    processed_data[key] = processed

print("\n--- Merging Processed Data ---")
for key, df_processed in processed_data.items():
    merge_cols = list(df_processed.columns)

    if key in ('cpi', 'fuel'):
        # Indicators are not daily: each day takes the most recent reading
        # dated on or before it (direction='backward').
        print(f"Merging {key} using merge_asof...")
        national_daily_df = pd.merge_asof(
            national_daily_df, df_processed, on='Date', direction='backward'
        )
        continue

    print(f"Merging {key} using left merge...")
    if key == 'marketing':
        # Pivoted frame: 'Date' plus the per-channel metric columns.
        right_frame = df_processed
    else:
        # Never carry a stray 'Region' column into the national frame.
        cols_to_merge = [col for col in merge_cols if col != 'Region']
        right_frame = df_processed[cols_to_merge]
    national_daily_df = national_daily_df.merge(right_frame, on='Date', how='left')

print(f"\nColumns after merging: {national_daily_df.columns.tolist()}")

# %% ----- Feature Engineering (Daily Flags) -----
print("\n--- Generating Daily Flags ---")
# 'Date' is expected as a regular column here; if it is not, try to recover
# it from the index and abort when that fails as well.
if 'Date' not in national_daily_df.columns:
    national_daily_df = national_daily_df.reset_index()
    if 'Date' not in national_daily_df.columns:
        raise ValueError("Critical Error: 'Date' column lost before setting index.")

national_daily_df = national_daily_df.set_index('Date')

# ----- Holiday flag -----
print("Generating Holiday flags...")
index_years = national_daily_df.index.year
min_year, max_year = index_years.min(), index_years.max()
sweden_holidays = get_swedish_holidays(min_year, max_year)

# Every day starts as a non-holiday; only matching dates are switched to 1.
national_daily_df['Is_Holiday'] = 0

if sweden_holidays.empty:
    print("Warning: No holidays found. All dates marked as non-holidays (0).")
else:
    # The lookup covers whole years, so trim it to the analysis window.
    holiday_dates = pd.DatetimeIndex(
        [day for day in sweden_holidays if START_DATE <= day <= END_DATE]
    )
    if len(holiday_dates) > 0:
        national_daily_df.loc[holiday_dates, 'Is_Holiday'] = 1
        print(f"Holiday flags generated. Found {len(holiday_dates)} holidays in date range.")
        # Debug aid: show the first few flagged dates.
        print(f"Sample of dates marked as holidays: {holiday_dates[:5]}")


# ----- Ramadan flag -----
print("Generating Ramadan flags...")
national_daily_df['Is_Ramadan'] = 0
ramadan_periods_dt = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in RAMADAN_PERIODS]
if not isinstance(national_daily_df.index, pd.DatetimeIndex):
    print("ERROR: Index is not DatetimeIndex, cannot generate Ramadan flags.")
else:
    for start, end in ramadan_periods_dt:
        # Both period bounds are inclusive.
        in_period = (national_daily_df.index >= start) & (national_daily_df.index <= end)
        if in_period.any():
            national_daily_df.loc[in_period, 'Is_Ramadan'] = 1
    print(f"Ramadan flags generated. Count: {national_daily_df['Is_Ramadan'].sum()}")


# %% ----- Final Filling and Weekly Aggregation -----
print("\n--- Filling Final NaNs ---")

# Column groups, each with its own fill rule.
sum_cols = ['Ownership_Transfers', 'Sales', 'Email_Impressions']
# Per-channel columns are only known after the pivot, so find them by suffix.
pivoted_marketing_cols = [
    col for col in national_daily_df.columns
    if '_Impressions' in col or '_Cost' in col
]
indicator_cols = ['CPI', 'Gas_Price']
flag_cols = ['Is_Holiday', 'Is_Ramadan']

# Additive columns: a day without data counts as zero.
cols_to_fill_zero = sum_cols + pivoted_marketing_cols
for col in cols_to_fill_zero:
    if col not in national_daily_df.columns:
        print(f"Warning: Expected summed/pivoted column '{col}' not present for fillna(0).")
        continue
    if national_daily_df[col].isnull().any():
        national_daily_df[col] = national_daily_df[col].fillna(0)

# Indicator columns: carry the last known value forward, then back-fill the
# leading gap before the first observation.
for col in indicator_cols:
    if col not in national_daily_df.columns:
        print(f"Warning: Expected indicator column '{col}' not present for ffill/bfill.")
        continue
    if national_daily_df[col].isnull().any():
        national_daily_df[col] = pd.to_numeric(national_daily_df[col], errors='coerce')
        national_daily_df[col] = national_daily_df[col].ffill().bfill()

# Flags must end up as gap-free integers.
for col in flag_cols:
    if col not in national_daily_df.columns:
        continue
    national_daily_df[col] = national_daily_df[col].fillna(0).astype(int)

print("\n--- Aggregating to Weekly Level ---")


def _any_flag(values):
    """Return 1 when at least one daily value in the week is truthy, else 0."""
    return 1 if any(values) else 0


# Pick a weekly aggregation per column: indicators keep the week's last value,
# flags mark the week when any day is flagged, other numeric columns are summed.
# Non-numeric columns are left out of the weekly result.
agg_funcs = {}
for col in national_daily_df.columns:
    if col in indicator_cols:
        agg_funcs[col] = 'last'
    elif col == 'Is_Holiday':
        agg_funcs[col] = _any_flag
    elif col == 'Is_Ramadan':
        agg_funcs[col] = 'max'
    elif pd.api.types.is_numeric_dtype(national_daily_df[col]):
        agg_funcs[col] = 'sum'


if agg_funcs:
    # Only the column names are printed to keep the log short.
    print(f"Weekly aggregation functions (Count: {len(agg_funcs)}): {list(agg_funcs.keys())}")
    try:
        if not isinstance(national_daily_df.index, pd.DatetimeIndex):
            raise TypeError("Index not DatetimeIndex.")
        # NOTE(review): with pandas' defaults for weekly frequencies the
        # 'W-MON' bins are closed and labelled on the right, i.e. each row
        # covers Tuesday..Monday and carries that Monday's date - confirm
        # this is the intended week definition.
        weekly_agg_df = national_daily_df.resample('W-MON').agg(agg_funcs)
        weekly_agg_df.index.name = 'Date'
        print(f"Weekly aggregation complete. Result shape: {weekly_agg_df.shape}")
        print(weekly_agg_df.head())

        # ----- Save Results -----
        print("\n--- Saving Results ---")
        # Only the Excel export is active; a CSV copy could be written next
        # to it with weekly_agg_df.to_csv(...).
        output_filename_excel = DATA_DIR / 'aggregated_weekly_data.xlsx'

        weekly_agg_df.to_excel(output_filename_excel)
        print(f"Successfully saved aggregated weekly data to {output_filename_excel.resolve()}")

    except TypeError as e:
        print(f"ERROR during resampling: {e}")
        traceback.print_exc()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        traceback.print_exc()
else:
    print("CRITICAL ERROR: No columns available for weekly aggregation.")

print("\nScript finished.")

Script started...
Configuration Loaded. Data Dir: /mnt/c/users/b816i1/git/reports/lf-mmm/code/PYMC MMM/2025-04-10

--- Initializing Base Daily DataFrame ---

--- Processing Transfers (../Data/simulated_car_transfer.xlsx) ---
Info (Transfers): Loaded 34023 rows.
Info (Transfers): 25842 rows remain after date cleaning/filtering.
Info (Transfers): Cleaning regions...
Info (Transfers): 24686 rows remain after region cleaning/filtering.
Info (Transfers): Aggregating ['Ownership_Transfers'] by ['Date']...
Info (Transfers): Processing complete. Result has 1179 rows.

--- Processing Cpi (../Data/CPI.xlsx) ---
Info (Cpi): Loaded 38 rows.
Info (Cpi): 38 rows remain after date cleaning/filtering.
Info (Cpi): Processed as national indicator. 38 unique date entries.

--- Processing Sales (../Data/Försäljning Personbil.csv) ---
Info (Sales): Loaded 28233 rows.
Info (Sales): 28233 rows remain after date cleaning/filtering.
Info (Sales): Cleaning regions...
Info (Sales): 27116 rows remain after region