# Instacart Customer Segmentation & Reporting – Reproducible Pipeline

## README – How to run
- Place your **merged dataset from Exercise 4.9** into the `data_in/` folder at the project root. Supported formats: `.parquet` (preferred) or `.csv`.
- Accepted file examples: `instacart_merged.parquet`, `instacart_merged.csv` (any filename is fine; the script will auto-detect the first `.parquet` or `.csv`).
- Run the notebook top-to-bottom (**Kernel → Restart & Run All**). No interactivity or manual edits are required.
- Expected outputs are written to subfolders:
  - `data_out/` – exported samples and final datasets (`.parquet` + `.csv`)
  - `figures/` – all charts as PNG (DPI=200)
  - `reports/` – final Excel report with 7 tabs
  - `logs/` – rotating execution log `run.log`
- Estimated runtime and memory depend on your dataset size. The original Instacart data may take several minutes.

## Contents
1. [Environment & Imports](#Environment-&-Imports)
2. [CONFIG](#CONFIG)
3. [Helper Functions](#Helper-Functions)
4. [Load & Validate Data](#Load-&-Validate-Data)
5. [PII Handling (Hash+Salt)](#PII-Handling-(Hash+Salt))
6. [US Regional Segmentation](#US-Regional-Segmentation)
7. [Exclude Low-Activity Customers](#Exclude-Low-Activity-Customers)
8. [Rule-based Customer Profiles](#Rule-based-Customer-Profiles)
9. [Analytics & Statistical Tests](#Analytics-&-Statistical-Tests)
10. [Visualizations](#Visualizations)
11. [Export Final Datasets](#Export-Final-Datasets)
12. [Excel Report (7 Tabs)](#Excel-Report-(7-Tabs))
13. [Run Summary](#Run-Summary)


# Environment & Imports
_Set deterministic seed, configure logging, and print library versions._

In [None]:
import os
import sys
import re
import json
import math
import time
import random
import warnings
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv, set_key
import hashlib
from textwrap import fill
from logging.handlers import RotatingFileHandler
import logging
from openpyxl import Workbook
from openpyxl.drawing.image import Image as XLImage
from openpyxl.utils import get_column_letter

warnings.filterwarnings('ignore')
np.random.seed(42)
random.seed(42)

print('Python', sys.version)
print('pandas', pd.__version__)
print('numpy', np.__version__)
print('matplotlib', plt.matplotlib.__version__)
print('seaborn', sns.__version__)
print('scipy present')
import openpyxl
print('openpyxl', openpyxl.__version__)
print('pyarrow', pa.__version__)
import dotenv
print('python-dotenv', getattr(dotenv, '__version__', 'present'))

# CONFIG
_Project-wide configuration: folders, thresholds, department groups, schema, and plotting settings._

In [None]:
CONFIG = {
    'paths': {
        'data_in': 'data_in',
        'data_out': 'data_out',
        'figures': 'figures',
        'reports': 'reports',
        'logs': 'logs',
        'env_file': '.env',
    },
    'files': {
        'active_customers_sample_base': 'active_customers_sample',
        'final_dataset_base': 'final_instacart_dataset',
        'excel_report': 'Instacart_Final_Report.xlsx',
    },
    'required_columns': [
        'customer_id','state','orders_count','order_id','order_number',
        'order_hour_of_day','order_dow','age','income','dependents',
        'department_id','price','spending_flag'
    ],
    'column_aliases': {
        'State': 'state',
        'orders_day_of_week': 'order_dow',
        'order_hour': 'order_hour_of_day',
        'n_dependents': 'dependents',
        'department': 'department_id',
        'prices': 'price',
        'order_price': 'price'
    },
    'pii_patterns': ['first_name','last_name','full_name','email','phone','address','street','zip','zipcode'],
    'region_mapping': {
        'Northeast': ['CT','ME','MA','NH','RI','VT','NJ','NY','PA','DC'],
        'Midwest': ['IL','IN','MI','OH','WI','IA','KS','MN','MO','NE','ND','SD'],
        'South': ['DE','FL','GA','MD','NC','SC','VA','WV','AL','KY','MS','TN','AR','LA','OK','TX'],
        'West': ['AZ','CO','ID','MT','NV','NM','UT','WY','AK','CA','HI','OR','WA']
    },
    # Optional: if departments.csv is available in data_in/, these labels can be verified/overridden
    'dept_groups': {
        # Examples; adjust if your data uses different IDs
        'young_parent_depts': {3, 4, 7, 10, 12},
        'health_depts': {4, 16}  # e.g., produce=4, dairy/eggs=16 in common Instacart mappings
    },
    'thresholds': {
        'young_parent_age_min': 20,
        'young_parent_age_max': 34,
        'evening_hours': list(range(18, 23)),
        'evening_share_min': 0.30,
        'morning_hours': list(range(6, 11)),
        'morning_share_min': 0.30,
        'weekday_hours': list(range(9, 18)),
        'weekday_share_min': 0.50,
        'late_hours': list(range(20, 25)),
        'late_share_min': 0.30,
        'weekend_dows': {0, 6},  # Sunday=0 in Instacart, Saturday=6
        'weekend_share_min': 0.35,
        'young_parent_dept_share_min': 0.20,
        'health_dept_share_min': 0.25,
        'family_saver_income_max': 50000,
        'affluent_income_min': 120000,
        'senior_age_min': 65
    },
    'plot': {
        'style': 'whitegrid',
        'font_scale': 1.1,
        'dpi': 200,
        'figsize': (10, 6)
    }
}

def setup_environment(cfg: dict) -> logging.Logger:
    for key in ['data_in', 'data_out', 'figures', 'reports', 'logs']:
        Path(cfg['paths'][key]).mkdir(parents=True, exist_ok=True)
    # Logging
    log_path = Path(cfg['paths']['logs']) / 'run.log'
    logger = logging.getLogger('instacart')
    logger.setLevel(logging.INFO)
    # Avoid duplicate handlers if re-running
    logger.handlers.clear()
    fh = RotatingFileHandler(log_path, maxBytes=2_000_000, backupCount=3)
    fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(fmt)
    logger.addHandler(fh)
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(fmt)
    logger.addHandler(ch)
    logger.info('Environment initialized.')
    return logger

LOGGER = setup_environment(CONFIG)

# Global plotting theme
sns.set_theme(style=CONFIG['plot']['style'])
sns.set_context('notebook')
plt.rcParams['figure.dpi'] = CONFIG['plot']['dpi']
plt.rcParams['savefig.dpi'] = CONFIG['plot']['dpi']
plt.rcParams['font.size'] = 11


# Helper Functions
_Modular utilities: file discovery, IO, schema checks, PII hashing, region mapping, profiling, analytics, plotting, and reporting._

In [None]:
def find_first_file(folder: Path, extensions: tuple) -> Path:
    candidates = []
    for ext in extensions:
        candidates.extend(sorted(folder.glob(f'*{ext}')))
    return candidates[0] if candidates else None

def load_dataset_auto(cfg: dict) -> tuple[pd.DataFrame, str]:
    data_in = Path(cfg['paths']['data_in'])
    p = find_first_file(data_in, ('.parquet', '.pq'))
    fmt = None
    if p is not None:
        fmt = 'parquet'
    else:
        p = find_first_file(data_in, ('.csv',))
        if p is not None:
            fmt = 'csv'
    if p is None:
        raise FileNotFoundError(
            'No input file found in data_in/. Expected a merged dataset (.parquet preferred or .csv).')
    LOGGER.info(f'Loading dataset: {p.name} (format={fmt})')
    if fmt == 'parquet':
        df = pd.read_parquet(p, engine='pyarrow')
    else:
        df = pd.read_csv(p)
    return df, p.name

def normalize_columns(df: pd.DataFrame, cfg: dict) -> tuple[pd.DataFrame, dict]:
    original_cols = df.columns.tolist()
    col_map = {}
    # Lowercase normalization first
    lower_map = {c: c.lower() for c in df.columns}
    df = df.rename(columns=lower_map)
    # Apply explicit aliases
    for k, v in cfg['column_aliases'].items():
        if k.lower() in df.columns and v not in df.columns:
            df = df.rename(columns={k.lower(): v})
            col_map[k] = v
    # Ensure required presence or prepare to compute
    # Dependents: if missing, add and mark
    added_dependents = False
    if 'dependents' not in df.columns:
        df['dependents'] = 0
        added_dependents = True
    # spending_flag may be computed later if missing
    LOGGER.info(f'Columns before normalization: {original_cols}')
    LOGGER.info(f'Columns after normalization: {df.columns.tolist()}')
    return df, {'renamed': col_map, 'added_dependents': added_dependents}

def validate_required_columns(df: pd.DataFrame, cfg: dict):
    required = set(cfg['required_columns'])
    present = set(df.columns)
    missing = required - present
    if missing:
        raise ValueError(
            'Schema validation failed. Missing required columns: ' + ', '.join(sorted(missing)) +
            '. Use aliases in CONFIG if your file uses alternative names.')
    # Basic types coercion where safe
    for col in ['order_hour_of_day', 'order_dow', 'order_number', 'orders_count', 'department_id']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    for col in ['age', 'income', 'price']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    if df[['customer_id','order_id']].isna().any().any():
        raise ValueError('Nulls found in key columns customer_id/order_id. Please fix source data.')

def compute_spending_flag_if_missing(df: pd.DataFrame, cfg: dict) -> tuple[pd.DataFrame, bool]:
    created = False
    if 'spending_flag' not in df.columns:
        LOGGER.info('spending_flag not found. Creating by Q3 total spend per customer (1=high spender).')
        cust_spend = df.groupby('customer_id')['price'].sum().rename('total_spend_per_customer')
        q3 = cust_spend.quantile(0.75)
        high = (cust_spend >= q3).astype(int).rename('spending_flag')
        df = df.merge(high, on='customer_id', how='left')
        created = True
    return df, created

def get_or_create_salt(env_path: Path) -> str:
    load_dotenv(dotenv_path=env_path)
    salt = os.getenv('ANON_SALT')
    if not salt:
        # Generate cryptographically strong salt; store for reproducibility across runs on the same project
        import secrets
        salt = secrets.token_hex(16)
        set_key(str(env_path), 'ANON_SALT', salt)
        LOGGER.info(f'Generated new salt and stored in {env_path}')
    else:
        LOGGER.info(f'Using existing salt from {env_path}')
    return salt

def hash_pii_columns(df: pd.DataFrame, cfg: dict) -> tuple[pd.DataFrame, list[str]]:
    patterns = set([p.lower() for p in cfg['pii_patterns']])
    pii_cols = [c for c in df.columns if any(p == c.lower() for p in patterns)]
    if not pii_cols:
        LOGGER.info('No PII columns matched the patterns.')
        return df, []
    salt = get_or_create_salt(Path(cfg['paths']['env_file']))
    def _hash_val(x):
        if pd.isna(x):
            return x
        h = hashlib.sha256((salt + str(x)).encode('utf-8')).hexdigest()
        return 'hash_' + h[:12]
    for col in pii_cols:
        df[col] = df[col].map(_hash_val)
    LOGGER.info(f'Applied irreversible SHA-256 hashing with salt to PII columns: {pii_cols}')
    return df, pii_cols

US_STATE_ABBR = {
    'ALABAMA':'AL','ALASKA':'AK','ARIZONA':'AZ','ARKANSAS':'AR','CALIFORNIA':'CA','COLORADO':'CO','CONNECTICUT':'CT',
    'DELAWARE':'DE','DISTRICT OF COLUMBIA':'DC','FLORIDA':'FL','GEORGIA':'GA','HAWAII':'HI','IDAHO':'ID','ILLINOIS':'IL',
    'INDIANA':'IN','IOWA':'IA','KANSAS':'KS','KENTUCKY':'KY','LOUISIANA':'LA','MAINE':'ME','MARYLAND':'MD','MASSACHUSETTS':'MA',
    'MICHIGAN':'MI','MINNESOTA':'MN','MISSISSIPPI':'MS','MISSOURI':'MO','MONTANA':'MT','NEBRASKA':'NE','NEVADA':'NV',
    'NEW HAMPSHIRE':'NH','NEW JERSEY':'NJ','NEW MEXICO':'NM','NEW YORK':'NY','NORTH CAROLINA':'NC','NORTH DAKOTA':'ND',
    'OHIO':'OH','OKLAHOMA':'OK','OREGON':'OR','PENNSYLVANIA':'PA','RHODE ISLAND':'RI','SOUTH CAROLINA':'SC','SOUTH DAKOTA':'SD',
    'TENNESSEE':'TN','TEXAS':'TX','UTAH':'UT','VERMONT':'VT','VIRGINIA':'VA','WASHINGTON':'WA','WEST VIRGINIA':'WV','WISCONSIN':'WI','WYOMING':'WY'
}

def map_state_and_region(df: pd.DataFrame, cfg: dict) -> tuple[pd.DataFrame, dict]:
    df = df.copy()
    # Normalize 'state' to two-letter abbreviations
    def to_abbr(s):
        if pd.isna(s):
            return None
        s = str(s).strip().upper()
        if len(s) == 2 and s.isalpha():
            return s
        return US_STATE_ABBR.get(s, None)
    df['state'] = df['state'].map(to_abbr)
    # Map to region
    rev = {}
    for region, states in cfg['region_mapping'].items():
        for st in states:
            rev[st] = region
    df['region'] = df['state'].map(rev)
    before = len(df)
    excluded = df[df['region'].isna()].copy()
    df = df[~df['region'].isna()].copy()
    after = len(df)
    exclusion_info = {
        'excluded_count': int(before - after),
        'excluded_reason': 'Unknown or non-US state',
        'examples': excluded['state'].dropna().unique().tolist()[:10]
    }
    if exclusion_info['excluded_count'] > 0:
        LOGGER.warning(f"Excluded {exclusion_info['excluded_count']} rows due to unknown/non-US states: {exclusion_info['examples']}")
    return df, exclusion_info

def add_low_activity_flag(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df['low_activity_flag'] = (df['orders_count'] < 5).astype(int)
    return df

def build_customer_features(df: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    # Per-order totals
    order_totals = df.groupby('order_id', as_index=False)['price'].sum().rename(columns={'price': 'order_total'})
    cust_orders = df[['order_id', 'customer_id', 'order_hour_of_day', 'order_dow']].drop_duplicates()
    cust_orders = cust_orders.merge(order_totals, on='order_id', how='left')
    # Aggregations per customer
    t = cfg['thresholds']
    def share_in(values, allowed):
        if len(values) == 0:
            return 0.0
        return float(np.isin(values, list(allowed)).mean())
    cust_hour = cust_orders.groupby('customer_id').agg(
        orders=('order_id','nunique'),
        median_order_total=('order_total','median'),
        share_evening=('order_hour_of_day', lambda x: share_in(x.values, t['evening_hours'])),
        share_morning=('order_hour_of_day', lambda x: share_in(x.values, t['morning_hours'])),
        share_weekday=('order_hour_of_day', lambda x: share_in(x.values, t['weekday_hours'])),
        share_late=('order_hour_of_day', lambda x: share_in(x.values, t['late_hours'])),
        share_weekend=('order_dow', lambda x: share_in(x.values, t['weekend_dows']))
    ).reset_index()
    # Department shares
    dept_counts = df.groupby(['customer_id','department_id']).size().rename('n').reset_index()
    total_counts = dept_counts.groupby('customer_id')['n'].sum().rename('total_n').reset_index()
    dept_counts = dept_counts.merge(total_counts, on='customer_id', how='left')
    dept_counts['share'] = dept_counts['n'] / dept_counts['total_n']
    # Shares for configured sets
    yp_set = set(cfg['dept_groups']['young_parent_depts'])
    hl_set = set(cfg['dept_groups']['health_depts'])
    yp_share = dept_counts[dept_counts['department_id'].isin(yp_set)].groupby('customer_id')['share'].sum().rename('share_young_parent_depts')
    hl_share = dept_counts[dept_counts['department_id'].isin(hl_set)].groupby('customer_id')['share'].sum().rename('share_health_depts')
    shares = pd.concat([yp_share, hl_share], axis=1).fillna(0.0).reset_index()
    # Total spend per customer
    spend = df.groupby('customer_id')['price'].sum().rename('total_spend_per_customer').reset_index()
    # Global median order total
    global_median_order_total = cust_orders['order_total'].median()
    # Combine
    feat = cust_hour.merge(shares, on='customer_id', how='left')\
                   .merge(spend, on='customer_id', how='left')
    feat['low_price_baskets'] = (feat['median_order_total'] < global_median_order_total).astype(int)
    feat['global_median_order_total'] = global_median_order_total
    return feat

def assign_profiles(df: pd.DataFrame, feats: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    t = cfg['thresholds']
    cust = df[['customer_id','age','income','dependents','spending_flag']].drop_duplicates('customer_id')
    cust = cust.merge(feats, on='customer_id', how='left')
    profile = []
    for _, r in cust.iterrows():
        age = r['age']
        inc = r['income']
        deps = r['dependents']
        spf = r['spending_flag']
        se = r['share_evening']; sm = r['share_morning']; sw = r['share_weekday']
        sl = r['share_late']; swk = r['share_weekend']
        yp_share = r.get('share_young_parent_depts', 0.0)
        hl_share = r.get('share_health_depts', 0.0)
        low_price = r['low_price_baskets']
        med_ord = r['median_order_total']; glob_med = r['global_median_order_total']
        # Rule 1
        if (t['young_parent_age_min'] <= age <= t['young_parent_age_max'] and deps >= 1 and
            yp_share >= t['young_parent_dept_share_min'] and se >= t['evening_share_min']):
            profile.append('Young Parent')
            continue
        # Rule 2
        if (deps >= 2 and inc < t['family_saver_income_max'] and (low_price == 1 or spf == 0)):
            profile.append('Family Saver')
            continue
        # Rule 3
        if (inc >= t['affluent_income_min'] and sw >= t['weekday_share_min'] and med_ord > glob_med):
            profile.append('Affluent Professional')
            continue
        # Rule 4
        if (hl_share >= t['health_dept_share_min'] and sm >= t['morning_share_min']):
            profile.append('Health Conscious')
            continue
        # Rule 5
        if (deps == 0 and 25 <= age <= 44 and (sl >= t['late_share_min'] or swk >= t['weekend_share_min'])):
            profile.append('Single Adult')
            continue
        # Rule 6
        if (age >= t['senior_age_min']):
            profile.append('Senior')
            continue
        profile.append('Other')
    cust['profile'] = profile
    assert cust['profile'].isna().sum() == 0, 'Profile assignment produced NaN values.'
    # Return per-customer profiles; merge back to row-level
    out = df.merge(cust[['customer_id','profile']], on='customer_id', how='left')
    return out, cust

def region_spend_stats(df: pd.DataFrame) -> pd.DataFrame:
    cust_spend = df.groupby(['customer_id','region'])['price'].sum().rename('total_spend_per_customer').reset_index()
    stats_tbl = cust_spend.groupby('region')['total_spend_per_customer'].agg(
        count='count', mean='mean', median='median', std='std', q1=lambda x: x.quantile(0.25), q3=lambda x: x.quantile(0.75)
    ).reset_index()
    return stats_tbl, cust_spend

def stat_tests_by_region(cust_spend: pd.DataFrame) -> dict:
    groups = [g['total_spend_per_customer'].values for _, g in cust_spend.groupby('region')]
    # ANOVA
    anova = stats.f_oneway(*groups)
    # Kruskal-Wallis
    kruskal = stats.kruskal(*groups)
    return {
        'anova_f': float(anova.statistic), 'anova_p': float(anova.pvalue),
        'kruskal_h': float(kruskal.statistic), 'kruskal_p': float(kruskal.pvalue)
    }

def plot_boxplot_spend_by_region(cust_spend: pd.DataFrame, cfg: dict, save_path: Path):
    plt.figure(figsize=cfg['plot']['figsize'])
    ax = sns.boxplot(data=cust_spend, x='region', y='total_spend_per_customer')
    ax.set_title('Total Spend per Customer by Region')
    ax.set_xlabel('Region')
    ax.set_ylabel('Total Spend per Customer')
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

def plot_violin_spend_by_region(cust_spend: pd.DataFrame, cfg: dict, save_path: Path):
    plt.figure(figsize=cfg['plot']['figsize'])
    ax = sns.violinplot(data=cust_spend, x='region', y='total_spend_per_customer', cut=0)
    ax.set_title('Distribution of Total Spend per Customer by Region (Violin)')
    ax.set_xlabel('Region')
    ax.set_ylabel('Total Spend per Customer')
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

def plot_profile_distribution(cust_profiles: pd.DataFrame, cfg: dict, save_path: Path):
    dist = cust_profiles['profile'].value_counts().rename_axis('profile').reset_index(name='count')
    dist['share'] = dist['count'] / dist['count'].sum()
    plt.figure(figsize=cfg['plot']['figsize'])
    ax = sns.barplot(data=dist, x='profile', y='count')
    ax.set_title('Customer Profile Distribution')
    ax.set_xlabel('Profile')
    ax.set_ylabel('Count')
    for i, row in dist.iterrows():
        ax.text(i, row['count'], f"{row['count']}\n({row['share']:.1%})", ha='center', va='bottom')
    plt.xticks(rotation=30, ha='right')
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

def crosstab_profile_region(cust_profiles: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    ct = pd.crosstab(cust_profiles['profile'], cust_profiles['region'])
    ct_row = ct.div(ct.sum(axis=1), axis=0).fillna(0.0)
    ct_col = ct.div(ct.sum(axis=0), axis=1).fillna(0.0)
    return ct_row, ct_col

def plot_heatmap(df: pd.DataFrame, title: str, cfg: dict, save_path: Path):
    plt.figure(figsize=(max(cfg['plot']['figsize'][0], 10), max(cfg['plot']['figsize'][1], 7)))
    ax = sns.heatmap(df, annot=True, fmt='.1%', cmap='viridis')
    ax.set_title(title)
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

def top_departments_by_profile(df: pd.DataFrame, cust_profiles: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
    tmp = df.merge(cust_profiles[['customer_id','profile']], on='customer_id', how='left')
    counts = tmp.groupby(['profile','department_id']).size().rename('n').reset_index()
    counts['share'] = counts.groupby('profile')['n'].apply(lambda x: x / x.sum())
    counts = counts.sort_values(['profile','share'], ascending=[True, False])
    # Keep top N per profile
    counts['rank'] = counts.groupby('profile')['share'].rank(method='first', ascending=False)
    return counts[counts['rank'] <= top_n].drop(columns='rank')

def profile_department_heatmap(df: pd.DataFrame, cust_profiles: pd.DataFrame) -> pd.DataFrame:
    tmp = df.merge(cust_profiles[['customer_id','profile']], on='customer_id', how='left')
    ct = pd.crosstab(tmp['profile'], tmp['department_id'])
    ct = ct.div(ct.sum(axis=1), axis=0).fillna(0.0)
    return ct

def export_dataframe(df: pd.DataFrame, base_path: Path):
    df.to_parquet(base_path.with_suffix('.parquet'), engine='pyarrow', index=False)
    df.to_csv(base_path.with_suffix('.csv'), index=False)

def write_excel_report(cfg: dict,
                       meta: dict,
                       population_flow: pd.DataFrame,
                       consistency_checks: pd.DataFrame,
                       wrangling_security: pd.DataFrame,
                       derived_columns: pd.DataFrame,
                       figures: list[tuple[str, Path]],
                       results_reco: list[str],
                       out_path: Path,
                       crosstab_sf_region: pd.DataFrame | None = None,
                       top_depts: pd.DataFrame | None = None):
    wb = Workbook()
    # Tab1 Project Details & Sources
    ws = wb.active
    ws.title = 'Project Details & Sources'
    rows = [
        ['Project', 'Instacart – Customer Segmentation & Reporting'],
        ['Dataset Version', meta.get('dataset_name','')],
        ['Generated At', meta.get('generated_at','')],
        ['Library Versions', meta.get('lib_versions','')],
        ['PII Note', meta.get('pii_note','')],
        ['Sources', 'Instacart public dataset (merged), project Exercise 4.9 output']
    ]
    for r in rows:
        ws.append(r)

    # Tab2 Population Flow
    ws2 = wb.create_sheet('Population Flow')
    for r in [population_flow.columns.tolist()] + population_flow.values.tolist():
        ws2.append(list(r))

    # Tab3 Consistency Checks
    ws3 = wb.create_sheet('Consistency Checks')
    for r in [consistency_checks.columns.tolist()] + consistency_checks.values.tolist():
        ws3.append(list(r))
    # Append Crosstab spending_flag x region if provided
    if crosstab_sf_region is not None:
        ws3.append([])
        ws3.append(['Crosstab: spending_flag × region'])
        cols = ['spending_flag'] + list(map(str, crosstab_sf_region.columns.tolist()))
        ws3.append(cols)
        for idx, row in crosstab_sf_region.iterrows():
            ws3.append([str(idx)] + row.tolist())

    # Tab4 Data Wrangling & Security
    ws4 = wb.create_sheet('Data Wrangling & Security')
    for r in [wrangling_security.columns.tolist()] + wrangling_security.values.tolist():
        ws4.append(list(r))

    # Tab5 Derived Columns
    ws5 = wb.create_sheet('Derived Columns')
    for r in [derived_columns.columns.tolist()] + derived_columns.values.tolist():
        ws5.append(list(r))

    # Tab6 Visualizations – insert images
    ws6 = wb.create_sheet('Visualizations')
    cur_row = 1
    for caption, fig_path in figures:
        ws6.cell(row=cur_row, column=1, value=caption)
        cur_row += 1
        if fig_path.exists():
            img = XLImage(str(fig_path))
            img.anchor = f'A{cur_row}'
            ws6.add_image(img)
            cur_row += int( img.height / 20 ) + 2  # crude spacing
        else:
            ws6.cell(row=cur_row, column=1, value=f'File not found: {fig_path.name}')
            cur_row += 2

    # Optional: Top departments by profile
    if top_depts is not None:
        ws8 = wb.create_sheet('Top Depts by Profile')
        ws8.append(top_depts.columns.tolist())
        for r in top_depts.itertuples(index=False):
            ws8.append(list(r))

    # Tab7 Results & Recommendations
    ws7 = wb.create_sheet('Results & Recommendations')
    ws7.append(['Recommendations'])
    for item in results_reco:
        ws7.append([item])

    # Save
    out_path.parent.mkdir(parents=True, exist_ok=True)
    wb.save(out_path)

def summarize_artifacts(paths: dict):
    print('\n=== Artifacts ===')
    for k, v in paths.items():
        print(f"{k}: {v}")

# Load & Validate Data
_Auto-detect input file (.parquet preferred, else .csv), normalize columns, and validate schema. Compute `spending_flag` if missing._

In [None]:
RUN_META = {}
RUN_META['start_time'] = datetime.now().isoformat(timespec='seconds')
df_raw, dataset_name = load_dataset_auto(CONFIG)
RUN_META['dataset_name'] = dataset_name

df, norm_info = normalize_columns(df_raw, CONFIG)
df, spending_created = compute_spending_flag_if_missing(df, CONFIG)
validate_required_columns(df, CONFIG)
LOGGER.info(f"Normalization info: {norm_info}; spending_flag created: {spending_created}")

# PII Handling (Hash+Salt)
_Detect PII columns by pattern and irreversibly hash them with SHA-256 using a stored salt in `.env`. The anonymized dataframe is used downstream._

In [None]:
df_anon, pii_cols = hash_pii_columns(df, CONFIG)
RUN_META['pii_cols'] = pii_cols

# US Regional Segmentation
_Map `state` to USPS abbreviations and then to US Census regions; exclude unknown/non-US with logging._

In [None]:
pop_flow = []
pop_flow.append(('Start', len(df_anon)))

df_reg, excl_info = map_state_and_region(df_anon, CONFIG)
pop_flow.append(('After State Mapping', len(df_reg)))
RUN_META['excluded_states'] = excl_info
LOGGER.info(f"Population flow so far: {pop_flow}")

# Exclude Low-Activity Customers
_Create `low_activity_flag` for customers with `< 5` orders and export the active sample._

In [None]:
df_reg = add_low_activity_flag(df_reg)
active = df_reg[df_reg['low_activity_flag'] == 0].copy()
pop_flow.append(('Exclude Low Activity (>=5 orders)', len(active)))
LOGGER.info(f"Population flow with activity filter: {pop_flow}")

# Export active customers sample (row-level subset)
active_base = Path(CONFIG['paths']['data_out']) / CONFIG['files']['active_customers_sample_base']
export_dataframe(active, active_base)
ACTIVE_SAMPLE_PARQUET = str(active_base.with_suffix('.parquet'))
ACTIVE_SAMPLE_CSV = str(active_base.with_suffix('.csv'))

# Rule-based Customer Profiles
_Build per-customer behavioral features (hours, departments, spend), then assign profiles by prioritized rules. Validate no missing profiles._

In [None]:
features = build_customer_features(df_reg, CONFIG)
df_prof, cust_prof = assign_profiles(df_reg, features, CONFIG)
assert cust_prof['profile'].isna().sum() == 0, 'Profile assignment has NaNs.'
pop_flow.append(('Profiled Customers', cust_prof.shape[0]))

# Analytics & Statistical Tests
_Regional spend differences (summary, ANOVA, Kruskal–Wallis); profile distributions and aggregates._

In [None]:
stats_tbl, cust_spend = region_spend_stats(df_prof)
tests = stat_tests_by_region(cust_spend)

# Aggregates by profile
agg_by_profile = cust_prof.groupby('profile').agg(
    min_orders=('orders','min'), mean_orders=('orders','mean'), max_orders=('orders','max'),
    min_expenditure=('total_spend_per_customer','min'),
    mean_expenditure=('total_spend_per_customer','mean'),
    max_expenditure=('total_spend_per_customer','max')
).reset_index()

display(stats_tbl.head())
print('ANOVA F={:.4f}, p-value={:.6f}'.format(tests['anova_f'], tests['anova_p']))
print('Kruskal H={:.4f}, p-value={:.6f}'.format(tests['kruskal_h'], tests['kruskal_p']))
display(agg_by_profile.head())

# Visualizations
_Save all figures in `figures/` with consistent styling (seaborn whitegrid, font size 11, PNG, DPI=200)._

In [None]:
fig_dir = Path(CONFIG['paths']['figures'])
box_path = fig_dir / 'boxplot_spend_by_region.png'
violin_path = fig_dir / 'violin_spend_by_region.png'
prof_bar_path = fig_dir / 'barplot_profile_distribution.png'
heat_row_path = fig_dir / 'heatmap_profile_x_region_rownorm.png'
heat_col_path = fig_dir / 'heatmap_profile_x_region_colnorm.png'
heat_dept_path = fig_dir / 'heatmap_profile_x_department.png'

plot_boxplot_spend_by_region(cust_spend, CONFIG, box_path)
plot_violin_spend_by_region(cust_spend, CONFIG, violin_path)
plot_profile_distribution(cust_prof, CONFIG, prof_bar_path)

ct_row, ct_col = crosstab_profile_region(cust_prof.merge(df_prof[['customer_id','region']].drop_duplicates(), on='customer_id', how='left'))
plot_heatmap(ct_row, 'Profile vs Region (Row-normalized)', CONFIG, heat_row_path)
plot_heatmap(ct_col, 'Profile vs Region (Column-normalized)', CONFIG, heat_col_path)

ct_dept = profile_department_heatmap(df_prof, cust_prof)
plt.figure(figsize=(max(CONFIG['plot']['figsize'][0], 12), max(CONFIG['plot']['figsize'][1], 8)))
ax = sns.heatmap(ct_dept, cmap='magma')
ax.set_title('Profile vs Department (Row-normalized)')
plt.tight_layout()
plt.savefig(heat_dept_path)
plt.close()

# Top departments by profile (table)
top_depts = top_departments_by_profile(df_prof, cust_prof, top_n=10)
# Save top departments by profile
top_depts_base = Path(CONFIG['paths']['data_out']) / 'top_departments_by_profile'
export_dataframe(top_depts, top_depts_base)
TOP_DEPTS_PARQUET = str(top_depts_base.with_suffix('.parquet'))
TOP_DEPTS_CSV = str(top_depts_base.with_suffix('.csv'))


# Export Final Datasets
_Save anonymized active sample and the final dataset with derived columns (`region`, `low_activity_flag`, `profile`, `total_spend_per_customer`)._

In [None]:
# Compute total_spend_per_customer for row-level merge
total_spend = df_prof.groupby('customer_id')['price'].sum().rename('total_spend_per_customer')
df_final = df_prof.merge(total_spend, on='customer_id', how='left')

final_base = Path(CONFIG['paths']['data_out']) / CONFIG['files']['final_dataset_base']
export_dataframe(df_final[[
    'customer_id','state','region','orders_count','order_id','order_number','order_hour_of_day','order_dow',
    'age','income','dependents','department_id','price','spending_flag','low_activity_flag','profile',
    'total_spend_per_customer'
]], final_base)
FINAL_PARQUET = str(final_base.with_suffix('.parquet'))
FINAL_CSV = str(final_base.with_suffix('.csv'))
# Append final population flow step
pop_flow.append(('Final (export-ready)', len(df_final)))


# Excel Report (7 Tabs)
_Create `reports/Instacart_Final_Report.xlsx` with: 1) Details, 2) Population Flow, 3) Consistency Checks, 4) Wrangling & Security, 5) Derived Columns, 6) Visualizations, 7) Results & Recommendations._

In [None]:
# Build tables for report
population_flow_df = pd.DataFrame(pop_flow, columns=['Step','Row Count'])

# Consistency checks
checks = []
checks.append(['Duplicate rows (full)', int(df_final.duplicated().sum())])
checks.append(['Age in [0, 120]', int(((df_final['age'] < 0) | (df_final['age'] > 120) | df_final['age'].isna()).sum())])
checks.append(['Income >= 0', int(((df_final['income'] < 0) | df_final['income'].isna()).sum())])
checks.append(['No NaN in key cols', int(df_final[['customer_id','order_id','state','region']].isna().any(axis=1).sum())])
ct_sf_region = pd.crosstab(df_final['spending_flag'], df_final['region'])
consistency_checks_df = pd.DataFrame(checks, columns=['Check','Issues'])

# Wrangling & Security
wr_sec_rows = []
wr_sec_rows.append(['Column Normalization', json.dumps(norm_info.get('renamed', {}))])
wr_sec_rows.append(['Added dependents (missing → 0)', str(norm_info.get('added_dependents', False))])
wr_sec_rows.append(['PII Columns Hashed', ', '.join(RUN_META.get('pii_cols', [])) if RUN_META.get('pii_cols') else 'None'])
wr_sec_rows.append(['State Mapping Exclusions', json.dumps(RUN_META.get('excluded_states', {}))])
wrangling_security_df = pd.DataFrame(wr_sec_rows, columns=['Transformation / Security', 'Details'])

# Derived columns
derived_rows = [
    ['region', 'state', 'US Census region mapping (CONFIG.region_mapping)'],
    ['low_activity_flag', 'orders_count', '1 if orders_count < 5 else 0'],
    ['profile', 'age, income, dependents, department_id, order_dow, order_hour_of_day', 'Prioritized rule-engine (CONFIG.thresholds and dept_groups)'],
    ['total_spend_per_customer', 'price', 'Sum of item prices per customer_id']
]
derived_columns_df = pd.DataFrame(derived_rows, columns=['Column','Source','Logic'])

# Results & Recommendations (brief, auto-generated)
anova_sig = tests['anova_p'] < 0.05
kruskal_sig = tests['kruskal_p'] < 0.05
reco = []
reco.append(fill('Regional spend comparison indicates {} differences across regions (ANOVA p={:.4f}, Kruskal p={:.4f}). Tailor promotions by region, focusing on high-spend regions with premium assortments and in low-spend regions with price-sensitive offers.'
                 .format('significant' if (anova_sig or kruskal_sig) else 'no statistically significant', tests['anova_p'], tests['kruskal_p']), width=100))
top_prof = cust_prof['profile'].value_counts().idxmax()
reco.append(fill(f'The most prevalent segment is "{top_prof}". Prioritize creative, ad timing, and assortments aligned with this segment\'s shopping hours and departments.', width=100))
reco.append(fill('Use the low-activity flag to exclude low-revenue customers from cost-intensive campaigns while still nurturing them with lightweight, app-based incentives.', width=100))
reco.append(fill('Link department merchandising with profiles (see heatmaps) to optimize cross-selling and personalized banners.', width=100))

lib_versions = {
    'pandas': pd.__version__, 'numpy': np.__version__, 'matplotlib': plt.matplotlib.__version__,
    'seaborn': sns.__version__, 'scipy': 'present', 'openpyxl': openpyxl.__version__, 'pyarrow': pa.__version__
}
RUN_META['generated_at'] = datetime.now().isoformat(timespec='seconds')
RUN_META['lib_versions'] = lib_versions
RUN_META['pii_note'] = 'Applied SHA-256 hashing with salt to detected PII columns: ' + (', '.join(RUN_META['pii_cols']) if RUN_META.get('pii_cols') else 'None')

report_path = Path(CONFIG['paths']['reports']) / CONFIG['files']['excel_report']
fig_list = [
    ('Boxplot – Spend by Region', Path(box_path)),
    ('Violin – Spend by Region', Path(violin_path)),
    ('Barplot – Profile Distribution', Path(prof_bar_path)),
    ('Heatmap – Profile × Region (row-norm)', Path(heat_row_path)),
    ('Heatmap – Profile × Region (col-norm)', Path(heat_col_path)),
    ('Heatmap – Profile × Department', Path(heat_dept_path))
]
write_excel_report(
    CONFIG,
    meta={
        'dataset_name': RUN_META.get('dataset_name',''),
        'generated_at': RUN_META.get('generated_at',''),
        'lib_versions': json.dumps(RUN_META.get('lib_versions', {})),
        'pii_note': RUN_META.get('pii_note','')
    },
    population_flow=population_flow_df,
    consistency_checks=consistency_checks_df,
    wrangling_security=wrangling_security_df,
    derived_columns=derived_columns_df,
    figures=fig_list,
    results_reco=reco,
    out_path=report_path,
    crosstab_sf_region=ct_sf_region,
    top_depts=top_depts
)

# Run Summary
_Print paths to saved artifacts and key notes. The full execution log is in `logs/run.log`._

In [None]:
ARTIFACTS = {
    'Active sample (parquet)': ACTIVE_SAMPLE_PARQUET,
    'Active sample (csv)': ACTIVE_SAMPLE_CSV,
    'Final dataset (parquet)': FINAL_PARQUET,
    'Final dataset (csv)': FINAL_CSV,
    'Report (xlsx)': str(report_path),
    'Top depts (parquet)': TOP_DEPTS_PARQUET,
    'Top depts (csv)': TOP_DEPTS_CSV,
    'Figures dir': str(Path(CONFIG['paths']['figures']).resolve()),
    'Log file': str(Path(CONFIG['paths']['logs']) / 'run.log')
}
summarize_artifacts(ARTIFACTS)
LOGGER.info('Pipeline completed successfully.')