<a href="https://colab.research.google.com/github/MLDreamer/Forecasting-Deployed/blob/main/Diebold_Mariano_in_Intermittent_TS_setting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# Intermittent Demand Forecasting with Diebold-Mariano Testing


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.ensemble import GradientBoostingRegressor
import lightgbm as lgb
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

class IntermittentDataGenerator:
    def __init__(self, seed: int = 42):
        np.random.seed(seed)

    def spare_parts_demand(self, n: int = 200, occurrence_prob: float = 0.12) -> np.ndarray:
        demand_occurs = np.random.binomial(1, occurrence_prob, n)
        demand_size = np.random.lognormal(mean=2.5, sigma=0.9, size=n)
        return demand_occurs * demand_size

    def pharmaceutical_demand(self, n: int = 200) -> np.ndarray:
        t = np.arange(n)
        seasonal_prob = 0.15 + 0.08 * np.sin(2 * np.pi * t / 52)
        seasonal_prob = np.clip(seasonal_prob, 0.05, 0.35)
        demand_occurs = np.random.binomial(1, seasonal_prob, n)
        demand_size = np.random.gamma(2, 25, n)
        return demand_occurs * demand_size

    def luxury_electronics(self, n: int = 200) -> np.ndarray:
        demand = np.zeros(n)
        launch_points = np.random.choice(range(20, n-20), 3, replace=False)

        for launch in launch_points:
            spike_duration = 15
            for i in range(spike_duration):
                if launch + i < n:
                    prob = 0.7 * np.exp(-i/5)
                    if np.random.random() < prob:
                        demand[launch + i] = np.random.lognormal(3, 0.5)

        background = np.random.binomial(1, 0.05, n) * np.random.lognormal(2, 0.3, n)
        return demand + background

class IntermittentMetrics:
    @staticmethod
    def mase(actual: np.ndarray, forecast: np.ndarray, seasonal_period: int = 1) -> float:
        if len(actual) <= seasonal_period:
            return np.inf
        naive_errors = np.abs(np.diff(actual))
        if np.mean(naive_errors) == 0:
            return 0 if np.mean(np.abs(actual - forecast)) == 0 else np.inf
        return np.mean(np.abs(actual - forecast)) / np.mean(naive_errors)

    @staticmethod
    def hit_rate(actual: np.ndarray, forecast: np.ndarray, tolerance: float = 0.1) -> float:
        forecast = np.asarray(forecast)

        non_zero_mask = actual > 0
        if np.sum(non_zero_mask) == 0:
            return 1.0
        hits = np.abs(actual[non_zero_mask] - forecast[non_zero_mask]) <= tolerance * actual[non_zero_mask]
        return np.mean(hits)

    @staticmethod
    def demand_classification_accuracy(actual: np.ndarray, forecast: np.ndarray, threshold: float = 0.5) -> float:

        forecast = np.asarray(forecast)
        actual_demand_occurs = (actual > threshold).astype(int)
        forecast_demand_occurs = (forecast > threshold).astype(int)
        return np.mean(actual_demand_occurs == forecast_demand_occurs)

    @staticmethod
    def periods_in_stock(actual: np.ndarray, forecast: np.ndarray) -> float:
        forecast = np.asarray(forecast)
        return np.mean(forecast >= actual)

    @staticmethod
    def scaled_pinball_loss(actual: np.ndarray, forecast: np.ndarray, quantile: float = 0.5) -> float:
        forecast = np.asarray(forecast)
        errors = actual - forecast
        loss = np.maximum(quantile * errors, (quantile - 1) * errors)
        scale = np.mean(actual) if np.mean(actual) > 0 else 1
        return np.mean(loss) / scale

