In [1]:
import numpy as np
import pandas as pd
from typing import List, Tuple, Optional
from dataclasses import dataclass
from scipy.stats import variation
from gluonts.evaluation.metrics import quantile_loss
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, precision_score, recall_score, f1_score
@dataclass
class CrostonResult:
    forecast: np.ndarray
    demand_series: np.ndarray
    interval_series: np.ndarray
    alpha: float
    initial_demand: float
    initial_interval: float

class CrostonModel:
    """
    Croston's method for intermittent demand forecasting that can handle multiple time series.
    """
    
    def __init__(self, alpha: float = 0.1):
        """
        Initialize the Croston model.
        
        Args:
            alpha: Smoothing parameter between 0 and 1
        """
        self.alpha = alpha
    
    def fit(self, data: np.ndarray) -> List[CrostonResult]:
        """
        Fit the Croston model to multiple time series.

        Args:
            data: 2D array where each row is a time series

        Returns:
            List of CrostonResult objects containing the model state for each series
        """
        results = []
        for series in data:
            non_zero_indices = np.where(series > 0)[0]

            if len(non_zero_indices) == 0:
                # Handle all-zero series
                results.append(CrostonResult(
                    forecast=np.zeros(len(series)),
                    demand_series=np.zeros(len(series)),
                    interval_series=np.zeros(len(series)),
                    alpha=self.alpha,
                    initial_demand=0,
                    initial_interval=0
                ))
                continue

            # Extract demand and intervals
            demand = series[non_zero_indices]
            intervals = np.diff(non_zero_indices, prepend=-1)
            intervals[0] = non_zero_indices[0] + 1  # Handle first interval

            # Initialize demand and interval series
            demand_series = np.zeros(len(series))
            interval_series = np.zeros(len(series))

            # Initial values
            initial_demand = demand[0]
            initial_interval = intervals[0]

            # Update series
            current_demand = initial_demand
            current_interval = initial_interval

            last_non_zero_idx = non_zero_indices[0]

            for i in range(len(series)):
                if series[i] > 0:
                    current_demand = self.alpha * series[i] + (1 - self.alpha) * current_demand
                    if i > 0:
                        # Find previous non-zero index
                        prev_non_zero = np.where(series[:i] > 0)[0]
                        if len(prev_non_zero) > 0:
                            interval = i - prev_non_zero[-1]
                        else:
                            interval = i + 1  # Only current non-zero so far
                        current_interval = self.alpha * interval + (1 - self.alpha) * current_interval
                        last_non_zero_idx = i

                demand_series[i] = current_demand
                interval_series[i] = current_interval

            # Calculate forecasts
            forecast = np.where(interval_series > 0, demand_series / interval_series, 0)

            results.append(CrostonResult(
                forecast=forecast,
                demand_series=demand_series,
                interval_series=interval_series,
                alpha=self.alpha,
                initial_demand=initial_demand,
                initial_interval=initial_interval
            ))

        return results
    
    def predict(self, fitted_models: List[CrostonResult], steps: int = 1) -> np.ndarray:
        """
        Predict future values for multiple time series.
        
        Args:
            fitted_models: List of fitted CrostonResult objects
            steps: Number of steps ahead to forecast
            
        Returns:
            2D array of forecasts (n_series x steps)
        """
        forecasts = []
        for model in fitted_models:
            last_demand = model.demand_series[-1]
            last_interval = model.interval_series[-1]
            
            if last_interval <= 0:
                forecasts.append(np.zeros(steps))
            else:
                forecasts.append(np.full(steps, last_demand / last_interval))
        
        return np.array(forecasts)

