# TSFEL-Prophet Forecast Notebook

This notebook combines TSFEL feature extraction with adaptive Prophet forecasting:

1. Query time series from Victoria Metrics using a PromQL selector
2. Iterate over each time series (memory-efficient processing)
3. Extract TSFEL features and classify predictability
4. Generate Prophet forecasts with adaptive parameters based on TSFEL features
5. Save forecasts to database

**Classification-Based Forecasting:**
- **Predictable**: Prophet with seasonality enabled based on detected weekly/monthly patterns
- **Low Predictability**: Prophet with no seasonality (trend-only regression)
- **Not Suitable**: Skipped (insufficient data or poor quality)

**Key Features:**
- Memory-efficient: processes one series at a time
- Adaptive parameters: Prophet settings tuned based on TSFEL features
- Automatic seasonality detection: weekly and monthly patterns
- Database integration: saves forecasts with parameter tracking


In [None]:
# Parameters cell (tagged for papermill injection)
# These will be injected by papermill when executed by the job
# When running locally, use environment variables instead
# Note: selector, history_days, forecast parameters, and model parameters
# are hardcoded in the notebook cells where they are used (NOT passed as parameters)
# Database configuration is loaded from YAML config file using VM_JOBS_ENVIRONMENT and VM_JOBS_DB_PASSWORD
# output_results_path: if set, notebook writes a JSON file with timeseries_processed and timeseries_failed for the job to read
# vm_jobs_config_path: path to YAML config; passed by job when run as service, used by create_database_connection(..., config_path=...)
vm_query_url = ''
vm_token = ''
vm_jobs_environment = ''
vm_jobs_config_path = ''
dry_run = True
output_results_path = ''


## 1. Configuration

**Connection settings:** Set via papermill parameters or environment variables.
**Forecasting parameters:** Will be defined in the model training section.


In [None]:
# Configuration
# Connection settings: from papermill parameters or environment variables
# Database configuration is loaded automatically from YAML config using VM_JOBS_ENVIRONMENT and VM_JOBS_DB_PASSWORD
import os

# Victoria Metrics connection - from parameters/env vars (prefer parameters)
# Parameters are now individual variables (not a dict) due to papermill requirements
VM_QUERY_URL = vm_query_url if vm_query_url else os.getenv('VM_QUERY_URL', 'http://victoria-metrics:8428')
VM_TOKEN = vm_token if vm_token else os.getenv('VM_TOKEN', '')

# Environment - from parameters/env vars (prefer parameters)
VM_JOBS_ENVIRONMENT = vm_jobs_environment if vm_jobs_environment else os.getenv('VM_JOBS_ENVIRONMENT', '')
# Config file path - from papermill parameter (when run as service) or environment variable
VM_JOBS_CONFIG_PATH = vm_jobs_config_path if vm_jobs_config_path else os.getenv('VM_JOBS_CONFIG_PATH', '')

# Dry run mode: when True, don't save to database (plotting always happens)
# Handle both bool and string types from papermill
if isinstance(dry_run, bool):
    DRY_RUN = dry_run
elif isinstance(dry_run, str):
    DRY_RUN = dry_run.lower() in ('true', '1', 'yes')
else:
    DRY_RUN = os.getenv('DRY_RUN', 'false').lower() in ('true', '1', 'yes')

print(f"VM Query URL: {VM_QUERY_URL}")
print(f"VM_JOBS_ENVIRONMENT: {VM_JOBS_ENVIRONMENT or 'NOT SET (will use env var)'}")
print(f"Dry Run Mode: {DRY_RUN}")


## 2. Imports and Setup


In [None]:
import sys
import os
from pathlib import Path

# Add current directory to Python path
current_dir = str(Path.cwd())
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)

import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone, date
import warnings
warnings.filterwarnings('ignore')

# Plotting libraries (for plotting)
import matplotlib.pyplot as plt
import seaborn as sns

# TSFEL imports for time series feature extraction
import tsfel

# Darts imports
from darts import TimeSeries
from darts.models import Prophet as DartsProphet

# Helper modules
from prometheus_api_client import PrometheusConnect
from database_helpers import (
    check_forecast_metadata_table_exists,
    create_database_connection,
    create_forecast_run_record,
    save_forecast_metadata_for_metric_by_id,
    save_forecasts_to_database,
    update_forecast_run_record,
)

# Set plot style (plotting always enabled)
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (15, 8)
print("Plotting enabled")

print("Imports successful")


## 3. Connect to Victoria Metrics and Query Data

**Configure your selector and history days in the cell below.**


In [None]:
# PromQL selector - EDIT THIS
SELECTOR = '{job="extractor"}'  # Your PromQL selector

# History parameter - EDIT THIS
HISTORY_DAYS = 365  # Days of history to fetch

# Forecast parameters
FORECAST_HORIZON_DAYS = 90  # Business days to forecast ahead
MIN_HISTORY_POINTS = 60  # Minimum for TSFEL feature extraction and seasonality detection

# TSFEL-specific parameters
SEASONALITY_LAG_WEEKLY = 7
SEASONALITY_LAG_MONTHLY = 30
SEASONALITY_LAG_QUARTERLY = 90
ACF_THRESHOLD = 0.3  # Threshold for seasonality detection

# Floor parameter: set to 0 to prevent negative forecasts
FORECAST_FLOOR = 0  # Minimum value for forecasts (set to None to disable)

# Connect to Victoria Metrics and query historical data
headers = {"Authorization": f"Bearer {VM_TOKEN}"} if VM_TOKEN else {}
prom = PrometheusConnect(url=VM_QUERY_URL, headers=headers, disable_ssl=True)
print(f"Connected to Victoria Metrics at {VM_QUERY_URL}")

print(f"\nQuerying: {SELECTOR}")
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=HISTORY_DAYS)
query_result = prom.custom_query_range(
    query=SELECTOR.replace("'", '"'),  # Ensure double quotes for PromQL
    start_time=start_date,
    end_time=end_date,
    step="24h"
)

print(f"Query range: {start_date.date()} to {end_date.date()}")
print(f"Query returned {len(query_result)} series")


## 4. TSFEL Feature Extraction Functions

This section contains functions for time series feature extraction using TSFEL library.


In [None]:
def regularize_time_intervals(df, freq='D', method='ffill'):
    """Regularize time intervals in a time series DataFrame.
    
    Creates a regular date range and fills missing values using statistical methods.
    This helps ensure consistent time intervals for feature extraction.
    
    Args:
        df: pandas DataFrame with 'ds' (datetime) and 'y' (value) columns
        freq: Frequency string for regular intervals (default 'D' for daily)
        method: Fill method - 'ffill' (forward fill), 'bfill' (backward fill), 
                'interpolate' (linear interpolation), or 'mean' (fill with mean)
    
    Returns:
        DataFrame with regular time intervals
    """
    if len(df) < 2:
        return df
    
    # Ensure 'ds' is datetime
    df = df.copy()
    df['ds'] = pd.to_datetime(df['ds'])
    df = df.sort_values('ds').reset_index(drop=True)
    
    # Create regular date range from first to last date
    start_date = df['ds'].min()
    end_date = df['ds'].max()
    regular_range = pd.date_range(start=start_date, end=end_date, freq=freq)
    
    # Set 'ds' as index and reindex to regular range
    df_indexed = df.set_index('ds')[['y']]
    df_regular = df_indexed.reindex(regular_range)
    
    # Fill missing values based on method
    if method == 'ffill':
        # Forward fill (carry last known value forward)
        df_regular['y'] = df_regular['y'].ffill()
        # Backward fill any remaining NaNs at the start
        df_regular['y'] = df_regular['y'].bfill()
    elif method == 'bfill':
        # Backward fill (carry next known value backward)
        df_regular['y'] = df_regular['y'].bfill()
        # Forward fill any remaining NaNs at the end
        df_regular['y'] = df_regular['y'].ffill()
    elif method == 'interpolate':
        # Linear interpolation
        df_regular['y'] = df_regular['y'].interpolate(method='linear')
        # Fill any remaining NaNs at edges
        df_regular['y'] = df_regular['y'].ffill().bfill()
    elif method == 'mean':
        # Fill with mean value
        mean_val = df['y'].mean()
        df_regular['y'] = df_regular['y'].fillna(mean_val)
    else:
        # Default: forward fill + backward fill
        df_regular['y'] = df_regular['y'].ffill().bfill()
    
    # Reset index to get 'ds' column back
    df_regular = df_regular.reset_index()
    df_regular = df_regular.rename(columns={'index': 'ds'})
    
    # Remove any remaining NaN values (shouldn't happen, but safety check)
    df_regular = df_regular.dropna()
    
    return df_regular