class IntermittentForecastingMethods:
    @staticmethod
    def croston_method(series: np.ndarray, alpha: float = 0.1) -> float:
        non_zero_indices = np.where(series > 0)[0]

        if len(non_zero_indices) < 2:
            return np.mean(series[series > 0]) if np.any(series > 0) else 0

        z_hat = series[non_zero_indices[0]]
        x_hat = non_zero_indices[1] - non_zero_indices[0]

        for i in range(1, len(non_zero_indices)):
            current_demand = series[non_zero_indices[i]]
            current_interval = non_zero_indices[i] - non_zero_indices[i-1]

            z_hat = alpha * current_demand + (1 - alpha) * z_hat
            x_hat = alpha * current_interval + (1 - alpha) * x_hat

        return z_hat / x_hat if x_hat > 0 else 0

    @staticmethod
    def tsb_method(series: np.ndarray, alpha: float = 0.1, beta: float = 0.1) -> float:
        non_zero_indices = np.where(series > 0)[0]

        if len(non_zero_indices) < 3:
            return IntermittentForecastingMethods.croston_method(series, alpha)

        intervals = np.diff(non_zero_indices)
        cv_squared = (np.std(intervals) / np.mean(intervals)) ** 2 if np.mean(intervals) > 0 else 0

        bias_correction = (2 - alpha) / (2 - alpha + alpha * cv_squared)
        croston_forecast = IntermittentForecastingMethods.croston_method(series, alpha)

        return croston_forecast * bias_correction

    @staticmethod
    def lgbm_standard(series: np.ndarray, window: int = 12) -> float:
        if len(series) < window + 5:
            return np.mean(series)

        X, y = [], []
        for i in range(window, len(series)):
            window_data = series[i-window:i]

            features = [
                *window_data[-6:],
                np.sum(window_data > 0),
                np.mean(window_data[window_data > 0]) if np.any(window_data > 0) else 0,
                np.std(window_data),
                np.max(window_data),
                np.mean(np.diff(window_data)),
                np.sum(window_data[-3:] > 0),
                len(window_data) - np.where(window_data > 0)[0][-1] if np.any(window_data > 0) else len(window_data)
            ]

            X.append(features)
            y.append(series[i])

        if len(X) < 10:
            return np.mean(series)

        model = lgb.LGBMRegressor(
            objective='regression',
            n_estimators=50,
            max_depth=6,
            learning_rate=0.1,
            feature_fraction=0.8,
            bagging_fraction=0.8,
            verbose=-1,
            random_state=42
        )

        model.fit(X, y)

        last_window = series[-window:]
        features = [
            *last_window[-6:],
            np.sum(last_window > 0),
            np.mean(last_window[last_window > 0]) if np.any(last_window > 0) else 0,
            np.std(last_window),
            np.max(last_window),
            np.mean(np.diff(last_window)),
            np.sum(last_window[-3:] > 0),
            len(last_window) - np.where(last_window > 0)[0][-1] if np.any(last_window > 0) else len(last_window)
        ]

        return max(0, model.predict([features])[0])

    @staticmethod
    def lgbm_tweedie(series: np.ndarray, window: int = 12) -> float:
        if len(series) < window + 5:
            return np.mean(series)

        X, y = [], []
        for i in range(window, len(series)):
            window_data = series[i-window:i]

            features = [
                *window_data[-6:],
                np.sum(window_data > 0),
                np.mean(window_data[window_data > 0]) if np.any(window_data > 0) else 0,
                np.std(window_data),
                np.max(window_data),
                np.mean(np.diff(window_data)),
                np.sum(window_data[-3:] > 0),
                len(window_data) - np.where(window_data > 0)[0][-1] if np.any(window_data > 0) else len(window_data)
            ]

            X.append(features)
            y.append(series[i])

        if len(X) < 10:
            return np.mean(series)

        model = lgb.LGBMRegressor(
            objective='tweedie',
            tweedie_variance_power=1.5,
            n_estimators=50,
            max_depth=6,
            learning_rate=0.1,
            feature_fraction=0.8,
            bagging_fraction=0.8,
            verbose=-1,
            random_state=42
        )

        model.fit(X, y)

        last_window = series[-window:]
        features = [
            *last_window[-6:],
            np.sum(last_window > 0),
            np.mean(last_window[last_window > 0]) if np.any(last_window > 0) else 0,
            np.std(last_window),
            np.max(last_window),
            np.mean(np.diff(last_window)),
            np.sum(last_window[-3:] > 0),
            len(last_window) - np.where(last_window > 0)[0][-1] if np.any(last_window > 0) else len(last_window)
        ]

        return max(0, model.predict([features])[0])

