
# 📊 Logistics KPI Analysis and Redesign Evaluation

This Python script performs a detailed quantitative analysis of logistics performance data, focusing on evaluating the impact of process redesigns. It computes key performance indicators (KPIs), simulates improvements, and derives feature importance using both **Shapley values** and **AHP (Analytic Hierarchy Process)**.

## 📦 Features

* **Data Cleaning**: Handles inconsistent column names and missing or malformed numeric data.
* **KPI Calculation**: Computes logistics KPIs like delivery delay, rescheduling percentage, and crew efficiency.
* **Scenario Simulation**: Applies predefined improvement rates to simulate a post-intervention scenario.
* **Impact Analysis**: Quantifies KPI variations before and after redesign.
* **Shapley Weights**: Assigns importance scores to each KPI using Shapley value approximations.
* **AHP Weighting**: Offers an alternative, expert-driven weighting system via pairwise comparisons with consistency checking.

---

## ⚙️ Main Functions

| Function                                                           | Description                                                                             |
| ------------------------------------------------------------------ | --------------------------------------------------------------------------------------- |
| `clean_column_names(df)`                                           | Standardizes column names by removing symbols and trimming whitespace.                  |
| `clean_data(df)`                                                   | Converts strings to numeric, fills missing values, and handles locale-specific formats. |
| `apply_improvements(df, improvements)`                             | Applies relative KPI improvements to simulate a redesigned scenario.                    |
| `calculate_kpis(df)`                                               | Computes derived KPIs from raw operational metrics.                                     |
| `analyze_redesign(df_before, df_after)`                            | Compares before/after KPI performance and calculates Shapley weights.                   |
| `calculate_shapley_weights(improvements)`                          | Approximates Shapley values for KPI importance through sampling.                        |
| `build_ahp_weights(kpi_list, input_matrix=None, interactive=True)` | Builds and validates an AHP matrix, returning normalized weights and consistency ratio. |
| `format_results(results)`                                          | Formats analysis results into a human-readable table.                                   |

---

## 📈 KPIs Tracked

* `NumeroRisorse`: Crew resources assigned
* `NumeroConsegneEffettuate`: Deliveries per day
* `LeadTimeEfficiency`: Inverse delay relative to a 2h threshold
* `PercentualeFlessibili`: Flexibility from UPL (unit per load)
* `RitardoMedioConsegna`: Average delivery delay
* `CustomerSatisfaction`: Proxy from rescheduling frequency
* `PercentualeConsegneRiprogrammate`: Share of rescheduled deliveries
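
The exact formulas behind these KPIs are not spelled out here, so the following is only a sketch under stated assumptions: the rescheduling share is taken as rescheduled deliveries over total deliveries, and the lead-time efficiency is assumed to fall linearly from 1 (no delay) to 0 at the 2-hour threshold. Both function names and the linear form are hypothetical.

```python
def rescheduled_share(rescheduled: int, deliveries: int) -> float:
    """Share of deliveries that were rescheduled (0 when there were no deliveries)."""
    return rescheduled / deliveries if deliveries else 0.0


def lead_time_efficiency(avg_delay_min: float, threshold_min: float = 120.0) -> float:
    """Assumed form: 1 at zero delay, decreasing linearly to 0 at the threshold."""
    return max(0.0, 1.0 - abs(avg_delay_min) / threshold_min)


# Illustrative numbers only.
share = rescheduled_share(rescheduled=6, deliveries=120)
efficiency = lead_time_efficiency(avg_delay_min=30.0)
print(share, efficiency)
```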

---

## 🧠 Weighting Methods

### 🔹 Shapley Values (Data-Driven)

Estimates KPI contributions using permutations and marginal improvements. Useful for understanding empirical impact.
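
A minimal sketch of the permutation-based estimate, assuming a generic coalition value function: each KPI's Shapley value is approximated by averaging its marginal contribution over randomly shuffled orderings. The function `sample_shapley`, the KPI names, and the gains are hypothetical and used only for illustration.

```python
import random
from typing import Callable, Dict, FrozenSet, List


def sample_shapley(players: List[str],
                   value: Callable[[FrozenSet[str]], float],
                   n_permutations: int = 2000,
                   seed: int = 0) -> Dict[str, float]:
    """Monte Carlo Shapley estimate: mean marginal contribution over random orderings."""
    rng = random.Random(seed)
    totals = {p: 0.0 for p in players}
    for _ in range(n_permutations):
        order = players[:]
        rng.shuffle(order)
        coalition: set = set()
        prev = value(frozenset(coalition))
        for p in order:
            coalition.add(p)
            cur = value(frozenset(coalition))
            totals[p] += cur - prev  # marginal contribution of p in this ordering
            prev = cur
    return {p: t / n_permutations for p, t in totals.items()}


# Hypothetical relative gains per KPI (illustrative values only).
gains = {"delay": 0.40, "rescheduling": 0.50, "customers": 0.06}


def additive_value(coalition: FrozenSet[str]) -> float:
    """Coalition value as the plain sum of its members' gains."""
    return sum(gains[k] for k in coalition)


estimates = sample_shapley(list(gains), additive_value, n_permutations=200)
print(estimates)
```

With an additive value function like this one, every marginal contribution equals the KPI's own gain, so the estimates reproduce `gains`. Interaction effects only show up when the value function is non-additive.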

### 🔸 AHP (Expert-Driven)

Supports pairwise KPI importance evaluation, calculates consistency ratio, and provides normalized weights.
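
The sketch below shows one common way to turn a pairwise comparison matrix into weights and a consistency ratio. It is not the script's `build_ahp_weights`: the function name `ahp_weights`, the power-iteration approach, the fallback random index for matrices larger than 10, and the example judgments are all assumptions made for illustration. The random index table uses Saaty's values, matching `RI_DICT` in the code further down.

```python
from typing import List, Tuple

# Saaty's random index by matrix size (same values as RI_DICT in the notebook code).
RANDOM_INDEX = {1: 0.00, 2: 0.00, 3: 0.58, 4: 0.90, 5: 1.12,
                6: 1.24, 7: 1.32, 8: 1.41, 9: 1.45, 10: 1.49}


def ahp_weights(matrix: List[List[float]], iterations: int = 100) -> Tuple[List[float], float]:
    """Return (weights, consistency_ratio) for a reciprocal pairwise matrix.

    Weights approximate the principal eigenvector via power iteration.
    """
    n = len(matrix)
    w = [1.0 / n] * n
    for _ in range(iterations):
        nxt = [sum(matrix[i][j] * w[j] for j in range(n)) for i in range(n)]
        total = sum(nxt)
        w = [x / total for x in nxt]
    # Estimate lambda_max as the mean of (A w)_i / w_i.
    aw = [sum(matrix[i][j] * w[j] for j in range(n)) for i in range(n)]
    lambda_max = sum(aw[i] / w[i] for i in range(n)) / n
    ci = (lambda_max - n) / (n - 1) if n > 1 else 0.0
    ri = RANDOM_INDEX.get(n, 1.49)  # assumption: reuse the n=10 value for larger n
    cr = ci / ri if ri > 0 else 0.0
    return w, cr


# Hypothetical judgments: KPI 1 is 3x as important as KPI 2 and 5x as KPI 3.
pairwise = [
    [1.0, 3.0, 5.0],
    [1 / 3, 1.0, 2.0],
    [1 / 5, 1 / 2, 1.0],
]
weights, cr = ahp_weights(pairwise)
print([round(x, 3) for x in weights], round(cr, 4))
```

A consistency ratio below 0.1 is the usual acceptance threshold. Above it, the pairwise judgments should be revisited.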

---

## 🛠️ Usage

1. Place your CSV file as `dataset.csv` in the same directory.
2. Run the script to perform the redesign analysis:

```bash
python your_script_name.py
```

3. For AHP-based weighting:

```bash
python your_script_name.py  # interactive mode
```

Follow the prompts to enter pairwise comparisons for KPIs.

---

## 📌 Dependencies

* `pandas`
* `numpy`
* `scipy`
* `matplotlib`
* `tqdm`
* `logging` (Python standard library, no installation needed)

Install via pip:

```bash
pip install pandas numpy scipy matplotlib tqdm
```

---

## 📝 Notes

* KPIs with negative improvement rates are "lower is better" metrics (e.g., delay, rescheduling): a negative rate reduces the value, which counts as an improvement.
* The geometric mean is used for Shapley coalition valuation to preserve scale and handle multiplicative effects.
* AHP consistency ratio (CR) should ideally be < 0.1 for reliable judgments.
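
As a rough illustration of the geometric-mean coalition value mentioned in the notes, the sketch below treats each KPI as an improvement factor `1 + |rate|` and values a coalition by the geometric mean of its factors minus 1. The function name, the use of the absolute rate for "bad" KPIs, and the example rates are assumptions. The `kpi_score` function in the notebook code below uses a plain sum of relative improvements instead.

```python
import math
from typing import Dict, Iterable


def geometric_mean_value(coalition: Iterable[str], factors: Dict[str, float]) -> float:
    """Geometric mean of the coalition's improvement factors, minus 1.

    An empty coalition is worth 0 (no improvement).
    """
    members = list(coalition)
    if not members:
        return 0.0
    log_mean = sum(math.log(factors[k]) for k in members) / len(members)
    return math.exp(log_mean) - 1.0


# Hypothetical rates; negative rates ("bad" KPIs) are converted to gains via |rate|.
rates = {"delay": -0.40, "rescheduling": -0.50, "customers": 0.06}
factors = {k: 1.0 + abs(r) for k, r in rates.items()}

single = geometric_mean_value(["delay"], factors)
pair = geometric_mean_value(["delay", "rescheduling"], factors)
print(single, pair)
```

Unlike a sum, this value does not grow just because a coalition has more members, which keeps coalitions of different sizes on the same scale.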

# 1. Loading the dataset and computing the To-Be dataset

In [None]:
import numpy as np
import pandas as pd
from itertools import combinations
from tqdm import tqdm
import logging
import warnings
from scipy.stats import beta
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Optional, Any

warnings.filterwarnings('ignore')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('kpi_analysis.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


In [None]:
import itertools
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from scipy.optimize import minimize
from scipy.stats import beta
import matplotlib.pyplot as plt

# ---------------------------
# SHAPLEY VALUE FUNCTIONS
# ---------------------------

def kpi_score(kpi_subset, data_AsIs, data_ToBe):
    """
    Score function: sum of % improvements of KPIs in subset.
    data_AsIs, data_ToBe: dict or dataframe with KPI values
    kpi_subset: set or list of KPI names to consider
    """
    improvement_sum = 0.0
    for kpi in kpi_subset:
        if data_AsIs[kpi] == 0:
            continue
        improvement = (data_ToBe[kpi] - data_AsIs[kpi]) / data_AsIs[kpi]
        improvement_sum += improvement
    return improvement_sum

import math  # np.math was removed in NumPy 2.0; use the standard library instead

def compute_shapley_values(kpis, data_AsIs, data_ToBe):
    """
    Exact Shapley value of each KPI under kpi_score, normalized to sum to 1.
    Enumerates 2^(n-1) coalitions per KPI, so it is only practical for small n.
    """
    n = len(kpis)
    shapley_vals = dict.fromkeys(kpis, 0.0)

    for kpi in kpis:
        others = [x for x in kpis if x != kpi]

        for size in range(n):
            # Shapley weight |S|! (n - |S| - 1)! / n! depends only on the coalition size
            weight = math.factorial(size) * math.factorial(n - size - 1) / math.factorial(n)
            for subset in itertools.combinations(others, size):
                subset = set(subset)
                score_without = kpi_score(subset, data_AsIs, data_ToBe)
                score_with = kpi_score(subset | {kpi}, data_AsIs, data_ToBe)
                shapley_vals[kpi] += weight * (score_with - score_without)

    # Normalize so the values sum to 1; skipped when the total is 0.
    # Note: a negative total (mostly KPIs with negative rates) flips the signs.
    total = sum(shapley_vals.values())
    if total != 0:
        for k in shapley_vals:
            shapley_vals[k] /= total

    return shapley_vals

# ---------------------------
# CONFIGURATION
# ---------------------------
KPI_IMPROVEMENTS: Dict[str, float] = {
    'Valore medio della merce consegnata in un giro [€/giro]': 0.05,
    '# giorni lavorati nel mese': 0.0,
    '# clienti  serviti nel mese': 0.06,
    'Delta (orario di fine montaggio EFFETTIVO - orario di fine montaggio CONCORDATO) [min]': -0.40,
    '# consegne ripianificate per problemi nel TRASPORTO': -0.50,
    '# consegne ripianificate per problemi nel MONTAGGIO di "TIPO 1" (camere, armadio e camerette)': -0.30,
    '# consegne ripianificate per problemi nel MONTAGGIO di "TIPO 2" (cucine e soggiorni)': -0.30,
    '# consegne ripianificate per problemi nel MONTAGGIO  di "TIPO 3" (arredo bagno, tavoli, sedie)': -0.25,
    '# consegne ripianificate per CAUSE LATO CLIENTE FINALE (assenza) e AZIENDA COMMITTENTE (es. pezzi mancanti)': -0.15
}

RI_DICT: Dict[int, float] = {
    1: 0.00, 2: 0.00, 3: 0.58, 4: 0.90, 5: 1.12,
    6: 1.24, 7: 1.32, 8: 1.41, 9: 1.45, 10: 1.49
}

# ---------------------------
# FUNCTIONS
# ---------------------------

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    # Drop 'Unnamed: 14' column if it exists
    if 'Unnamed: 14' in df.columns:
        df = df.drop(columns=['Unnamed: 14'])
    
    # Identify columns to convert to numeric (all except some categorical columns)
    numeric_cols = [
        col for col in df.columns 
        if col not in ['Id', 'Mese', 'Id Equipaggio', 'Classe']
    ]
    
    # Convert numeric columns that might be strings with commas as decimal separator
    for col in numeric_cols:
        if df[col].dtype == object:
            df[col] = pd.to_numeric(df[col].str.replace(',', '.'), errors='coerce')
    
    # Return dataframe unchanged otherwise (no dropping rows)
    return df


def compute_indicator(df: pd.DataFrame) -> pd.Series:
    total_rescheduling = (
        df['# consegne ripianificate per problemi nel TRASPORTO'] +
        df['# consegne ripianificate per problemi nel MONTAGGIO di "TIPO 1" (camere, armadio e camerette)'] +
        df['# consegne ripianificate per problemi nel MONTAGGIO di "TIPO 2" (cucine e soggiorni)'] +
        df['# consegne ripianificate per problemi nel MONTAGGIO  di "TIPO 3" (arredo bagno, tavoli, sedie)'] +
        df['# consegne ripianificate per CAUSE LATO CLIENTE FINALE (assenza) e AZIENDA COMMITTENTE (es. pezzi mancanti)']
    )
    
    abs_delta = np.abs(df['Delta (orario di fine montaggio EFFETTIVO - orario di fine montaggio CONCORDATO) [min]'])
    denominator = (abs_delta + 1e-5) * (total_rescheduling + 1)
    
    numerator = (
        df['# clienti  serviti nel mese'] *
        df['UPL'] *
        df['Valore medio della merce consegnata in un giro [€/giro]']
    )
    
    return numerator / denominator

def fit_beta_distribution(data: pd.Series) -> Tuple[float, float]:
    # Normalize to [0, 1] so that a Beta distribution can be fitted
    data_min, data_max = data.min(), data.max()
    if data_max == data_min:
        return 2.0, 2.0  # constant data: fall back to a symmetric Beta(2, 2)
    
    data_norm = (data - data_min) / (data_max - data_min)
    filtered_data = data_norm[(data_norm > 0) & (data_norm < 1)]
    
    if len(filtered_data) < 10:
        m = filtered_data.mean() if len(filtered_data) > 0 else 0.5
        m = np.clip(m, 0.01, 0.99)
        k = 2.0
        alpha_fb = m * k + 1
        beta_fb = (1 - m) * k + 1
        return alpha_fb, beta_fb

    def negative_log_likelihood(params: List[float]) -> float:
        a, b = params
        if a <= 0 or b <= 0:
            return np.inf
        return -np.sum(beta.logpdf(filtered_data, a, b))

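    result = minimize(negative_log_likelihood, x0=[2, 2], bounds=[(0.01, 10), (0.01, 10)])
    if result.success:
        # Return plain floats to match the declared Tuple[float, float]
        a_hat, b_hat = result.x
        return float(a_hat), float(b_hat)
    # Optimizer failed: fall back to a symmetric Beta(2, 2)
    return 2.0, 2.0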

def simulate_from_beta(alpha: float, beta_param: float, size: int, data_min=0, data_max=1) -> np.ndarray:
    samples = beta.rvs(alpha, beta_param, size=size)
    return samples * (data_max - data_min) + data_min

def apply_improvements(df: pd.DataFrame, improvements: Dict[str, float]) -> pd.DataFrame:
    df_improved = df.copy()
    for kpi, change in improvements.items():
        if kpi in df_improved.columns:
            df_improved[kpi] = df_improved[kpi] * (1 + change)
    return df_improved

def plot_distributions(real_data: pd.Series, sim_data_as_is: np.ndarray, sim_data_to_be: np.ndarray, title: str) -> None:
    plt.figure(figsize=(10, 6))
    plt.hist(real_data, bins=50, alpha=0.4, label="Real", density=True)
    plt.hist(sim_data_as_is, bins=50, alpha=0.4, label="Simulated As-Is", density=True)
    plt.hist(sim_data_to_be, bins=50, alpha=0.4, label="Simulated To-Be", density=True)
    plt.title(title)
    plt.xlabel("Acceptance Rate")
    plt.ylabel("Density")
    plt.legend()
    plt.show()

def prepare_shap_data(df_as_is: pd.DataFrame, df_to_be: pd.DataFrame, kpi_list: List[str]) -> Tuple[Dict[str, float], Dict[str, float]]:
    data_AsIs = {}
    data_ToBe = {}
    for kpi in kpi_list:
        if kpi in df_as_is.columns and kpi in df_to_be.columns:
            data_AsIs[kpi] = df_as_is[kpi].mean()
            data_ToBe[kpi] = df_to_be[kpi].mean()
        else:
            data_AsIs[kpi] = 0
            data_ToBe[kpi] = 0
    return data_AsIs, data_ToBe

def run_shapley_analysis(df_clean: pd.DataFrame, df_after: pd.DataFrame) -> Dict[str, float]:
    shap_kpis = list(KPI_IMPROVEMENTS.keys())
    data_AsIs, data_ToBe = prepare_shap_data(df_clean, df_after, shap_kpis)
    shapley_weights = compute_shapley_values(shap_kpis, data_AsIs, data_ToBe)
    
    print("\n--- Shapley Values for KPIs ---")
    for kpi, weight in shapley_weights.items():
        print(f"{kpi}: {weight:.4f}")
    return shapley_weights

# ---------------------------
# MAIN PIPELINE
# ---------------------------

def main_ahp_shapley(path_to_csv: str):
    # The input is a CSV file (read with pd.read_csv), not an Excel workbook
    df = pd.read_csv(path_to_csv)
    df.columns = [col.replace("\n", " ") for col in df.columns]
    
    df_clean = clean_data(df)
    
    indicator = compute_indicator(df_clean)
    print(f"Indicator stats: mean={indicator.mean():.4f}, min={indicator.min():.4f}, max={indicator.max():.4f}")
    
    data_min, data_max = indicator.min(), indicator.max()
    alpha_fb, beta_fb = fit_beta_distribution(indicator)
    print(f"Fitted Beta params: alpha={alpha_fb:.4f}, beta={beta_fb:.4f}")
    
    size = len(indicator)
    sim_data_as_is = simulate_from_beta(alpha_fb, beta_fb, size, data_min, data_max)
    
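    df_after = apply_improvements(df_clean, KPI_IMPROVEMENTS)
    indicator_to_be = compute_indicator(df_after)
    # Fit a separate Beta to the To-Be indicator so the simulation reflects the improvements
    alpha_tb, beta_tb = fit_beta_distribution(indicator_to_be)
    sim_data_to_be = simulate_from_beta(alpha_tb, beta_tb, size, indicator_to_be.min(), indicator_to_be.max())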
    
    plot_distributions(indicator, sim_data_as_is, sim_data_to_be, "Acceptance Rate Distribution: Real vs Simulated As-Is & To-Be")
    
    shapley_weights = run_shapley_analysis(df_clean, df_after)
    
    return {
        "df_clean": df_clean,
        "df_after": df_after,
        "sim_data_as_is": sim_data_as_is,
        "sim_data_to_be": sim_data_to_be,
        "shapley_weights": shapley_weights
    }
    
# TODO: based on the percentage change, normalize all KPIs and map each one to a score of 1, 3, or 5

# ---------------------------
# USAGE EXAMPLE
# ---------------------------

if __name__ == "__main__":
    results = main_ahp_shapley("dataset.csv")