class CrostonEvaluator:
    """
    Evaluator for Croston model with all the requested metrics.
    """
    
    @staticmethod
    def mean_absolute_scaled_error(y_true: np.ndarray, y_pred: np.ndarray, y_train: np.ndarray) -> float:
        """
        Calculate mean absolute scaled error (MASE).
        """
        n = len(y_true)
        scaling_factor = np.mean(np.abs(np.diff(y_train)))
        errors = np.abs(y_true - y_pred) / scaling_factor
        return np.mean(errors)
    
    @staticmethod
    def root_mean_square_scaled_error(y_true: np.ndarray, y_pred: np.ndarray, y_train: np.ndarray) -> float:
        """
        Calculate root mean square scaled error (RMSSE).
        """
        n = len(y_true)
        scaling_factor = np.mean(np.square(np.diff(y_train)))
        errors = np.square(y_true - y_pred) / scaling_factor
        return np.sqrt(np.mean(errors))
    
    @staticmethod
    def quantile_loss(y_true: np.ndarray, y_pred: np.ndarray, q: float = 0.5) -> float:
        """
        Calculate quantile loss.
        """
        errors = y_true - y_pred
        return np.mean(np.maximum(q * errors, (q - 1) * errors))
    
    @staticmethod
    def evaluate(
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_train: np.ndarray,
        test_month_num: int = 1
    ) -> dict:
        """
        Comprehensive evaluation of forecasts.
        
        Args:
            y_true: True values (n_series x n_timesteps)
            y_pred: Predicted values (n_series x n_timesteps)
            y_train: Training values (n_series x n_train_timesteps)
            test_month_num: Number of test timesteps per series
            
        Returns:
            Dictionary of evaluation metrics
        """
        metrics = {}
        
        # Flatten arrays for overall metrics
        y_true_flat = y_true.flatten()
        y_pred_flat = y_pred.flatten()
        
        # Non-zero metrics
        non_zero_mask = y_true_flat > 0
        y_true_non_zero = y_true_flat[non_zero_mask]
        y_pred_non_zero = y_pred_flat[non_zero_mask]
        
        # R2 score
        metrics['R2'] = r2_score(y_true_flat, y_pred_flat)
        
        # RMSE
        metrics['RMSE'] = mean_squared_error(y_true_flat, y_pred_flat, squared=False)
        metrics['RMSE_non_zero'] = mean_squared_error(y_true_non_zero, y_pred_non_zero, squared=False) if len(y_true_non_zero) > 0 else np.nan
        
        # MAE
        metrics['MAE'] = mean_absolute_error(y_true_flat, y_pred_flat)
        metrics['MAE_non_zero'] = mean_absolute_error(y_true_non_zero, y_pred_non_zero) if len(y_true_non_zero) > 0 else np.nan
        
        # MASE
        mase_values = []
        mase_non_zero_values = []
        for i in range(len(y_true)):
            y_true_i = y_true[i]
            y_pred_i = y_pred[i]
            y_train_i = y_train[i]
            
            mase = CrostonEvaluator.mean_absolute_scaled_error(y_true_i, y_pred_i, y_train_i)
            mase_values.append(mase)
            
            # Non-zero MASE
            non_zero_mask = y_true_i > 0
            if np.any(non_zero_mask):
                mase_nz = CrostonEvaluator.mean_absolute_scaled_error(
                    y_true_i[non_zero_mask], 
                    y_pred_i[non_zero_mask], 
                    y_train_i
                )
                mase_non_zero_values.append(mase_nz)
        
        metrics['MASE'] = np.nanmean(mase_values)
        metrics['MASE_non_zero'] = np.nanmean(mase_non_zero_values) if mase_non_zero_values else np.nan
        
        # RMSSE
        rmsse_values = []
        rmsse_non_zero_values = []
        for i in range(len(y_true)):
            y_true_i = y_true[i]
            y_pred_i = y_pred[i]
            y_train_i = y_train[i]
            
            rmsse = CrostonEvaluator.root_mean_square_scaled_error(y_true_i, y_pred_i, y_train_i)
            rmsse_values.append(rmsse)
            
            # Non-zero RMSSE
            non_zero_mask = y_true_i > 0
            if np.any(non_zero_mask):
                rmsse_nz = CrostonEvaluator.root_mean_square_scaled_error(
                    y_true_i[non_zero_mask], 
                    y_pred_i[non_zero_mask], 
                    y_train_i
                )
                rmsse_non_zero_values.append(rmsse_nz)
        
        metrics['RMSSE'] = np.nanmean(rmsse_values)
        metrics['RMSSE_non_zero'] = np.nanmean(rmsse_non_zero_values) if rmsse_non_zero_values else np.nan
        
        # Quantile Loss
        testmonth_num = 1
        LS_list = [(0,1)]  #L表示相对于第1个预测时间t0的QuantileLoss区间起始点，S表示QuantileLoss区间长度
        for LS_pair in LS_list:
            L = LS_pair[0]
            S = LS_pair[1]
            QuantileLoss2 = 0.0
            test_y2 = 0.0
            num = int(len(y_pred_flat) / testmonth_num)  ##时间序列sku数量
            for i in range(num):
                QuantileLoss2 += quantile_loss(y_true_flat[(testmonth_num * i + L):(testmonth_num * i + L + S)],
                                              y_pred_flat[(testmonth_num * i + L):(testmonth_num * i + L + S)],
                                              q=0.5)
                test_y2 += np.sum(y_true_flat[(testmonth_num * i + L):(testmonth_num * i + L + S)])
            #print(QuantileLoss2,test_y2)
            QuantileLoss2 = QuantileLoss2 / test_y2
            
            print('L=%d,S=%d,rou=50%%时的QuantileLoss值：%f' % (L,S,QuantileLoss2))
            metrics[f'QuantileLoss_50'] = QuantileLoss2
        test_y_01 = [1 if x == 0 else 0 for x in y_true_flat] #0为正类，1为负类
        test_predict_01 = [1 if x == 0 else 0 for x in y_pred_flat]
        metrics['Precision'] = precision_score(test_y_01, test_predict_01)
        metrics['Recall'] = recall_score(test_y_01, test_predict_01)
        metrics['F1'] = f1_score(test_y_01, test_predict_01)
        from sklearn.metrics import confusion_matrix
        print("真实零值分布:", np.unique(test_y_01, return_counts=True))
        print("预测零值分布:", np.unique(test_predict_01, return_counts=True))
        print("混淆矩阵:\n", confusion_matrix(test_y_01, test_predict_01))
        '''
        # Zero metrics
        y_true_01 = (y_true_flat == 0).astype(int)
        y_pred_01 = (y_pred_flat == 0).astype(int)
        
        if len(np.unique(y_true_01)) > 1 and len(np.unique(y_pred_01)) > 1:
            metrics['Precision'] = precision_score(y_true_01, y_pred_01)
            metrics['Recall'] = recall_score(y_true_01, y_pred_01)
            metrics['F1'] = f1_score(y_true_01, y_pred_01)
        else:
            metrics['Precision'] = np.nan
            metrics['Recall'] = np.nan
            metrics['F1'] = np.nan
        '''
        # Segment metrics
        segment_metrics = CrostonEvaluator.segment_evaluation(y_true_flat, y_pred_flat)
        metrics.update(segment_metrics)
        
        return metrics
    
    @staticmethod
    def segment_evaluation(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
        """
        Calculate segment evaluation metrics.
        """
        segments = {
            '0': (0, 0),
            '0_50': (0, 50),
            '50_100': (50, 100),
            '100_above': (100, np.inf)
        }
        
        results = {}
        
        for name, (lower, upper) in segments.items():
            if lower == upper:
                mask = y_true == lower
            elif upper == np.inf:
                mask = y_true > lower
            else:
                mask = (y_true > lower) & (y_true <= upper)
                
            y_true_seg = y_true[mask]
            y_pred_seg = y_pred[mask]
            
            if len(y_true_seg) > 0:
                results[f'MAE_{name}'] = mean_absolute_error(y_true_seg, y_pred_seg)
                results[f'RMSE_{name}'] = mean_squared_error(y_true_seg, y_pred_seg, squared=False)
            else:
                results[f'MAE_{name}'] = np.nan
                results[f'RMSE_{name}'] = np.nan
                
        return results

# Example usage with your data:

# Load the data
#data = pd.read_csv('E:\ZIP-DeepAR代码\data\InterSim层次聚类后的Q料202001-202302(halfmonth).csv', index_col=0)
#data = pd.read_csv('E:\ZIP-DeepAR代码\data\salestv_data.csv', index_col=0)
data = pd.read_csv('E:\ZIP-DeepAR代码\data\carpartsdelete70.csv', index_col=0)
# Convert to numpy array (excluding the first column which is the part number)
ts_data = data.iloc[:, :].values

# Split into train and test
train_size = 47 #47 72 1909
train_data = ts_data[:, :train_size]
test_data = ts_data[:, train_size:]

# Initialize and fit the model
model = CrostonModel(alpha=0.1)
fitted_models = model.fit(train_data)

# Make predictions
predictions = model.predict(fitted_models, steps=test_data.shape[1])

# Evaluate
evaluator = CrostonEvaluator()
metrics = evaluator.evaluate(test_data, predictions, train_data)

# Print metrics
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")

NameError: name 'test_y' is not defined

In [7]:
metrics

{'R2': 0.6568023107026171,
 'RMSE': 2.0999644899666943,
 'RMSE_non_zero': 2.7744911195247357,
 'MAE': 1.068019106486348,
 'MAE_non_zero': 1.5565091770761292,
 'MASE': 1.6605150204581687,
 'MASE_non_zero': 2.610491334623011,
 'RMSSE': 0.8201849585891773,
 'RMSSE_non_zero': 1.1198853468089724,
 'QuantileLoss_50': 0.7472769896049746,
 'Precision': 0.0,
 'Recall': 0.0,
 'F1': 0.0,
 'MAE_0': 0.662668797055579,
 'RMSE_0': 1.2967272069859148,
 'MAE_0_50': 1.5288567972707663,
 'RMSE_0_50': 2.5943145864799333,
 'MAE_50_100': 19.40204325171845,
 'RMSE_50_100': 23.891746478071703,
 'MAE_100_above': 39.3106244355605,
 'RMSE_100_above': 47.38884605348456}

In [8]:
predictions

array([[0.97982548, 0.97982548, 0.97982548, 0.97982548],
       [0.20475479, 0.20475479, 0.20475479, 0.20475479],
       [0.53870639, 0.53870639, 0.53870639, 0.53870639],
       ...,
       [0.99053972, 0.99053972, 0.99053972, 0.99053972],
       [0.84887271, 0.84887271, 0.84887271, 0.84887271],
       [1.93342931, 1.93342931, 1.93342931, 1.93342931]])

In [1]:
import numpy as np
import pandas as pd

# 定义 Croston 方法
def croston_forecast(data, alpha, beta):
    non_zero_data = data[data > 0]
    demand = np.mean(non_zero_data)
    intervals = np.where(data > 0)[0]

    if len(intervals) > 0:
        interval_mean = np.mean(np.diff(intervals))
    else:
        interval_mean = 1  # 如果没有非零需求，设置为1

    forecast_demand = demand
    forecast_interval = interval_mean

    for d in data:
        if d > 0:
            forecast_demand = alpha * d + (1 - alpha) * forecast_demand
            forecast_interval = beta * (1) + (1 - beta) * forecast_interval

    return forecast_demand * forecast_interval

# 给定的时间序列数据
data = np.array([100.0, 0.0, 0.0, 1200.0, 0.0, 3385.0, 388.0, 375.0,
                 222.0, 155.0, 55.0, 0.0, 0.0, 0.0, 0.0, 87.0,
                 0.0, 110.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
                 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
                 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
                 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 34.902, 0.0,
                 0.0, 0.0, 0.0, 60.0, 200.0, 538.0, 55.0,
                 420.0, 540.0, 430.0, 605.0, 174.374, 128.0,
                 0.0, 36.632, 0.0, 0.0, 50.0, 0.0, 60.0,
                 0.0, 0.0, 247.91, 0.0, 50.0, 98.426, 27.97,
                 0.0, 170.344, 0.0])

# 不同的 alpha 和 beta 值
alpha_values = [0.1, 0.2, 0.3]
beta_values = [0.1, 0.2, 0.3]

# 计算预测值
results = {}
for alpha in alpha_values:
    for beta in beta_values:
        forecast_value = croston_forecast(data, alpha, beta)
        results[(alpha, beta)] = forecast_value

# 输出结果
for (alpha, beta), value in results.items():
    print(f"Alpha: {alpha}, Beta: {beta}, Forecast Value: {value:.2f}")

Alpha: 0.1, Beta: 0.1, Forecast Value: 220.22
Alpha: 0.1, Beta: 0.2, Forecast Value: 204.31
Alpha: 0.1, Beta: 0.3, Forecast Value: 203.78
Alpha: 0.2, Beta: 0.1, Forecast Value: 147.85
Alpha: 0.2, Beta: 0.2, Forecast Value: 137.17
Alpha: 0.2, Beta: 0.3, Forecast Value: 136.81
Alpha: 0.3, Beta: 0.1, Forecast Value: 126.47
Alpha: 0.3, Beta: 0.2, Forecast Value: 117.33
Alpha: 0.3, Beta: 0.3, Forecast Value: 117.02