class DieboldMarianoFramework:
    def __init__(self, loss_function: str = 'absolute'):
        self.loss_function = loss_function
        self.results_cache = {}

    def _compute_loss_differential(self, errors1: np.ndarray, errors2: np.ndarray) -> np.ndarray:
        if self.loss_function == 'absolute':
            d = np.abs(errors1) - np.abs(errors2)
        elif self.loss_function == 'squared':
            d = errors1**2 - errors2**2
        elif self.loss_function == 'asymmetric':
            def asymmetric_loss(error):
                return np.where(error > 0, 3 * error, -error)
            d = asymmetric_loss(errors1) - asymmetric_loss(errors2)
        else:
            raise ValueError(f"Unsupported loss function: {self.loss_function}")

        return d

    def statistical_test(self, model1_name: str, model1_errors: np.ndarray,
                        model2_name: str, model2_errors: np.ndarray) -> Dict:
        cache_key = f"{model1_name}_vs_{model2_name}_{self.loss_function}"

        if cache_key in self.results_cache:
            return self.results_cache[cache_key]

        d = self._compute_loss_differential(model1_errors, model2_errors)
        n = len(d)

        if n < 10:
            return {
                'test_statistic': 0,
                'p_value': 1.0,
                'significant': False,
                'interpretation': 'Insufficient data for reliable test',
                'sample_size': n
            }

        d_mean = np.mean(d)
        gamma_0 = np.var(d, ddof=1)

        if gamma_0 == 0:
            result = {
                'test_statistic': 0,
                'p_value': 1.0,
                'significant': False,
                'interpretation': 'Models have identical performance',
                'sample_size': n
            }
        else:
            dm_stat = d_mean / np.sqrt(gamma_0 / n)
            p_value = 2 * (1 - stats.norm.cdf(np.abs(dm_stat)))

            if p_value < 0.01:
                significance = "Highly significant"
                confidence = "99%+"
            elif p_value < 0.05:
                significance = "Significant"
                confidence = "95%+"
            elif p_value < 0.10:
                significance = "Marginally significant"
                confidence = "90%+"
            else:
                significance = "Not significant"
                confidence = f"{(1-p_value)*100:.1f}%"

            if p_value < 0.05:
                winner = model1_name if dm_stat < 0 else model2_name
                loser = model2_name if dm_stat < 0 else model1_name
                business_rec = f"Deploy {winner} - statistically superior to {loser}"
            else:
                business_rec = "No significant difference - consider other factors"

            result = {
                'test_statistic': dm_stat,
                'p_value': p_value,
                'significant': p_value < 0.05,
                'significance_level': significance,
                'confidence': confidence,
                'winner': winner if p_value < 0.05 else None,
                'interpretation': f"{significance} difference",
                'business_recommendation': business_rec,
                'sample_size': n,
                'mean_loss_diff': d_mean
            }

        self.results_cache[cache_key] = result
        return result

