# Use SVM to Classify Time Series, Shuffling the Timepoints

In [7]:
from __future__ import annotations
import numpy as np
import pandas as pd
import glob
from pathlib import Path
from typing import Dict, Optional
import sys
import os
import re
from collections import defaultdict
import torch
import time

# Add src directory to Python path
sys.path.append(str(Path.cwd().parent.parent.parent / "src"))

# Import custom modules
from classifiers.svm_classifier import svm_classifier
from models.TF_transformer import TFTransformer, ModelCfg


# Import sklearn modules for SVM
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler

%load_ext autoreload
%autoreload 2

In [8]:
def add_binary_labels(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Split ``df`` into two classes by the rank of ``column``.

    The input frame is not modified; a copy carrying an additional integer
    ``label`` column is returned.

    Parameters
    ----------
    df:
        DataFrame containing the simulation results.
    column:
        Name of the column whose ascending rank decides the class.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` in which the ``len(df) // 2`` lowest-ranked rows carry
        ``label == 0`` and all remaining rows carry ``label == 1``.  With an
        odd number of rows the extra row therefore falls into class ``1``.
        The split is made by rank position, so rows that tie on the split
        value can end up on different sides.
    """
    n_rows = len(df)

    # Row positions arranged in ascending order of the chosen column.
    ranking = np.argsort(df[column].values)

    # Start with everything in class 0, then promote the upper-ranked half.
    flags = np.zeros(n_rows, dtype=int)
    flags[ranking[n_rows // 2:]] = 1

    result = df.copy()
    result["label"] = flags
    return result


def grid_search_svm(
    df: pd.DataFrame,
    param_grid: Optional[Dict[str, list[float]]] = None,
) -> Dict[str, Dict[str, float]]:
    """Grid search SVM hyperparameters for RBF and linear kernels.

    Parameters
    ----------
    df:
        Frame with a ``label`` column; every other column is used as a
        feature.
    param_grid:
        Mapping of ``SVC`` parameter name to candidate values.  When omitted
        (or empty) a 3x3 grid over ``C`` and ``gamma`` is used.

    Returns
    -------
    dict
        ``{kernel: {"accuracy": ..., "C": ..., "gamma": ...}}`` for the
        ``"rbf"`` and ``"linear"`` kernels.  ``accuracy`` is measured on a
        stratified 20% hold-out split (``random_state=42``) using the best
        estimator found by 5-fold cross-validation on the remaining 80%.
        ``gamma`` is ``nan`` whenever it was not tuned, which is always the
        case for the linear kernel.
    """
    labels = df["label"].values
    features = df.drop(columns=["label"]).values

    # Stratify so both classes keep their proportions in train and test.
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )

    # C: regularization strength; gamma: RBF kernel coefficient.
    param_grid = param_grid or {
        "C": [0.1, 1, 10],
        "gamma": [0.01, 0.1, 1],
    }

    results: Dict[str, Dict[str, float]] = {}

    for kernel in ("rbf", "linear"):
        # SVC ignores gamma for the linear kernel, so sweeping it only
        # multiplies the number of fits without changing any score.
        if kernel == "linear" and isinstance(param_grid, dict):
            kernel_grid = {
                name: values for name, values in param_grid.items() if name != "gamma"
            }
        else:
            kernel_grid = param_grid

        search = GridSearchCV(SVC(kernel=kernel), kernel_grid, cv=5)
        search.fit(X_train, y_train)

        best_model = search.best_estimator_
        best_params = search.best_params_
        predictions = best_model.predict(X_test)

        results[kernel] = {
            "accuracy": accuracy_score(y_test, predictions),
            # Fall back to the fitted estimator's value when C was not tuned,
            # instead of raising KeyError after the (expensive) search.
            "C": float(best_params.get("C", best_model.C)),
            # An untuned gamma may be the string 'scale'; report NaN instead
            # so the value stays a float.
            "gamma": float(best_params["gamma"]) if "gamma" in best_params else float("nan"),
        }
    return results


Load the data, deal with variable-length time series, add labels and then concatenate data

In [9]:
##### Set up directory paths for data loading ######
# All paths below are derived from the kernel's current working directory,
# so they depend on where the notebook is launched from.
BASE_DIR = Path.cwd().parent  # Gets one directory up from current working directory
OUT_DIR = BASE_DIR
SYNTHETIC_DIR = BASE_DIR / "data_6" 
RESULTS_CSV = "IY010_simulation_parameters_6.csv"
results_csv_path = BASE_DIR / RESULTS_CSV # results csv file, helps to label
results = pd.read_csv(results_csv_path)
# Keep only rows flagged as successful and drop any row that is missing one of
# the observed statistics listed in `subset`.
# NOTE(review): the boolean-mask filter assumes `success` is parsed as a
# bool column - confirm against the CSV.
results = results[results["success"]].dropna(
    subset=["mu_observed", "cv_observed", "t_ac_observed"]
)
# Optional: uncomment the next line to work on the first 10 rows only
# (smaller dataset for quick iteration).
# results = results.head(10)

# Create binary labels from the chosen target column: add_binary_labels ranks
# the rows by this column and labels the upper half 1, the lower half 0.
label_column = "mu_target"
labelled_results = add_binary_labels(results, label_column)
##### Set up directory paths for data loading ######

In [10]:
##### Deal with variable-length time series, add labels, concatenate data ######
trajectory_filenames = results["trajectory_filename"].tolist()
if not trajectory_filenames:
    # Fail early with a clear message instead of an opaque `min()` error below.
    raise ValueError("No trajectories to load: `results` is empty after filtering.")

min_length = float('inf')
max_length = 0
column_structures = {}

# Pass 1: scan every file to record its row count and column layout.
# The files are read again in pass 2 rather than cached, to keep only one
# full-width file in memory at a time.
for trajectory_filename in trajectory_filenames:
    DATA_CSV = SYNTHETIC_DIR / trajectory_filename
    data = pd.read_csv(DATA_CSV)

    length = len(data)
    cols = list(data.columns)

    min_length = min(min_length, length)
    max_length = max(max_length, length)
    column_structures[trajectory_filename] = {
        'length': length,
        'columns': cols,
        'num_cols': len(cols)
    }

# Column counts are needed several times below; compute them once.
column_counts = [s['num_cols'] for s in column_structures.values()]
min_columns = min(column_counts)

print("📏 Trajectory statistics:")
print(f"   Minimum length: {min_length} rows")
print(f"   Maximum length: {max_length} rows")
print(f"   Column count range: {min_columns} - {max(column_counts)}")
print(f"   Minimum columns: {min_columns}")

# Build a filename -> label lookup once instead of scanning labelled_results
# for every file. `setdefault` keeps the first label for a repeated filename,
# matching a "first match wins" lookup.
label_by_filename = {}
for fname, label in zip(labelled_results["trajectory_filename"], labelled_results["label"]):
    label_by_filename.setdefault(fname, label)

# Pass 2: create a standardised dataset with the same number of columns and
# consistent column names (t_0, t_1, ...), then attach the label column.
print("\n🔧 Creating standardized dataset...")
new_columns = [f"t_{j}" for j in range(min_columns)]
labelled_data_list = []
for trajectory_filename in trajectory_filenames:
    DATA_CSV = SYNTHETIC_DIR / trajectory_filename
    data = pd.read_csv(DATA_CSV)

    # Truncate to the last `min_columns` columns (the tail of the series,
    # intended to capture the steady state).
    data_standardised = data.iloc[:, -min_columns:].copy()

    # Consistent names are crucial: otherwise pd.concat aligns on the original
    # headers and fills the gaps with NaN.
    data_standardised.columns = new_columns

    # Every row of the file receives the file's label.
    label_value = label_by_filename[trajectory_filename]
    data_standardised['label'] = label_value
    labelled_data_list.append(data_standardised)

# Concatenate all standardized data
labelled_data = pd.concat(labelled_data_list, ignore_index=True)
# Verify no NaN values
nan_count = labelled_data.isnull().sum().sum()
print(f"   NaN values: {nan_count}")

##### Deal with variable-length time series, add labels, concatenate data ######

📏 Trajectory statistics:
   Minimum length: 50 rows
   Maximum length: 50 rows
   Column count range: 144 - 1999
   Minimum columns: 144

🔧 Creating standardized dataset...
   NaN values: 0


SVM

In [11]:
# =========================================================
# Prepare Features and Labels for SVM
# =========================================================
# Work on a copy so later cells can derive variants from `df` without
# touching `labelled_data`.
df = labelled_data.copy()
# Extract labels
y = df["label"].values

# Extract features (all columns except 'label')
X = df.drop(columns=["label"]).values

print("Data preparation for SVM:")
print(f"  Feature matrix shape: {X.shape}")
print(f"  Labels shape: {y.shape}")
print(f"  Number of classes: {len(np.unique(y))}")
print(f"  Class distribution: {np.bincount(y)}")
print(f"  Memory usage: {X.nbytes / 1024**2:.2f} MB")

# Check for any NaN or infinite values
has_nan = bool(np.isnan(X).any())
has_inf = bool(np.isinf(X).any())
if has_nan:
    print("⚠️  Warning: NaN values detected in features")
if has_inf:
    print("⚠️  Warning: Infinite values detected in features")

# Only report success when the checks above actually passed; previously the
# message was printed even right after a warning.
if not (has_nan or has_inf):
    print("✅ Data ready for SVM classification!")

Data preparation for SVM:
  Feature matrix shape: (49000, 144)
  Labels shape: (49000,)
  Number of classes: 2
  Class distribution: [24500 24500]
  Memory usage: 53.83 MB
✅ Data ready for SVM classification!


In [None]:
# Baseline: train and evaluate an SVM on the unscaled features.
# The constants and the train/test split defined here are reused by the
# later scaling and shuffling experiments.

# SVM parameters passed explicitly to svm_classifier.
# NOTE(review): presumably these mirror svm_classifier's own defaults - confirm.
SVM_C = 1.0           # Regularization parameter
SVM_GAMMA = 'scale'   # Kernel coefficient 
SVM_KERNEL = 'rbf'    # Kernel type

# Fraction of rows held out for testing, and the seed for the split
TEST_SPLIT = 0.2
RANDOM_STATE = 42

# Split the data
# NOTE(review): all rows coming from one trajectory file share a label, and
# this row-level split can place rows of the same file in both train and
# test. Consider a group-aware split if per-file generalisation matters.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=TEST_SPLIT, 
    random_state=RANDOM_STATE,
    stratify=y  # Ensure balanced split across classes
)

# Start the wall-clock timer; it covers the whole svm_classifier call
start_time = time.time()

# Train SVM using the imported svm_classifier function
# (presumably returns the test accuracy - verify against its definition).
svm_accuracy = svm_classifier(
    X_train, X_test, y_train, y_test,
    svm_C=SVM_C,
    svm_gamma=SVM_GAMMA, 
    svm_kernel=SVM_KERNEL,
    print_classification_report=True,
    print_confusion_matrix=True,
)

training_time = time.time() - start_time
print(f"⏱️  SVM ({SVM_KERNEL}) training and evaluation time: {training_time:.2f} seconds")



Does it make a difference if we apply scaling before training?

In [None]:
# Scale features before SVM training.
# The scaler is fitted on the training split only and then applied to the
# test split, so no test-set statistics leak into training.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Record training time
start_time = time.time()

# Train SVM on the *scaled* features. Passing the unscaled X_train / X_test
# here would simply repeat the baseline run and make the comparison
# meaningless.
svm_accuracy = svm_classifier(
    X_train_scaled, X_test_scaled, y_train, y_test,
    svm_C=SVM_C,
    svm_gamma=SVM_GAMMA,
    svm_kernel=SVM_KERNEL,
    print_classification_report=True,
    print_confusion_matrix=True,
)

training_time = time.time() - start_time
print(f"⏱️  SVM ({SVM_KERNEL}) training and evaluation time: {training_time:.2f} seconds")



NameError: name 'StandardScaler' is not defined


## Experiment: Does Temporal Order Matter for Classification?

What happens if we shuffle the time points within each time series before training and testing? Does it affect the SVM's performance?


In [None]:
# =========================================================
# Experiment: Does Temporal Order Matter for Classification?
# =========================================================

# Seed the global NumPy RNG so the permutations are reproducible.
np.random.seed(RANDOM_STATE)

# Feature columns are every column except the label; computed once.
feature_cols = [col for col in df.columns if col != 'label']

# Shuffle on a NumPy copy rather than through per-row DataFrame `.loc`
# reads/writes, which is very slow for tens of thousands of rows and only
# works when the index is exactly 0..n-1.
# Each row (one time series) gets its own independent permutation; rows are
# processed in order, so the RNG stream is consumed exactly as in a per-row
# shuffle loop over the DataFrame.
X_shuffled = np.array(df[feature_cols].values, order="C")
for row_features in X_shuffled:
    # Each iteration yields a view into X_shuffled, so this shuffles in place.
    np.random.shuffle(row_features)

# Keep a DataFrame version with the original column order and untouched labels.
df_shuffled = df.copy()
df_shuffled[feature_cols] = X_shuffled

y_shuffled = df_shuffled["label"].values

# Split the data
X_train_shuffled, X_test_shuffled, y_train_shuffled, y_test_shuffled = train_test_split(
    X_shuffled,
    y_shuffled,
    test_size=TEST_SPLIT,
    random_state=RANDOM_STATE,
    stratify=y_shuffled  # Ensure balanced split across classes
)
# Record training time
start_time = time.time()

# Train AND test on time-shuffled series: if accuracy stays close to the
# unshuffled baseline, temporal order carries little information for this
# classification task.
svm_accuracy = svm_classifier(
    X_train_shuffled, X_test_shuffled, y_train_shuffled, y_test_shuffled,
    svm_C=SVM_C,
    svm_gamma=SVM_GAMMA,
    svm_kernel=SVM_KERNEL,
    print_classification_report=True,
    print_confusion_matrix=True,
)

training_time = time.time() - start_time
print(f"⏱️  SVM ({SVM_KERNEL}) training and evaluation time: {training_time:.2f} seconds")

NameError: name 'df' is not defined