In [None]:
# Helper function to convert values to scalars (not Series/arrays)
def to_scalar(val):
    """Convert value to scalar if it's a Series or array."""
    if isinstance(val, pd.Series):
        return val.iloc[0] if len(val) > 0 else np.nan
    elif isinstance(val, np.ndarray):
        return val[0] if len(val) > 0 else np.nan
    elif isinstance(val, (list, tuple)):
        return val[0] if len(val) > 0 else np.nan
    return val


def feature_cv(signal):
    """Coefficient of Variation: std / mean"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) < 2:
            return np.nan
        mean_val = np.mean(signal)
        if mean_val == 0:
            return np.inf
        std_val = np.std(signal)
        return abs(std_val) / abs(mean_val) if std_val != 0 else np.nan
    except Exception:
        return np.nan


def feature_acf1(signal):
    """Lag-1 autocorrelation coefficient (ACF1)"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) < 2:
            return np.nan
        if np.var(signal) == 0:
            return np.nan
        acf1_val = np.corrcoef(signal[:-1], signal[1:])[0, 1]
        return 0.0 if np.isnan(acf1_val) else acf1_val
    except Exception:
        return np.nan


def feature_stability(signal):
    """Stability: inverse of normalized Mean Absolute Deviation (MAD)"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) < 2:
            return np.nan
        
        mean_val = np.mean(signal)
        std_val = np.std(signal)
        mad_val = np.mean(np.abs(signal - mean_val))
        
        if np.isnan(mad_val) or np.isnan(mean_val):
            return np.nan
        
        if abs(mean_val) > 0:
            normalized_mad = mad_val / abs(mean_val)
            return 1.0 / (1.0 + normalized_mad) if normalized_mad > 0 else 1.0
        else:
            # If mean is near zero, use MAD relative to std
            if not np.isnan(std_val) and std_val > 0:
                normalized_mad = mad_val / std_val
                return 1.0 / (1.0 + normalized_mad) if normalized_mad > 0 else 1.0
            else:
                return np.nan
    except Exception:
        return np.nan


def feature_weekly_autocorr(signal, lag=7, threshold=0.3):
    """Weekly seasonality autocorrelation at lag 7"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) <= lag:
            return 0.0
        autocorr = np.corrcoef(signal[:-lag], signal[lag:])[0, 1]
        return 0.0 if np.isnan(autocorr) else autocorr
    except Exception:
        return 0.0


def feature_monthly_autocorr(signal, lag=30, threshold=0.3):
    """Monthly seasonality autocorrelation at lag 30"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) <= lag:
            return 0.0
        autocorr = np.corrcoef(signal[:-lag], signal[lag:])[0, 1]
        return 0.0 if np.isnan(autocorr) else autocorr
    except Exception:
        return 0.0


def feature_quarterly_autocorr(signal, lag=90, threshold=0.3):
    """Quarterly seasonality autocorrelation at lag 90"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) <= lag:
            return 0.0
        autocorr = np.corrcoef(signal[:-lag], signal[lag:])[0, 1]
        return 0.0 if np.isnan(autocorr) else autocorr
    except Exception:
        return 0.0


def feature_outlier_ratio(signal):
    """IQR-based outlier ratio"""
    try:
        signal = np.array(signal)
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        if len(signal) < 10:
            return 0.0
        
        Q1 = np.percentile(signal, 25)
        Q3 = np.percentile(signal, 75)
        IQR = Q3 - Q1
        
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = (signal < lower_bound) | (signal > upper_bound)
        return np.sum(outliers) / len(signal)
    except Exception:
        return 0.0