class IntermittentBenchmark:
    def __init__(self):
        self.methods = {
            'Croston': IntermittentForecastingMethods.croston_method,
            'TSB': IntermittentForecastingMethods.tsb_method,
            'LGBM': IntermittentForecastingMethods.lgbm_standard,
            'LGBM_Tweedie': IntermittentForecastingMethods.lgbm_tweedie
        }

        self.dm_framework = DieboldMarianoFramework(loss_function='absolute')
        self.results = []
        self.statistical_results = []

    def evaluate_methods(self, data: np.ndarray, dataset_name: str):
        split_point = int(0.8 * len(data))
        train, test = data[:split_point], data[split_point:]

        forecasts = {}
        errors = {}
        metrics = IntermittentMetrics()

        for method_name, method_func in self.methods.items():
            try:
                method_forecasts = []
                method_errors = []

                for i in range(len(test)):
                    if i == 0:
                        current_train = train.copy()
                    else:
                        current_train = np.concatenate([train, test[:i]])

                    if len(current_train) < 10:
                        forecast = np.mean(current_train[current_train > 0]) if np.any(current_train > 0) else 0
                    else:
                        forecast = method_func(current_train)

                    actual = test[i]
                    method_forecasts.append(max(0, forecast))
                    method_errors.append(actual - max(0, forecast))

                forecasts[method_name] = np.array(method_forecasts)
                errors[method_name] = np.array(method_errors)

                mae = np.mean(np.abs(method_errors))
                mase = metrics.mase(test, method_forecasts)
                hit_rate = metrics.hit_rate(test, method_forecasts)
                demand_acc = metrics.demand_classification_accuracy(test, method_forecasts)
                periods_in_stock = metrics.periods_in_stock(test, method_forecasts)

                self.results.append({
                    'Dataset': dataset_name,
                    'Method': method_name,
                    'MAE': mae,
                    'MASE': mase,
                    'Hit_Rate': hit_rate,
                    'Demand_Classification_Accuracy': demand_acc,
                    'Periods_In_Stock': periods_in_stock
                })

            except Exception as e:
                print(f"Error with {method_name}: {str(e)}")
                continue

        method_names = list(errors.keys())
        for i in range(len(method_names)):
            for j in range(i + 1, len(method_names)):
                method1, method2 = method_names[i], method_names[j]

                result = self.dm_framework.statistical_test(
                    method1, errors[method1],
                    method2, errors[method2]
                )

                self.statistical_results.append({
                    'Dataset': dataset_name,
                    'Method1': method1,
                    'Method2': method2,
                    'DM_Statistic': result['test_statistic'],
                    'P_Value': result['p_value'],
                    'Significant': result['significant'],
                    'Winner': result.get('winner', 'None'),
                    'Business_Recommendation': result['business_recommendation']
                })

    def run_comprehensive_evaluation(self):
        data_gen = IntermittentDataGenerator(seed=42)
        datasets = {
            'Automotive_Parts': data_gen.spare_parts_demand(n=200, occurrence_prob=0.12),
            'Pharmaceutical': data_gen.pharmaceutical_demand(n=200),
            'Luxury_Electronics': data_gen.luxury_electronics(n=200)
        }

        for dataset_name, data in datasets.items():
            self.evaluate_methods(data, dataset_name)

        results_df = pd.DataFrame(self.results)
        stats_df = pd.DataFrame(self.statistical_results)

        return results_df, stats_df

def demonstrate_metric_inadequacy():
    np.random.seed(123)
    data_gen = IntermittentDataGenerator(seed=123)
    demo_data = data_gen.spare_parts_demand(n=120, occurrence_prob=0.08)

    split_point = int(0.8 * len(demo_data))
    train, test = demo_data[:split_point], demo_data[split_point:]

    croston_forecasts = []
    lgbm_forecasts = []

    for i in range(len(test)):
        if i == 0:
            current_train = train.copy()
        else:
            current_train = np.concatenate([train, test[:i]])

        if len(current_train) < 10:
            croston_forecast = np.mean(current_train[current_train > 0]) if np.any(current_train > 0) else 0
            lgbm_forecast = croston_forecast
        else:
            croston_forecast = IntermittentForecastingMethods.croston_method(current_train)
            lgbm_forecast = IntermittentForecastingMethods.lgbm_standard(current_train)

        croston_forecasts.append(max(0, croston_forecast))
        lgbm_forecasts.append(max(0, lgbm_forecast))

    croston_errors = test - np.array(croston_forecasts)
    lgbm_errors = test - np.array(lgbm_forecasts)

    croston_mae = np.mean(np.abs(croston_errors))
    lgbm_mae = np.mean(np.abs(lgbm_errors))

    dm_test = DieboldMarianoFramework(loss_function='absolute')
    result = dm_test.statistical_test('Croston', croston_errors, 'LGBM', lgbm_errors)

    return {
        'croston_mae': croston_mae,
        'lgbm_mae': lgbm_mae,
        'statistical_result': result,
        'test_data': test,
        'croston_forecasts': croston_forecasts,
        'lgbm_forecasts': lgbm_forecasts
    }

def comprehensive_metric_comparison():
    print("="*80)
    print("WHY TRADITIONAL METRICS FAIL FOR INTERMITTENT TIME SERIES")
    print("="*80)

    np.random.seed(42)
    data_gen = IntermittentDataGenerator(seed=42)

    datasets = {
        'Automotive_Parts': data_gen.spare_parts_demand(n=150, occurrence_prob=0.10),
        'Pharmaceutical': data_gen.pharmaceutical_demand(n=150),
        'Luxury_Electronics': data_gen.luxury_electronics(n=150)
    }

    all_results = []

    for dataset_name, data in datasets.items():
        print(f"\n📊 DATASET: {dataset_name}")
        print(f"Intermittency Rate: {np.sum(data == 0)/len(data)*100:.1f}% zeros")

        split_point = int(0.8 * len(data))
        train, test = data[:split_point], data[split_point:]

        methods = {
            'Croston': IntermittentForecastingMethods.croston_method,
            'TSB': IntermittentForecastingMethods.tsb_method,
            'LGBM_Tweedie': IntermittentForecastingMethods.lgbm_tweedie
        }

        method_results = {}
        method_errors = {}

        for method_name, method_func in methods.items():
            try:
                forecasts = []
                errors = []

                for i in range(len(test)):
                    if i == 0:
                        current_train = train.copy()
                    else:
                        current_train = np.concatenate([train, test[:i]])

                    if len(current_train) < 15:
                        forecast = np.mean(current_train[current_train > 0]) if np.any(current_train > 0) else 0
                    else:
                        forecast = method_func(current_train)

                    forecasts.append(max(0, forecast))
                    errors.append(test[i] - max(0, forecast))

                forecasts = np.array(forecasts)
                errors = np.array(errors)

                metrics = IntermittentMetrics()

                result = {
                    'Dataset': dataset_name,
                    'Method': method_name,
                    'MAE': np.mean(np.abs(errors)),
                    'MASE': metrics.mase(test, forecasts),
                    'Hit_Rate': metrics.hit_rate(test, forecasts, tolerance=0.15),
                    'Demand_Accuracy': metrics.demand_classification_accuracy(test, forecasts),
                    'Service_Level': metrics.periods_in_stock(test, forecasts),
                    'Forecasts': forecasts,
                    'Errors': errors
                }

                method_results[method_name] = result
                method_errors[method_name] = errors
                all_results.append(result)

                print(f"\n{method_name}:")
                print(f"  MAE: {result['MAE']:.3f}")
                print(f"  MASE: {result['MASE']:.3f}")
                print(f"  Hit Rate: {result['Hit_Rate']:.3f}")
                print(f"  Demand Accuracy: {result['Demand_Accuracy']:.3f}")
                print(f"  Service Level: {result['Service_Level']:.3f}")

            except Exception as e:
                print(f"  ❌ {method_name} failed: {str(e)}")
                continue

        if len(method_errors) >= 2:
            print(f"\n🔬 DIEBOLD-MARIANO STATISTICAL TESTS:")
            dm_framework = DieboldMarianoFramework()

            method_names = list(method_errors.keys())
            for i in range(len(method_names)):
                for j in range(i + 1, len(method_names)):
                    method1, method2 = method_names[i], method_names[j]

                    dm_result = dm_framework.statistical_test(
                        method1, method_errors[method1],
                        method2, method_errors[method2]
                    )

                    significance = "✅ SIGNIFICANT" if dm_result['significant'] else "❌ NOT SIGNIFICANT"
                    winner_text = f" → Winner: {dm_result.get('winner', 'None')}" if dm_result['significant'] else ""

                    print(f"  {method1} vs {method2}: p={dm_result['p_value']:.4f} {significance}{winner_text}")

    return all_results