def feature_changepoint_count_and_dates(df):
    """Variance-based changepoint detection that returns both count and dates.
    
    Args:
        df: pandas DataFrame with 'ds' (datetime) and 'y' (value) columns
    
    Returns:
        Tuple of (changepoint_count, changepoint_dates_list)
        changepoint_count: Integer count of changepoints
        changepoint_dates_list: List of date objects where changepoints occur
    """
    try:
        if len(df) < 10:
            return 0, []
        
        # Ensure 'ds' is datetime and sort
        df = df.copy()
        df['ds'] = pd.to_datetime(df['ds'])
        df = df.sort_values('ds').reset_index(drop=True)
        
        # Filter out NaN and Inf values while keeping dates aligned
        # Create a mask for valid values
        valid_mask = ~(df['y'].isna() | np.isinf(df['y']))
        df_clean = df[valid_mask].reset_index(drop=True)
        
        if len(df_clean) < 10:
            return 0, []
        
        # Get aligned values and dates
        values = df_clean['y'].values
        dates = df_clean['ds'].values
        
        window_size = min(20, len(values) // 4)
        if window_size < 5:
            return 0, []
        
        # Calculate rolling variance with center=True
        # This means the variance at index i is calculated from values[i-window_size//2 : i+window_size//2+1]
        # When center=True, the first window_size//2 and last window_size//2 values will be NaN
        rolling_var = pd.Series(values).rolling(window=window_size, center=True).var()
        var_mean = rolling_var.mean()
        var_std = rolling_var.std()
        threshold = var_mean + 2 * var_std if not np.isnan(var_std) and var_std > 0 else var_mean * 2
        
        # Create mask for changepoints (high variance regions)
        changepoints_mask = (rolling_var > threshold) & (~np.isnan(rolling_var))
        
        changepoint_dates = []
        in_changepoint = False
        
        
        for idx, is_cp in enumerate(changepoints_mask):
            if is_cp and not in_changepoint:
                # Find the actual changepoint by looking at value changes
                # within the high variance region, not just the detection index
                # Look ahead in the high variance region to find where values actually change
                changepoint_idx = idx
                
                # Look ahead up to window_size positions to find the actual change
                look_ahead = min(window_size, len(values) - idx - 1)
                max_change_idx = idx
                max_change = 0
                
                for j in range(1, look_ahead + 1):
                    if idx + j >= len(values):
                        break
                    # Calculate change magnitude
                    if idx + j - 1 >= 0:
                        change = abs(values[idx + j] - values[idx + j - 1])
                        if change > max_change:
                            max_change = change
                            max_change_idx = idx + j
                
                # Use the index where the largest change occurred
                changepoint_idx = max_change_idx
                
                if changepoint_idx < len(dates):
                    changepoint_date = pd.Timestamp(dates[changepoint_idx])
                    changepoint_date_date = changepoint_date.date()
                    
                    # Just store the changepoint date (previous_date calculated later)
                    changepoint_dates.append(changepoint_date_date)
                in_changepoint = True
            elif not is_cp:
                in_changepoint = False
        
        return len(changepoint_dates), changepoint_dates
    except Exception:
        return 0, []


In [None]:
def create_personalized_features_config():
    """Create TSFEL personalized features configuration dictionary"""
    personalized_features = {
        'CV': {
            'function': feature_cv,
            'name': 'Coefficient of Variation',
            'domain': 'statistical',
            'parameters': {}
        },
        'ACF1': {
            'function': feature_acf1,
            'name': 'Lag-1 Autocorrelation',
            'domain': 'temporal',
            'parameters': {}
        },
        'Stability': {
            'function': feature_stability,
            'name': 'Stability (inverse normalized MAD)',
            'domain': 'statistical',
            'parameters': {}
        },
        'Weekly Autocorr': {
            'function': lambda signal: feature_weekly_autocorr(signal, lag=SEASONALITY_LAG_WEEKLY, threshold=ACF_THRESHOLD),
            'name': 'Weekly Seasonality Autocorrelation',
            'domain': 'temporal',
            'parameters': {'lag': SEASONALITY_LAG_WEEKLY, 'threshold': ACF_THRESHOLD}
        },
        'Monthly Autocorr': {
            'function': lambda signal: feature_monthly_autocorr(signal, lag=SEASONALITY_LAG_MONTHLY, threshold=ACF_THRESHOLD),
            'name': 'Monthly Seasonality Autocorrelation',
            'domain': 'temporal',
            'parameters': {'lag': SEASONALITY_LAG_MONTHLY, 'threshold': ACF_THRESHOLD}
        },
        'Quarterly Autocorr': {
            'function': lambda signal: feature_quarterly_autocorr(signal, lag=SEASONALITY_LAG_QUARTERLY, threshold=ACF_THRESHOLD),
            'name': 'Quarterly Seasonality Autocorrelation',
            'domain': 'temporal',
            'parameters': {'lag': SEASONALITY_LAG_QUARTERLY, 'threshold': ACF_THRESHOLD}
        },
        'Outlier Ratio': {
            'function': feature_outlier_ratio,
            'name': 'IQR-based Outlier Ratio',
            'domain': 'statistical',
            'parameters': {}
        },
        'Changepoint Count': {
            'function': feature_changepoint_count_and_dates,
            'name': 'Variance-based Changepoint Count and Dates',
            'domain': 'temporal',
            'parameters': {}
        }
    }
    return personalized_features


def extract_tsfel_features(df):
    """Extract time series features using TSFEL with personalized features.
    
    Args:
        df: pandas DataFrame with 'ds' (datetime) and 'y' (value) columns
    
    Returns:
        Dictionary of TSFEL features including personalized features
    """
    features = {}
    
    if len(df) < 2:
        return features
    
    # Add basic statistics first (always available)
    values = df['y'].values
    features['mean'] = np.mean(values)
    features['std'] = np.std(values)
    features['min'] = np.min(values)
    features['max'] = np.max(values)
    features['range'] = features['max'] - features['min']
    features['data_points'] = len(df)
    
    try:
        # Prepare time series data for TSFEL
        df_prepared = regularize_time_intervals(df, freq='D', method='ffill')
        
        if len(df_prepared) < 2:
            # Calculate basic CV as fallback
            mean_val = features.get('mean', 0)
            std_val = features.get('std', 0)
            features['cv'] = std_val / mean_val if mean_val != 0 else np.inf
            return features
        
        # Clean data: remove infinite values, ensure proper types
        signal = df_prepared['y'].values.copy()
        signal = signal[~np.isnan(signal)]
        signal = signal[~np.isinf(signal)]
        
        if len(signal) < 2 or np.var(signal) == 0:
            mean_val = features.get('mean', 0)
            std_val = features.get('std', 0)
            features['cv'] = std_val / mean_val if mean_val != 0 else np.inf
            return features
        
        # Get TSFEL feature configuration for statistical and temporal domains
        cfg = tsfel.get_features_by_domain(['statistical', 'temporal'])
        
        # Extract features using TSFEL 0.2.0
        if len(signal) < 10:
            raise ValueError(f"Signal too short for TSFEL: {len(signal)} points")
        
        feature_dict = tsfel.time_series_features_extractor(cfg, signal, fs=1.0)
        
        # Extract TSFEL features using exact names from TSFEL 0.2.0
        features['mean'] = to_scalar(feature_dict.get('0_Mean', features['mean']))
        features['std'] = to_scalar(feature_dict.get('0_Standard deviation', features['std']))
        features['var'] = to_scalar(feature_dict.get('0_Variance', np.nan))
        features['entropy'] = to_scalar(feature_dict.get('0_Entropy', np.nan))
        features['autocorr_persistence'] = to_scalar(feature_dict.get('0_Autocorrelation', np.nan))
        
        # Calculate slope and trend_strength
        features['slope'] = to_scalar(feature_dict.get('0_Slope', np.nan))
        slope_val = features['slope']
        if isinstance(slope_val, (pd.Series, np.ndarray)):
            slope_val = to_scalar(slope_val)
        features['trend_strength'] = abs(slope_val) if not np.isnan(slope_val) else np.nan
        
        # Distribution shape indicators
        features['skewness'] = to_scalar(feature_dict.get('0_Skewness', np.nan))
        features['kurtosis'] = to_scalar(feature_dict.get('0_Kurtosis', np.nan))
        
        # Variability indicators
        features['interquartile_range'] = to_scalar(feature_dict.get('0_Interquartile range', np.nan))
        features['zero_crossing_rate'] = to_scalar(feature_dict.get('0_Zero crossing rate', np.nan))
        
        # Structure indicators
        features['positive_turning_points'] = to_scalar(feature_dict.get('0_Positive turning points', np.nan))
        features['negative_turning_points'] = to_scalar(feature_dict.get('0_Negative turning points', np.nan))
        features['neighbourhood_peaks'] = to_scalar(feature_dict.get('0_Neighbourhood peaks', np.nan))
        features['mean_absolute_deviation'] = to_scalar(feature_dict.get('0_Mean absolute deviation', np.nan))
        
        # Extract personalized features
        personalized_cfg = create_personalized_features_config()
        
        # Calculate personalized features
        features['cv'] = personalized_cfg['CV']['function'](signal)
        features['acf1'] = personalized_cfg['ACF1']['function'](signal)
        features['stability'] = personalized_cfg['Stability']['function'](signal)
        
        # Seasonality features
        weekly_autocorr = personalized_cfg['Weekly Autocorr']['function'](signal)
        monthly_autocorr = personalized_cfg['Monthly Autocorr']['function'](signal)
        quarterly_autocorr = personalized_cfg['Quarterly Autocorr']['function'](signal)
        
        features['weekly_autocorr'] = weekly_autocorr
        features['monthly_autocorr'] = monthly_autocorr
        features['quarterly_autocorr'] = quarterly_autocorr
        features['has_weekly_seasonality'] = abs(weekly_autocorr) > ACF_THRESHOLD
        features['has_monthly_seasonality'] = abs(monthly_autocorr) > ACF_THRESHOLD
        features['has_quarterly_seasonality'] = abs(quarterly_autocorr) > ACF_THRESHOLD
        
        # Outlier and changepoint features
        features['outlier_ratio'] = personalized_cfg['Outlier Ratio']['function'](signal)
        
        # Changepoint detection returns both count and dates (takes dataframe, not signal)
        changepoint_count, changepoint_dates = personalized_cfg['Changepoint Count']['function'](df_prepared)
        features['changepoint_count'] = float(changepoint_count) if not np.isnan(changepoint_count) else 0.0
        features['changepoint_dates'] = changepoint_dates  # List of datetime objects
        
    except Exception as tsfel_error:
        # TSFEL feature extraction failed - set all TSFEL-specific features to NaN
        features['acf1'] = np.nan
        features['autocorr_persistence'] = np.nan
        features['trend_strength'] = np.nan
        features['stability'] = np.nan
        features['entropy'] = np.nan
        features['slope'] = np.nan
        features['skewness'] = np.nan
        features['kurtosis'] = np.nan
        features['interquartile_range'] = np.nan
        features['zero_crossing_rate'] = np.nan
        features['positive_turning_points'] = np.nan
        features['negative_turning_points'] = np.nan
        features['neighbourhood_peaks'] = np.nan
        features['mean_absolute_deviation'] = np.nan
        features['var'] = features.get('std', np.nan) ** 2 if not np.isnan(features.get('std', np.nan)) else np.nan
        
        # Calculate basic CV as fallback
        mean_val = features.get('mean', np.nan)
        std_val = features.get('std', np.nan)
        if not np.isnan(mean_val) and mean_val != 0:
            features['cv'] = abs(std_val) / abs(mean_val) if not np.isnan(std_val) else np.nan
        else:
            features['cv'] = np.nan
        
        # Set seasonality and detector features to defaults
        features['has_weekly_seasonality'] = False
        features['has_monthly_seasonality'] = False
        features['has_quarterly_seasonality'] = False
        features['weekly_autocorr'] = 0.0
        features['monthly_autocorr'] = 0.0
        features['quarterly_autocorr'] = 0.0
        features['outlier_ratio'] = 0.0
        features['changepoint_count'] = 0.0
        features['changepoint_dates'] = []
    
    return features


## 5. Classification Function

Classify time series by predictability based on TSFEL features.


In [None]:
def classify_with_tsfel(features, detector_info):
    """Classify time series predictability based on TSFEL features and detectors.
    
    Args:
        features: Dictionary of TSFEL features including seasonality flags
        detector_info: Dictionary with outlier and changepoint information
    
    Returns:
        Tuple of (category, reason)
    """
    # Handle None or empty features
    if not features:
        return 'Not Suitable', 'No features extracted'
    
    if not detector_info:
        detector_info = {'outlier_ratio': 0.0, 'changepoint_count': 0}
    
    # Check for data quality issues
    outlier_ratio = detector_info.get('outlier_ratio', 0.0)
    changepoint_count = detector_info.get('changepoint_count', 0)
    
    # High outlier ratio indicates poor data quality
    if outlier_ratio > 0.2:  # More than 20% outliers
        return 'Not Suitable', f'High outlier ratio: {outlier_ratio:.2%}'
    
    # Too many changepoints indicates instability
    if changepoint_count > 5:
        return 'Not Suitable', f'Too many changepoints: {changepoint_count}'
    
    # Extract seasonality information
    has_weekly = features.get('has_weekly_seasonality', False)
    has_monthly = features.get('has_monthly_seasonality', False)
    has_quarterly = features.get('has_quarterly_seasonality', False)
    has_seasonality = has_weekly or has_monthly or has_quarterly
    
    # Extract key TSFEL features
    acf1 = features.get('acf1', np.nan)
    autocorr_persistence = features.get('autocorr_persistence', np.nan)
    trend_strength = features.get('trend_strength', np.nan)
    stability = features.get('stability', np.nan)
    cv = features.get('cv', np.nan)
    skewness = features.get('skewness', np.nan)
    kurtosis = features.get('kurtosis', np.nan)
    zero_crossing_rate = features.get('zero_crossing_rate', np.nan)
    interquartile_range = features.get('interquartile_range', np.nan)
    mean_val = features.get('mean', np.nan)
    std_val = features.get('std', np.nan)
    
    # Assess predictability indicators
    high_noise = False
    if not np.isnan(zero_crossing_rate):
        data_points = features.get('data_points', np.nan)
        if not np.isnan(data_points) and data_points > 0:
            zcr_normalized = zero_crossing_rate / data_points
            high_noise = zcr_normalized > 0.3
        else:
            high_noise = zero_crossing_rate > 30
    
    extreme_skew = False
    if not np.isnan(skewness):
        extreme_skew = abs(skewness) > 2.0
    
    heavy_tails = False
    if not np.isnan(kurtosis):
        heavy_tails = kurtosis > 5.0
    
    high_variability = False
    if not np.isnan(interquartile_range) and not np.isnan(mean_val) and abs(mean_val) > 0:
        iqr_coefficient = interquartile_range / abs(mean_val)
        high_variability = iqr_coefficient > 1.0
    
    negative_indicators = sum([high_noise, extreme_skew, heavy_tails, high_variability])
    seasonality_count = sum([has_weekly, has_monthly, has_quarterly])
    
    # Classification logic
    if has_seasonality and seasonality_count >= 1:
        if negative_indicators <= 1:
            if not np.isnan(acf1) and acf1 > 0.3:
                category = 'Predictable'
                seasonality_types = []
                if has_weekly:
                    seasonality_types.append('weekly')
                if has_monthly:
                    seasonality_types.append('monthly')
                if has_quarterly:
                    seasonality_types.append('quarterly')
                reason = f'Seasonality detected: {", ".join(seasonality_types)}'
                if not np.isnan(acf1):
                    reason += f', ACF1={acf1:.2f}'
                return category, reason
    
    if not np.isnan(acf1) and acf1 > 0.5:
        if not np.isnan(stability) and stability > 0.7:
            if negative_indicators <= 1:
                category = 'Predictable'
                reason = f'Strong autocorrelation (ACF1={acf1:.2f}) and stability ({stability:.2f})'
                return category, reason
            else:
                category = 'Low Predictability'
                reason = f'Strong autocorrelation (ACF1={acf1:.2f}) but high noise/variability'
                return category, reason
    
    if not np.isnan(acf1) and acf1 > 0.2:
        if not np.isnan(stability) and stability > 0.5:
            if not np.isnan(autocorr_persistence) and autocorr_persistence > 10 and negative_indicators <= 1:
                category = 'Predictable'
                reason = f'Moderate autocorrelation (ACF1={acf1:.2f}), high persistence (lag={autocorr_persistence:.0f}), and stability ({stability:.2f})'
                return category, reason
            else:
                category = 'Low Predictability'
                reason = f'Moderate autocorrelation (ACF1={acf1:.2f}) and stability ({stability:.2f})'
                return category, reason
    
    if not np.isnan(acf1) and acf1 < 0.2:
        category = 'Low Predictability'
        reason = f'Low autocorrelation (ACF1={acf1:.2f})'
        return category, reason
    
    if has_seasonality:
        category = 'Low Predictability'
        seasonality_types = []
        if has_weekly:
            seasonality_types.append('weekly')
        if has_monthly:
            seasonality_types.append('monthly')
        if has_quarterly:
            seasonality_types.append('quarterly')
        reason = f'Seasonality detected but weak patterns: {", ".join(seasonality_types)}'
        return category, reason
    
    if not np.isnan(acf1) or not np.isnan(trend_strength):
        return 'Low Predictability', 'Weak patterns detected'
    
    return 'Not Suitable', 'Insufficient features for classification'


In [None]:
def generate_prophet_params(classification, features, df=None):
    """Generate Prophet parameters based on classification and TSFEL features.
    
    Args:
        classification: 'Predictable', 'Low Predictability', or 'Not Suitable'
        features: Dictionary of TSFEL features
    
    Returns:
        Dictionary of Prophet parameters
    """
    if classification == 'Not Suitable':
        return None
    
    # Base parameters
    prophet_params = {
        'yearly_seasonality': False,
        'weekly_seasonality': False,
        'daily_seasonality': False,
        'seasonality_mode': 'multiplicative',
    }
    
    if classification == 'Predictable':
        # Enable seasonality based on detected patterns
        has_weekly = features.get('has_weekly_seasonality', False)
        has_monthly = features.get('has_monthly_seasonality', False)
        has_quarterly = features.get('has_quarterly_seasonality', False)
        
        # Build add_seasonalities list for custom seasonalities
        # Monthly and quarterly must be added via add_seasonalities (Prophet doesn't have built-in monthly/quarterly)
        add_seasonalities = []
        
        # Add monthly seasonality if detected
        if has_monthly:
            add_seasonalities.append({
                'name': 'monthly',
                'seasonal_periods': 30.5,
                'fourier_order': 5
            })
        
        # Add quarterly seasonality if detected
        if has_quarterly:
            add_seasonalities.append({
                'name': 'quarterly',
                'seasonal_periods': 91.25,
                'fourier_order': 5
            })
        
        # If weekly is detected, add it as custom seasonality (disable built-in)
        # This matches the pattern used in recommended parameters
        if has_weekly:
            prophet_params['weekly_seasonality'] = True
        else:
            prophet_params['weekly_seasonality'] = False
        
        # Add custom seasonalities if any are detected
        if add_seasonalities:
            prophet_params['add_seasonalities'] = add_seasonalities
        
        # Add changepoint dates if detected
        changepoint_dates = features.get('changepoint_dates', [])
        if changepoint_dates and len(changepoint_dates) > 0:
            # changepoint_dates is now a simple list of date objects
            # Calculate previous_date for each changepoint when adding to Prophet
            changepoint_dates_clean = []
            
            for cp_date in changepoint_dates:
                # Handle both tuple format (legacy) and date format (new)
                if isinstance(cp_date, tuple) and len(cp_date) == 2:
                    # Legacy format: (changepoint_date, previous_date)
                    changepoint_date, previous_date = cp_date
                    if changepoint_date is not None:
                        changepoint_dates_clean.append(changepoint_date)
                    if previous_date is not None:
                        changepoint_dates_clean.append(previous_date)
                elif isinstance(cp_date, date):
                    # New format: just the changepoint date
                    # Add the changepoint date
                    changepoint_dates_clean.append(cp_date)
                    
                    # Find previous data point in the dataframe (not just subtract 1 day)
                    previous_date = None
                    if df is not None and 'ds' in df.columns:
                        # Prepare sorted dataframe for finding previous data points
                        df_sorted = df.copy()
                        df_sorted['ds'] = pd.to_datetime(df_sorted['ds'])
                        df_sorted = df_sorted.sort_values('ds').reset_index(drop=True)
                        # Convert 'ds' to date for comparison
                        df_sorted['ds_date'] = df_sorted['ds'].dt.date
                        
                        # Find the changepoint date in the dataframe
                        cp_matches = df_sorted[df_sorted['ds_date'] == cp_date]
                        if len(cp_matches) > 0:
                            # Get the index of the first match
                            cp_idx = cp_matches.index[0]
                            # Get the previous row if it exists
                            if cp_idx > 0:
                                previous_date = df_sorted.iloc[cp_idx - 1]['ds_date']
                    
                    # If we found a previous date, add it; otherwise skip (don't add arbitrary date)
                    if previous_date is not None:
                        changepoint_dates_clean.append(previous_date)
            if changepoint_dates_clean:
                # Sort and deduplicate dates (keep as date objects)
                changepoint_dates_clean = sorted(set(changepoint_dates_clean))
                prophet_params['changepoints'] = changepoint_dates_clean
        # Adjust changepoint_prior_scale based on stability and changepoint_count
        stability = features.get('stability', np.nan)
        changepoint_count = features.get('changepoint_count', 0)
        
        if not np.isnan(stability):
            if stability > 0.7 and changepoint_count <= 2:
                # High stability, low changepoints -> smoother trend
                prophet_params['changepoint_prior_scale'] = 0.3
            elif stability > 0.5 and changepoint_count <= 3:
                # Moderate stability -> default flexibility
                prophet_params['changepoint_prior_scale'] = 0.15
            else:
                # Lower stability or more changepoints -> more flexible
                prophet_params['changepoint_prior_scale'] = 0.1
        else:
            prophet_params['changepoint_prior_scale'] = 0.05
        
        # Adjust seasonality_prior_scale based on autocorrelation
        acf1 = features.get('acf1', np.nan)
        autocorr_persistence = features.get('autocorr_persistence', np.nan)
        
        if not np.isnan(acf1) and acf1 > 0.5:
            # Strong autocorrelation -> emphasize seasonality
            prophet_params['seasonality_prior_scale'] = 6.0
        elif not np.isnan(autocorr_persistence) and autocorr_persistence > 10:
            # High persistence -> moderate seasonality emphasis
            prophet_params['seasonality_prior_scale'] = 10.0
        else:
            # Default seasonality strength
            prophet_params['seasonality_prior_scale'] = 10.0

        prophet_params['interval_width'] = 0.95
    
    elif classification == 'Low Predictability':
        # Trend-only model (no seasonality)
        prophet_params['changepoint_prior_scale'] = 0.1  # Slightly more flexible for trend-only
        prophet_params['seasonality_prior_scale'] = 1.0  # Low (not used, but set for consistency)
    
    return prophet_params


## 7. Process Series and Generate Forecasts

Iterate over each time series, extract features, classify, and generate forecasts.


In [None]:
# Process each series: extract features, classify, and forecast
forecasts_by_series = []
classification_counts = {'Predictable': 0, 'Low Predictability': 0, 'Not Suitable': 0, 'Error': 0}

# When not dry run: create DB connection and run record before the loop (for per-metric metadata and run stats)
if not DRY_RUN:
    engine, conn = create_database_connection(
        environment=VM_JOBS_ENVIRONMENT if VM_JOBS_ENVIRONMENT else None,
        config_path=VM_JOBS_CONFIG_PATH if VM_JOBS_CONFIG_PATH else None,
    )
    tsfel_params = {
        'seasonality_lag_weekly': SEASONALITY_LAG_WEEKLY,
        'seasonality_lag_monthly': SEASONALITY_LAG_MONTHLY,
        'seasonality_lag_quarterly': SEASONALITY_LAG_QUARTERLY,
        'acf_threshold': ACF_THRESHOLD,
        'regularize_freq': 'D',
        'regularize_method': 'ffill',
        'tsfel_domains': ['statistical', 'temporal'],
        'tsfel_sampling_frequency': 1.0,
        'min_history_points': MIN_HISTORY_POINTS,
        'outlier_ratio_threshold': 0.2,
        'changepoint_count_threshold': 5,
        'forecast_floor': FORECAST_FLOOR if FORECAST_FLOOR is not None else None,
    }
    representative_params = {
        'yearly_seasonality': False,
        'weekly_seasonality': False,
        'daily_seasonality': False,
        'seasonality_mode': 'additive',
        'changepoint_prior_scale': 0.05,
    }
    model_config = {'prophet_params': representative_params, 'tsfel_params': tsfel_params}
    run_id = create_forecast_run_record(
        conn=conn,
        job_id="metrics_forecast_notebooks",
        selection_value=SELECTOR,
        model_type="tsfel_prophet",
        model_config=model_config,
        model_fit_config={},
        history_days=HISTORY_DAYS,
        forecast_horizon_days=FORECAST_HORIZON_DAYS,
        min_history_points=MIN_HISTORY_POINTS,
        config_source="notebook",
    )
    loop_start = datetime.now(timezone.utc)
    print(f"Created forecast run record: run_id={run_id}")
    if not check_forecast_metadata_table_exists(conn):
        raise RuntimeError(
            "Table public.vm_metrics_forecast_metadata does not exist. "
            "Run database/vm_metrics_forecast_metadata.sql to create it."
        )
else:
    conn = None
    run_id = None
    loop_start = None

print(f"Processing {len(query_result)} time series...")
print(f"{'='*60}")

for series_idx, item in enumerate(query_result):
    metric = item.get('metric', {})
    metric_name = metric.get('__name__')
    if not metric_name:
        continue
    
    labels = {k: v for k, v in metric.items() if k != '__name__'}
    values = item.get('values', [])
    samples = [(datetime.fromtimestamp(float(ts), tz=timezone.utc), float(value)) for ts, value in values]
    
    if not samples:
        continue
    
    series_info = {'metric_name': metric_name, 'labels': labels}
    
    print(f"\nProcessing {series_idx + 1}/{len(query_result)}: {metric_name}")
    
    try:
        # Prepare data
        df = pd.DataFrame(samples, columns=['ds', 'y'])
        df['ds'] = pd.to_datetime(df['ds'], utc=True).dt.tz_localize(None)
        df = df.sort_values('ds').reset_index(drop=True)
        
        # Check minimum data points
        if len(df) < MIN_HISTORY_POINTS:
            print(f"  ⚠️  Skipping: insufficient data ({len(df)} < {MIN_HISTORY_POINTS})")
            classification_counts['Not Suitable'] += 1
            continue
        
        # Check for invalid values
        if df['y'].isna().all() or np.isinf(df['y']).any():
            print(f"  ⚠️  Skipping: invalid values")
            classification_counts['Not Suitable'] += 1
            continue
        
        # Extract TSFEL features
        print(f"  Extracting TSFEL features...")
        try:
            features = extract_tsfel_features(df)
            if not features or 'mean' not in features:
                print(f"  ⚠️  Warning: TSFEL feature extraction returned no features")
                classification_counts['Not Suitable'] += 1
                continue
        except Exception as feat_error:
            print(f"  ⚠️  Warning: TSFEL feature extraction failed: {feat_error}")
            classification_counts['Not Suitable'] += 1
            continue
        
        # Extract detector information
        outlier_ratio = features.get('outlier_ratio', 0.0)
        changepoint_count = features.get('changepoint_count', 0.0)
        data_points = features.get('data_points', 0)
        outlier_count = int(outlier_ratio * data_points) if data_points > 0 else 0
        
        detector_info = {
            'outlier_count': outlier_count,
            'outlier_ratio': float(outlier_ratio) if not np.isnan(outlier_ratio) else 0.0,
            'changepoint_count': int(changepoint_count) if not np.isnan(changepoint_count) else 0
        }
        
        # Classify series
        classification, reason = classify_with_tsfel(features, detector_info)
        print(f"  Classification: {classification} - {reason}")
        classification_counts[classification] = classification_counts.get(classification, 0) + 1
        
        # Skip if not suitable
        if classification == 'Not Suitable':
            continue
        
        # Generate Prophet parameters (pass df to find previous data points for changepoints)
        prophet_params = generate_prophet_params(classification, features, df=df)
        if prophet_params is None:
            print(f"  ⚠️  Skipping: no Prophet parameters generated")
            continue
        
        print(f"  Prophet params: {prophet_params}")
        
        # Prepare training data
        df_training = df.copy()
        
        # Add floor/cap columns if floor is specified
        prophet_params_final = prophet_params.copy()
        if FORECAST_FLOOR is not None:
            prophet_params_final['growth'] = 'logistic'
            prophet_params_final['floor'] = FORECAST_FLOOR
            prophet_params_final['cap'] = df_training['y'].max() * 1.5 if df_training['y'].max() > FORECAST_FLOOR else FORECAST_FLOOR + 1
        
        # Convert to darts TimeSeries
        series = TimeSeries.from_dataframe(df_training.set_index('ds'), fill_missing_dates=True, freq="D")
        
        # Train Prophet model
        model = DartsProphet(**prophet_params_final)
        model.fit(series)
        
        # Generate forecast
        forecast = model.predict(n=FORECAST_HORIZON_DAYS)
        forecast_df = pd.DataFrame({
            'ds': forecast.time_index,
            'yhat': forecast.values().flatten()
        })
        
        print(f"  ✓ Forecast: {len(forecast_df)} predictions")
        
        # Store forecast (include features and reason so we can save metadata in the save loop using job_idx/metric_id from save_forecasts_to_database)
        # Always store extended tuple with df_training and detector_info for plotting
        forecasts_by_series.append((series_info, forecast_df, df_training, classification, prophet_params, features, detector_info, reason))
        
    except Exception as exc:
        print(f"  ✗ Failed: {exc}")
        classification_counts['Error'] += 1
        continue

print(f"\n{'='*60}")
print(f"Processing complete: {len(forecasts_by_series)}/{len(query_result)} series forecasted")
print(f"\nClassification summary:")
for cls, count in classification_counts.items():
    if count > 0:
        print(f"  {cls}: {count}")

# Update run record with counts and completion status (when not dry run)
if not DRY_RUN and conn is not None and run_id is not None:
    try:
        duration_seconds = (datetime.now(timezone.utc) - loop_start).total_seconds() if loop_start else None
        update_forecast_run_record(
            conn=conn,
            run_id=run_id,
            series_count=len(query_result),
            success_count=len(forecasts_by_series),
            failed_count=len(query_result) - len(forecasts_by_series),
            status="completed" if len(forecasts_by_series) == len(query_result) else "partial",
            completed_at=datetime.now(timezone.utc),
            duration_seconds=duration_seconds,
        )
    except Exception as update_exc:
        print(f"Warning: failed to update run record: {update_exc}")


## 7a. Cross-Correlation Outlier Clustering

Cluster time series by similarity of outlier timing patterns to surface metrics whose anomalies co-occur (e.g., shared incidents or deployments). Uses IQR-based outlier detection, cross-correlation of binary outlier vectors, and hierarchical clustering. Visualization shows historical data with outlier points highlighted—no forecast.

In [None]:
def get_outlier_dates(df):
    """IQR-based outlier dates. Returns set of dates (as date) where outliers occur."""
    values = df['y'].values
    Q1, Q3 = np.percentile(values, [25, 75])
    IQR = Q3 - Q1
    if IQR == 0:
        return set()
    lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
    outlier_mask = (values < lower) | (values > upper)
    return set(pd.to_datetime(df.loc[outlier_mask, 'ds']).dt.date)

In [None]:
# Build binary outlier matrix and cluster series by cross-correlation of outlier timing
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import squareform

outlier_clusters = {}  # cluster_id -> list of (series_info, df_training, outlier_dates)
cluster_labels = []   # cluster label per series index

if len(forecasts_by_series) < 2:
    print("Skipping outlier clustering: need at least 2 series")
else:
    # Extract outlier dates and df_training for each series
    series_data = []
    for item in forecasts_by_series:
        series_info, forecast_df, df_training, classification, prophet_params, features, detector_info, reason = item
        outlier_dates = get_outlier_dates(df_training)
        series_data.append((series_info, df_training, outlier_dates))

    # Check if any series have outliers
    total_outliers = sum(len(od) for _, _, od in series_data)
    if total_outliers == 0:
        print("Skipping outlier clustering: no outliers detected across any series")
    else:
        # Build common date grid
        all_dates = set()
        for _, df, _ in series_data:
            all_dates.update(pd.to_datetime(df['ds']).dt.date)
        date_grid = sorted(all_dates)

        # Build binary matrix M: (n_series, n_days)
        M = np.zeros((len(series_data), len(date_grid)), dtype=float)
        for i, (_, _, outlier_dates) in enumerate(series_data):
            for j, d in enumerate(date_grid):
                M[i, j] = 1.0 if d in outlier_dates else 0.0

        # Correlation matrix (handle constant vectors)
        try:
            corr_matrix = np.corrcoef(M)
            np.nan_to_num(corr_matrix, copy=False, nan=0.0, posinf=0.0, neginf=0.0)
        except Exception:
            corr_matrix = np.eye(len(series_data))

        # Distance = 1 - abs(correlation) for clustering
        dist_matrix = 1 - np.abs(corr_matrix)
        np.fill_diagonal(dist_matrix, 0)

        # Hierarchical clustering
        condensed_dist = squareform(dist_matrix, checks=False)
        Z = linkage(condensed_dist, method='average')
        n_clusters = min(5, max(1, len(series_data) // 2))
        cluster_labels = fcluster(Z, t=n_clusters, criterion='maxclust')

        # Group by cluster
        for idx, label in enumerate(cluster_labels):
            if label not in outlier_clusters:
                outlier_clusters[label] = []
            outlier_clusters[label].append(series_data[idx])

        print(f"Outlier clustering: {len(series_data)} series -> {len(outlier_clusters)} clusters")

In [None]:
# Optional: correlation heatmap and dendrogram (separate figures for readability)
if outlier_clusters and len(series_data) >= 2:
    from scipy.cluster.hierarchy import dendrogram
    metric_labels = [s[0]['metric_name'] for s in series_data]
    n_series = len(metric_labels)
    # Heatmap: size scales with number of series for label readability
    fig1, ax1 = plt.subplots(figsize=(max(10, n_series * 0.8), max(8, n_series * 0.6)))
    sns.heatmap(corr_matrix, ax=ax1, cmap='RdYlBu_r', center=0, vmin=-1, vmax=1,
                xticklabels=metric_labels, yticklabels=metric_labels)
    ax1.set_title('Outlier timing correlation (series x series)')
    plt.setp(ax1.get_xticklabels(), rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
    # Dendrogram: separate figure
    fig2, ax2 = plt.subplots(figsize=(max(12, n_series * 0.5), 6))
    dendrogram(Z, ax=ax2, labels=metric_labels, leaf_rotation=45)
    ax2.set_title('Hierarchical clustering dendrogram')
    plt.tight_layout()
    plt.show()

In [None]:
# Print cluster summary table
if outlier_clusters:
    print("Cluster summary (outlier timing cross-correlation):")
    print("-" * 60)
    for cid, members in sorted(outlier_clusters.items()):
        metric_names = [m[0]['metric_name'] for m in members]
        outlier_counts = [len(m[2]) for m in members]
        print(f"  Cluster {cid}: {len(members)} series, "
              f"{sum(outlier_counts)} total outliers")
        for name, cnt in zip(metric_names, outlier_counts):
            print(f"    - {name}: {cnt} outliers")
    print("-" * 60)

In [None]:
# Per-cluster time series plots: historical data only, outlier points highlighted (no forecast)
if outlier_clusters:
    for cid, members in sorted(outlier_clusters.items()):
        # Width accommodates legend for long metric names
        fig, ax = plt.subplots(figsize=(16, 6))
        for series_info, df_training, outlier_dates in members:
            df = df_training.copy()
            df['ds'] = pd.to_datetime(df['ds'])
            # Z-score normalize for comparable overlay
            y_mean, y_std = df['y'].mean(), df['y'].std()
            y_norm = (df['y'] - y_mean) / y_std if y_std and y_std > 0 else df['y'] - y_mean
            ax.plot(df['ds'], y_norm, '-', alpha=0.7, label=series_info['metric_name'])
            # Highlight outlier points
            outlier_mask = df['ds'].dt.date.isin(outlier_dates)
            if outlier_mask.any():
                ax.scatter(df.loc[outlier_mask, 'ds'], y_norm[outlier_mask],
                          s=80, marker='o', edgecolors='red', facecolors='none', zorder=5)
        ax.set_title(f'Cluster {cid}: {len(members)} series (z-score normalized, outliers in red)')
        ax.set_xlabel('Date')
        ax.set_ylabel('Z-score')
        ax.legend(loc='center left', bbox_to_anchor=(1.02, 0.5), fontsize=9, frameon=True)
        ax.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout(rect=[0, 0, 0.85, 1])
        plt.show()

## 8. Visualize Results

**Note:** Plots are always generated for each series showing historical data and forecasts. The `dry_run` parameter only controls whether forecasts are saved to the database.


In [None]:
# Plot forecasts for each series (always; dry_run only controls database save)
print(f"\n{'='*60}")
print("Generating plots (history + forecast for each series)")
if DRY_RUN:
    print("Dry run: no data will be saved to database")
else:
    print("Forecasts will be saved to database")
print(f"{'='*60}\n")

for plot_idx, forecast_item in enumerate(forecasts_by_series):
    # Unpack: (series_info, forecast_df, df_training, classification, prophet_params, features, detector_info, reason)
    series_info, forecast_df, df_training, classification, prophet_params, features, detector_info, reason = forecast_item

    plt.figure(figsize=(18, 10))

    # Plot historical data
    plt.plot(df_training['ds'], df_training['y'], 
            'ko-', label='Historical Data', linewidth=2, markersize=3, alpha=0.6)

    # Plot forecast trend
    plt.plot(forecast_df['ds'], forecast_df['yhat'], 
            'b--', label='Forecast (trend)', linewidth=2.5)

    # Vertical line showing where forecast starts
    last_history_date = df_training['ds'].max()
    plt.axvline(x=last_history_date, color='red', linestyle=':', 
               linewidth=2, label='Forecast Start', alpha=0.7)

    # Title and labels
    title = f"TSFEL-Prophet Forecast: {series_info['metric_name']} [{classification}]"
    if series_info['labels']:
        title += f" {series_info['labels']}"
    plt.title(title, fontsize=16, fontweight='bold')
    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Value', fontsize=12)
    plt.legend(loc='best', fontsize=10)
    plt.grid(True, alpha=0.3)

    # Add classification features text below the plot (similar to classification notebook)
    if features:
        decision_lines = []
        
        # Extract key features for decision display
        acf1 = features.get('acf1', np.nan)
        autocorr_persistence = features.get('autocorr_persistence', np.nan)
        stability = features.get('stability', np.nan)
        has_weekly = features.get('has_weekly_seasonality', False)
        has_monthly = features.get('has_monthly_seasonality', False)
        has_quarterly = features.get('has_quarterly_seasonality', False)
        skewness = features.get('skewness', np.nan)
        kurtosis = features.get('kurtosis', np.nan)
        zero_crossing_rate = features.get('zero_crossing_rate', np.nan)
        interquartile_range = features.get('interquartile_range', np.nan)
        mean_val = features.get('mean', np.nan)
        data_points = features.get('data_points', np.nan)
        outlier_ratio = detector_info.get('outlier_ratio', 0.0)
        changepoint_count = detector_info.get('changepoint_count', 0)
        
        # Classification decision factors
        decision_lines.append(f"Classification: {classification}")
        if reason:
            decision_lines.append(f"Reason: {reason}")
        
        # Primary decision factors
        primary_factors = []
        if not np.isnan(acf1):
            if acf1 > 0.5:
                primary_factors.append(f"ACF1={acf1:.2f} (strong)")
            elif acf1 > 0.2:
                primary_factors.append(f"ACF1={acf1:.2f} (moderate)")
            else:
                primary_factors.append(f"ACF1={acf1:.2f} (low)")
        
        # Add autocorrelation persistence (TSFEL feature)
        if not np.isnan(autocorr_persistence):
            if autocorr_persistence > 10:
                primary_factors.append(f"Persistence=lag {autocorr_persistence:.0f} (high)")
            elif autocorr_persistence > 5:
                primary_factors.append(f"Persistence=lag {autocorr_persistence:.0f} (moderate)")
            else:
                primary_factors.append(f"Persistence=lag {autocorr_persistence:.0f} (low)")
        
        if not np.isnan(stability):
            if stability > 0.7:
                primary_factors.append(f"Stability={stability:.2f} (high)")
            elif stability > 0.5:
                primary_factors.append(f"Stability={stability:.2f} (moderate)")
            else:
                primary_factors.append(f"Stability={stability:.2f} (low)")
        
        if has_weekly or has_monthly or has_quarterly:
            seasonality_list = []
            if has_weekly:
                seasonality_list.append('weekly')
            if has_monthly:
                seasonality_list.append('monthly')
            if has_quarterly:
                seasonality_list.append('quarterly')
            primary_factors.append(f"Seasonality: {', '.join(seasonality_list)}")
        
        if primary_factors:
            decision_lines.append("Primary: " + " | ".join(primary_factors))
        
        # Negative indicators (noise/variability)
        negative_indicators = []
        
        # Zero crossing rate
        if not np.isnan(zero_crossing_rate) and not np.isnan(data_points) and data_points > 0:
            zcr_normalized = zero_crossing_rate / data_points
            if zcr_normalized > 0.3:
                negative_indicators.append(f"High noise (ZCR={zcr_normalized:.1%})")
            elif zcr_normalized > 0.1:
                negative_indicators.append(f"Moderate noise (ZCR={zcr_normalized:.1%})")
        
        # Skewness
        if not np.isnan(skewness):
            if abs(skewness) > 2.0:
                negative_indicators.append(f"Extreme skew ({skewness:.2f})")
            elif abs(skewness) > 1.0:
                negative_indicators.append(f"Moderate skew ({skewness:.2f})")
        
        # Kurtosis
        if not np.isnan(kurtosis):
            if kurtosis > 5.0:
                negative_indicators.append(f"Heavy tails (kurt={kurtosis:.2f})")
            elif kurtosis > 4.0:
                negative_indicators.append(f"Moderate tails (kurt={kurtosis:.2f})")
        
        # IQR variability
        if not np.isnan(interquartile_range) and not np.isnan(mean_val) and abs(mean_val) > 0:
            iqr_coefficient = interquartile_range / abs(mean_val)
            if iqr_coefficient > 1.0:
                negative_indicators.append(f"High variability (IQR/mean={iqr_coefficient:.2f})")
            elif iqr_coefficient > 0.5:
                negative_indicators.append(f"Moderate variability (IQR/mean={iqr_coefficient:.2f})")
        
        # Data quality issues
        if outlier_ratio > 0.2:
            negative_indicators.append(f"High outliers ({outlier_ratio:.1%})")
        elif outlier_ratio > 0.1:
            negative_indicators.append(f"Moderate outliers ({outlier_ratio:.1%})")
        
        if changepoint_count > 5:
            negative_indicators.append(f"Many changepoints ({changepoint_count})")
        elif changepoint_count > 2:
            negative_indicators.append(f"Some changepoints ({changepoint_count})")
        
        if negative_indicators:
            decision_lines.append("Concerns: " + " | ".join(negative_indicators[:3]))  # Limit to 3 concerns
        
        # Format decision text and add below plot
        if len(decision_lines) > 0:
            decision_text = "\n".join(decision_lines)
            
            # Add text below the plot
            plt.figtext(0.5, 0.02, decision_text, ha='center', va='bottom', fontsize=9, 
                       bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.7),
                       wrap=True)

    plt.tight_layout(rect=[0, 0.08, 1, 0.98])  # Make room for text at bottom
    plt.show()

    print(f"  ✓ Plotted forecast for {series_info['metric_name']} [{classification}]")

print(f"\n{'='*60}")
print(f"Plotted {len(forecasts_by_series)} series")
print(f"{'='*60}")


## 9. Save Forecasts to Database

**Note:** This step is skipped in dry run mode.


In [None]:
# Save forecasts to database for each series (only if not dry run)
if not DRY_RUN:
    # Defensive: ensure database connection and run_id exist (created before the processing loop)
    if "conn" not in globals() or conn is None:
        raise RuntimeError(
            "Database connection not established (conn is missing/None). "
            "Check VM_JOBS_ENVIRONMENT, VM_JOBS_CONFIG_PATH, and VM_JOBS_DB_PASSWORD."
        )
    if "run_id" not in globals() or run_id is None:
        raise RuntimeError("Forecast run record not created (run_id is missing/None).")

    def _unpack_forecast_item(item):
        """Support both tuple shapes:
        - non-dry-run: (series_info, forecast_df, classification, prophet_params, features, reason)
        - dry-run: (series_info, forecast_df, df_training, classification, prophet_params, features, detector_info, reason)
        Returns: series_info, forecast_df, classification, prophet_params, features, reason
        """
        if not isinstance(item, (list, tuple)):
            raise ValueError(f"Unexpected forecast item type: {type(item)}")
        if len(item) == 6:
            series_info, forecast_df, classification, prophet_params, features, reason = item
            return series_info, forecast_df, classification, prophet_params, features, reason
        if len(item) >= 8:
            series_info, forecast_df, _df_training, classification, prophet_params, features, _detector_info, reason = item
            return series_info, forecast_df, classification, prophet_params, features, reason
        raise ValueError(f"Unexpected forecast item tuple length: {len(item)}")

    # Forecast types - only trend (no intervals)
    forecast_types = [{"name": "trend", "field": "yhat"}]

    print(f"\nSaving forecasts for {len(forecasts_by_series)} series...")

    total_rows_inserted = 0
    for forecast_item in forecasts_by_series:
        series_info, forecast_df, classification, prophet_params, features, reason = _unpack_forecast_item(
            forecast_item
        )

        try:
            rows_inserted, job_idx_used, metric_id_used = save_forecasts_to_database(
                conn=conn,
                metric_name=series_info['metric_name'],
                labels=series_info['labels'],
                forecast_df=forecast_df,
                forecast_types=forecast_types,
                run_id=run_id,
            )
            total_rows_inserted += rows_inserted
            # Save forecast metadata using the same (job_idx, metric_id) we just used for forecast data
            if job_idx_used is not None and metric_id_used is not None:
                try:
                    save_forecast_metadata_for_metric_by_id(
                        conn=conn,
                        run_id=run_id,
                        job_idx=job_idx_used,
                        metric_id=metric_id_used,
                        tsfel_features=features,
                        classification={"category": classification, "reason": reason},
                        prophet_params=prophet_params,
                    )
                except Exception as meta_exc:
                    print(f"  ⚠️  Forecast metadata: {meta_exc}")
            print(f"  ✓ {series_info['metric_name']} [{classification}]: {rows_inserted} rows saved")
        except Exception as exc:
            print(f"  ✗ {series_info['metric_name']}: Failed to save - {exc}")

    print(f"\nTotal: {total_rows_inserted} forecast rows saved to database")
else:
    print("Dry run mode: Skipping database save operations")


In [None]:
# Close database connection (only if not dry run)
if not DRY_RUN:
    conn.close()
    engine.dispose()
    print("Database connection closed")


In [None]:
# Write execution results for the job (timeseries_processed, timeseries_failed)
# When run via papermill, output_results_path is set and the job reads this file.
# IMPORTANT: This must never fail the notebook (DB saves may have already happened).
try:
    import json

    _query_result = globals().get("query_result") or []
    _forecasts_by_series = globals().get("forecasts_by_series") or []

    timeseries_processed = len(_forecasts_by_series)
    timeseries_failed = max(len(_query_result) - len(_forecasts_by_series), 0)

    if output_results_path:
        with open(output_results_path, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "timeseries_processed": timeseries_processed,
                    "timeseries_failed": timeseries_failed,
                },
                f,
                indent=2,
            )
        print(
            f"Wrote results to {output_results_path}: "
            f"{timeseries_processed} processed, {timeseries_failed} failed/skipped"
        )
except Exception as _results_exc:
    print(f"Warning: failed to write results JSON: {_results_exc}")