def production_deployment_example():
    print("\n" + "="*80)
    print("PRODUCTION DEPLOYMENT DECISION FRAMEWORK")
    print("="*80)

    np.random.seed(100)
    data_gen = IntermittentDataGenerator(seed=100)
    prod_data = data_gen.spare_parts_demand(n=200, occurrence_prob=0.08)

    split_point = int(0.8 * len(prod_data))
    train, test = prod_data[:split_point], prod_data[split_point:]

    current_model_forecasts = []
    new_model_forecasts = []

    for i in range(len(test)):
        if i == 0:
            current_train = train.copy()
        else:
            current_train = np.concatenate([train, test[:i]])

        if len(current_train) < 15:
            current_forecast = np.mean(current_train[current_train > 0]) if np.any(current_train > 0) else 0
            new_forecast = current_forecast
        else:
            current_forecast = IntermittentForecastingMethods.croston_method(current_train)
            new_forecast = IntermittentForecastingMethods.lgbm_tweedie(current_train)

        current_model_forecasts.append(max(0, current_forecast))
        new_model_forecasts.append(max(0, new_forecast))

    current_errors = test - np.array(current_model_forecasts)
    new_errors = test - np.array(new_model_forecasts)

    print(f"\n📈 TRADITIONAL METRIC COMPARISON:")
    current_mae = np.mean(np.abs(current_errors))
    new_mae = np.mean(np.abs(new_errors))
    improvement = (current_mae - new_mae) / current_mae * 100

    print(f"Current Model (Croston) MAE: {current_mae:.4f}")
    print(f"New Model (LGBM+Tweedie) MAE: {new_mae:.4f}")
    print(f"Improvement: {improvement:.1f}%")

    if improvement > 0:
        print("🎯 Traditional Decision: DEPLOY new model (better MAE)")
    else:
        print("🎯 Traditional Decision: KEEP current model (better MAE)")

    metrics = IntermittentMetrics()
    print(f"\n📊 PROPER INTERMITTENT METRICS:")

    current_mase = metrics.mase(test, current_model_forecasts)
    new_mase = metrics.mase(test, new_model_forecasts)
    current_hit_rate = metrics.hit_rate(test, current_model_forecasts)
    new_hit_rate = metrics.hit_rate(test, new_model_forecasts)
    current_service = metrics.periods_in_stock(test, current_model_forecasts)
    new_service = metrics.periods_in_stock(test, new_model_forecasts)

    print(f"MASE - Current: {current_mase:.3f}, New: {new_mase:.3f}")
    print(f"Hit Rate - Current: {current_hit_rate:.3f}, New: {new_hit_rate:.3f}")
    print(f"Service Level - Current: {current_service:.3f}, New: {new_service:.3f}")

    dm_framework = DieboldMarianoFramework()
    dm_result = dm_framework.statistical_test('Current_Model', current_errors, 'New_Model', new_errors)

    print(f"\n🧮 DIEBOLD-MARIANO STATISTICAL TEST:")
    print(f"Test Statistic: {dm_result['test_statistic']:.4f}")
    print(f"P-value: {dm_result['p_value']:.6f}")
    print(f"Significant at α=0.05: {dm_result['significant']}")

    if dm_result['significant']:
        print(f"Statistical Winner: {dm_result.get('winner', 'None')}")
        print(f"Confidence Level: {dm_result.get('confidence', 'Unknown')}")
    else:
        print("Result: NO statistically significant difference")

    print(f"\n💼 PRODUCTION DEPLOYMENT DECISION:")
    if dm_result['significant'] and dm_result['p_value'] < 0.01:
        decision = f"✅ DEPLOY {dm_result['winner']} - High Confidence (p < 0.01)"
        risk = "Low risk deployment"
    elif dm_result['significant'] and dm_result['p_value'] < 0.05:
        decision = f"⚠️ DEPLOY {dm_result['winner']} - Medium Confidence (p < 0.05)"
        risk = "Medium risk deployment - monitor closely"
    else:
        decision = "❌ DO NOT DEPLOY - Difference not statistically significant"
        risk = "High risk - performance difference likely random noise"

    print(f"Decision: {decision}")
    print(f"Risk Assessment: {risk}")
    print(f"Business Recommendation: {dm_result['business_recommendation']}")

    return {
        'traditional_metrics': {'current_mae': current_mae, 'new_mae': new_mae, 'improvement': improvement},
        'proper_metrics': {'mase_current': current_mase, 'mase_new': new_mase, 'hit_rate_current': current_hit_rate, 'hit_rate_new': new_hit_rate},
        'statistical_result': dm_result,
        'deployment_decision': decision
    }

if __name__ == "__main__":
    print("🚀 INTERMITTENT TIME SERIES FORECASTING: METRICS THAT ACTUALLY MATTER")

    comprehensive_results = comprehensive_metric_comparison()

    production_results = production_deployment_example()

    print("\n" + "="*80)
    print("KEY TAKEAWAYS:")
    print("="*80)
    print("1. MAE alone is insufficient for intermittent time series")
    print("2. Use MASE, Hit Rate, and Service Level for business relevance")
    print("3. ALWAYS validate with Diebold-Mariano before deployment")
    print("4. Statistical significance prevents costly deployment mistakes")
    print("5. Proper metrics align with inventory management objectives")


🚀 INTERMITTENT TIME SERIES FORECASTING: METRICS THAT ACTUALLY MATTER
WHY TRADITIONAL METRICS FAIL FOR INTERMITTENT TIME SERIES

📊 DATASET: Automotive_Parts
Intermittency Rate: 90.7% zeros

Croston:
  MAE: 4.560
  MASE: 0.730
  Hit Rate: 0.000
  Demand Accuracy: 0.100
  Service Level: 0.900

TSB:
  MAE: 4.502
  MASE: 0.721
  Hit Rate: 0.000
  Demand Accuracy: 0.100
  Service Level: 0.900

LGBM_Tweedie:
  MAE: 4.235
  MASE: 0.678
  Hit Rate: 0.000
  Demand Accuracy: 0.133
  Service Level: 0.900

🔬 DIEBOLD-MARIANO STATISTICAL TESTS:
  Croston vs TSB: p=0.0000 ✅ SIGNIFICANT → Winner: TSB
  Croston vs LGBM_Tweedie: p=0.0565 ❌ NOT SIGNIFICANT
  TSB vs LGBM_Tweedie: p=0.1095 ❌ NOT SIGNIFICANT

📊 DATASET: Pharmaceutical
Intermittency Rate: 82.7% zeros

Croston:
  MAE: 19.621
  MASE: 0.676
  Hit Rate: 0.125
  Demand Accuracy: 0.267
  Service Level: 0.733

TSB:
  MAE: 19.191
  MASE: 0.661
  Hit Rate: 0.000
  Demand Accuracy: 0.267
  Service Level: 0.733

LGBM_Tweedie:
  MAE: 20.350
  MASE: 0.701