<a href="https://colab.research.google.com/github/cwhitz/ts-trove/blob/classification/notebooks/classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Time Series Classification

This notebook explores various time series classification techniques. It makes much fuller use of the bearings dataset also explored in the signal analysis notebook.

## Overview

Time series classification involves assigning time series instances to predefined categories. This notebook will cover:

### Table of Contents

> 1 [Data Preparation](#Data-Preparation)

1.1. [Data Download](##Data-Download)

1.2 [Data Organization](##Data-Organization)

> 2 [Utility Functions](#Training-Functions)

2.1 [Data Loader](##Data-Loader)

> 3 [Time Series Classification with SciKit](#Scikit-Functions)

3.1 [Scikit Trainer and Evaluator](##Scikit-Trainer-and-Evaluator)

> 4 [Deep Learning Functions](#PyTorch-Functions)

4.1 [PyTorch Trainer and Evaluator](##-Trainer-and-Evaluator)

4.2 [Fully Connected Neural Networks]()

4.3 [Recurrent Neural Networks]()

4.3.1 [Classic Recurrent Neural Network]()

4.3.2 [Long Short Term Memory (LSTM) Neural Network]()

4.3.3 [Gated Recurrent Neural Network]()

4.4 [Convolutional Neural Networks]()

4.4.1 [1D Convolutional Neural Network]()

4.4.2 [Temporal Convolutional Network]()

4.5 [Attention Based Models]()

4.5.1 [LSTM with Attention]()

4.5.2 [Time Series Transformer]()



In [1]:
import pandas as pd
import numpy as np
import os
import pathlib
import matplotlib.pyplot as plt
import json
import pathlib
import shutil
import kagglehub

In [2]:
!pip install cesium

Collecting cesium
  Downloading cesium-0.12.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.9 kB)
Collecting gatspy>=0.3.0 (from cesium)
  Downloading gatspy-0.3.tar.gz (554 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m554.5/554.5 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading cesium-0.12.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (806 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m806.5/806.5 kB[0m [31m29.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: gatspy
  Building wheel for gatspy (setup.py) ... [?25l[?25hdone
  Created wheel for gatspy: filename=gatspy-0.3-py3-none-any.whl size=43804 sha256=dd05583e9f7e3ac09f4c0cf5f0abb7d168f3b41d604ec7c0cabbe723531d13e0
  Stored in directory: /root/.cache/pip/wheels/b5/56/88/04643e9be584a6018e10aae5789d98225995da3e89513c3f30
Successfully built gatspy
Installi

# Data Preparation
## Data Download

In [3]:
kagglepath = "sufian79/cwru-mat-full-dataset"
path = kagglehub.dataset_download(kagglepath)


pathlib.Path(f"./classification/{kagglepath.split('/')[-1]}").mkdir(parents=True, exist_ok=True)
shutil.copytree(path, f"./classification/{kagglepath.split('/')[-1]}", dirs_exist_ok=True)

Downloading from https://www.kaggle.com/api/v1/datasets/download/sufian79/cwru-mat-full-dataset?dataset_version_number=4...


100%|██████████| 234M/234M [00:02<00:00, 86.8MB/s]

Extracting files...





'./classification/cwru-mat-full-dataset'

## Data Organization

In [5]:
SOURCE_DIR = "classification/cwru-mat-full-dataset/"      # where files currently live
TARGET_DIR = "classification-cwru-mat"        # root of new structured folders
FILE_EXTENSION = ".mat"         # change if needed (e.g. ".csv")

def ensure_dir(path):
    os.makedirs(path, exist_ok=True)

def move_file(file_id, dest_dir):
    filename = file_id + FILE_EXTENSION
    src_path = os.path.join(SOURCE_DIR, filename)
    dst_path = os.path.join(dest_dir, filename)

    if not os.path.exists(src_path):
        print(f"⚠️ Missing file: {src_path}")
        return

    ensure_dir(dest_dir)
    shutil.move(src_path, dst_path)
    print(f"Moved {filename} -> {dest_dir}")

def walk_structure(node, current_path):
    if isinstance(node, list):
        for file_id in node:
            move_file(file_id, current_path)
    elif isinstance(node, dict):
        for key, child in node.items():
            walk_structure(child, os.path.join(current_path, key))
    else:
        raise ValueError("Unexpected structure type")

if __name__ == "__main__":
    with open(os.path.join(SOURCE_DIR, "file_fault_map.json"), "r") as f:
        structure = json.load(f)
    walk_structure(structure, TARGET_DIR)
    print("Done.")

Moved 97.mat -> classification-cwru-mat/normal/48k
Moved 98.mat -> classification-cwru-mat/normal/48k
Moved 99.mat -> classification-cwru-mat/normal/48k
Moved 100.mat -> classification-cwru-mat/normal/48k
Moved 105.mat -> classification-cwru-mat/drive_end_fault/12k/IR/007
Moved 106.mat -> classification-cwru-mat/drive_end_fault/12k/IR/007
Moved 107.mat -> classification-cwru-mat/drive_end_fault/12k/IR/007
Moved 108.mat -> classification-cwru-mat/drive_end_fault/12k/IR/007
Moved 169.mat -> classification-cwru-mat/drive_end_fault/12k/IR/014
Moved 170.mat -> classification-cwru-mat/drive_end_fault/12k/IR/014
Moved 171.mat -> classification-cwru-mat/drive_end_fault/12k/IR/014
Moved 172.mat -> classification-cwru-mat/drive_end_fault/12k/IR/014
Moved 209.mat -> classification-cwru-mat/drive_end_fault/12k/IR/021
Moved 210.mat -> classification-cwru-mat/drive_end_fault/12k/IR/021
Moved 211.mat -> classification-cwru-mat/drive_end_fault/12k/IR/021
Moved 212.mat -> classification-cwru-mat/drive_

# Utility Functions

## Data Loader

Before diving into modeling, we first need a consistent way to load and represent our time-series data. Since later sections will experiment with both deep learning and traditional classifiers, we define a reusable dataset structure that keeps preprocessing, sampling rate handling, and labels consistent across all methods.

In [6]:
from torch.utils.data import Dataset
from torch.nn import Module
import scipy.io
import enum

# samplng rate enum
class SamplingRate(enum.Enum):
    sr12K = "12k"
    sr48K = "48k"

class FaultLocation(enum.Enum):
    DE = "drive_end_fault"
    FE = "front_end_fault"


class BearingDataset(Dataset):
    def __init__(self, file_paths, sampling_rate, fault_location, chunk_length, transform=None):
        self.file_paths = file_paths
        self.sampling_rate = sampling_rate
        self.fault_location = fault_location
        self.chunk_length = chunk_length
        self.transform = transform

        self.data = []
        self.labels = []
        for fp in self.file_paths:
            if not pathlib.Path(fp).exists():
                raise FileNotFoundError(f"File not found: {fp}")

            mat_data = scipy.io.loadmat(fp)

            key_to_match = f"_{str(self.fault_location)[-2:]}_time"
            sensor_key = [key for key in mat_data.keys() if key_to_match in key][0]

            signal = mat_data[sensor_key].squeeze()

            n_chunks = len(signal) // self.chunk_length
            truncated = signal[:n_chunks * self.chunk_length]

            windows = truncated.reshape(n_chunks, self.chunk_length)

            label = '_'.join(fp.parent.parts[-2:])

            for window in windows:
                self.labels.append(label)
                self.data.append(window)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        window = self.data[idx]
        label = self.labels[idx]

        if self.transform:
            window = self.transform(window).astype('float32')

        return window, label





The dataset class above seeks to make the most of the data available in the bearings dataset by splitting each sample in the file into multiple overlapping windows. This increases the effective number of training samples and helps models learn more robust patterns. However, care must be taken to avoid data leakage between training and test sets when using overlapping windows - if we were to pull all the data and then split into train/test, windows from the same original sample could end up in both sets.

To prevent this, we ensure that all windows derived from a given file are assigned to either the training or test set exclusively by splitting into train/test at the file level.

In [7]:
from sklearn.model_selection import train_test_split
from pathlib import Path
from collections import Counter

all_files = list(Path("classification-cwru-mat").rglob("*.mat"))

# derive one label per file
file_labels = [
    '_'.join(f.parent.parts[-2:])
    for f in all_files
]

train_files, test_files = train_test_split(
    all_files,
    test_size=.2,
    shuffle=True,
    stratify=file_labels
)


##

We want to set up a class for testing different classification techniques on the bearings dataset. The class will accept a dataset object and classification model, and be able to train and evaluate the model consistently for metrics like accuracy, precision, recall, and F1-score as well as time for training and inference.

In [8]:
from abc import ABC, abstractmethod
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import plotly.graph_objects as go
from plotly.subplots import make_subplots


class ClassificationTrainTestEvaluate(ABC):
    def __init__(self, train_dataset: Dataset, test_dataset: Dataset, model):
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
        self.model = model

        self.time_metrics = {
            "train_time": None,
            "inference_time": None
        }

    def classification_report(self):
        """
        Creates a Plotly subplot with:
        - Confusion matrix heatmap
        - Metrics summary table

        Parameters:
        - test_y : array-like of true labels
        - test_prediction : array-like of predicted labels
        - class_names : optional list of class label names
        """

        # Compute confusion matrix
        cm = confusion_matrix(self.test_y, self.predictions)

        # Compute metrics (handles binary & multiclass)
        accuracy = accuracy_score(self.test_y, self.predictions)
        precision = precision_score(self.test_y, self.predictions, average='weighted', zero_division=0)
        recall = recall_score(self.test_y, self.predictions, average='weighted', zero_division=0)
        f1 = f1_score(self.test_y, self.predictions, average='weighted', zero_division=0)

        # Create subplot layout
        fig = make_subplots(
            rows=1, cols=2,
            column_widths=[0.6, 0.4],
            specs=[[{"type": "heatmap"}, {"type": "table"}]],
            subplot_titles=("Confusion Matrix", "Model Performance Metrics")
        )

        # --- Confusion Matrix Heatmap ---
        fig.add_trace(
            go.Heatmap(
                z=cm,
                x=self.class_names,
                y=self.class_names,
                text=cm,
                texttemplate="%{text}",
                colorscale="Blues",
                showscale=False
            ),
            row=1, col=1
        )

        fig.update_xaxes(title_text="Predicted Label", row=1, col=1)
        fig.update_yaxes(title_text="True Label", row=1, col=1)

        # --- Metrics Table ---
        fig.add_trace(
            go.Table(
                header=dict(values=["Metric", "Value"],
                            fill_color="lightgrey",
                            align="center"),
                cells=dict(values=[
                    ["Accuracy", "Precision", "Recall", "F1 Score"],
                    [f"{accuracy:.4f}", f"{precision:.4f}", f"{recall:.4f}", f"{f1:.4f}"]
                ],
                align="center")
            ),
            row=1, col=2
        )

        fig.update_layout(
            title="Model Evaluation Summary",
            height=500,
            width=900
        )

        fig.show()

class SciKitCTTE(ClassificationTrainTestEvaluate):
    def prepare_data(self):
        self.train_X, self.train_y = pd.DataFrame(), pd.Series()
        for i in range(len(self.train_dataset)):
            print(i, "of", len(self.train_dataset))
            X_chunk, label = self.train_dataset[i]

            self.train_X = pd.concat([self.train_X, X_chunk], ignore_index=True)
            self.train_y = pd.concat([self.train_y, pd.Series(label)], ignore_index=True)

        self.test_X, self.test_y = pd.DataFrame(), pd.Series()
        for i in range(len(self.test_dataset)):
            X_chunk, labels = self.test_dataset[i]

            self.test_X = pd.concat([self.test_X, X_chunk], ignore_index=True)
            self.test_y = pd.concat([self.test_y, pd.Series(labels)], ignore_index=True)

    def train(self, train_X, train_y):
        self.model.fit(train_X, train_y)
        self.class_names = sorted(self.train_y.unique())

    def evaluate(self, test_X, test_y):
        self.predictions = self.model.predict(test_X)


# Feature Extraction + Feature Based Classification

With a dataset abstraction in place, we can now explore different families of time-series classification techniques. The goal here is not only to compare performance, but also to understand how different representation choices affect model behavior on sensor-like signals.

We begin with feature-based methods, which transform raw time-series into fixed-length statistical representations. These approaches are often strong baselines, easier to interpret, and computationally efficient compared to end-to-end deep learning models.

### Feature Extraction

We will implement a custom transformer class for the PyTorch dataset to extract statistical features using the `cesium` library.

In [9]:
from cesium import featurize

class FeatureExtractionTransform(Module):
    def forward(self, window):
        features_to_use = [
            "amplitude",
            "percent_beyond_1_std",
            "maximum",
            "max_slope",
            "median",
            "median_absolute_deviation",
            "percent_close_to_median",
            "minimum",
            "period_fast",
            "skew",
            "std",
        ]

        fset = featurize.featurize_time_series(
            times=np.arange(len(window)),
            values=window,
            errors=None,
            features_to_use=features_to_use,
        )

        fset = fset.stack(future_stack=True)

        return fset


In [10]:
train_dataset = BearingDataset(
    train_files,
    sampling_rate=SamplingRate.sr48K,
    fault_location=FaultLocation.DE,
    chunk_length=4800,
    transform=FeatureExtractionTransform()
)

test_dataset = BearingDataset(
    test_files,
    sampling_rate=SamplingRate.sr48K,
    fault_location=FaultLocation.DE,
    chunk_length=4800,
    transform=FeatureExtractionTransform()
)


In [11]:
from sklearn.ensemble import RandomForestClassifier

rfc = RandomForestClassifier(n_estimators=100, random_state=42)

trainer = SciKitCTTE(
    train_dataset,
    test_dataset,
    rfc)

trainer.prepare_data()
trainer.train(trainer.train_X, trainer.train_y)
trainer.evaluate(trainer.test_X, trainer.test_y)
trainer.classification_report()

0 of 4315
1 of 4315
2 of 4315
3 of 4315
4 of 4315
5 of 4315
6 of 4315
7 of 4315
8 of 4315
9 of 4315
10 of 4315
11 of 4315
12 of 4315
13 of 4315
14 of 4315
15 of 4315
16 of 4315
17 of 4315
18 of 4315
19 of 4315
20 of 4315
21 of 4315
22 of 4315
23 of 4315
24 of 4315
25 of 4315
26 of 4315
27 of 4315
28 of 4315
29 of 4315
30 of 4315
31 of 4315
32 of 4315
33 of 4315
34 of 4315
35 of 4315
36 of 4315
37 of 4315
38 of 4315
39 of 4315
40 of 4315
41 of 4315
42 of 4315
43 of 4315
44 of 4315
45 of 4315
46 of 4315
47 of 4315
48 of 4315
49 of 4315
50 of 4315
51 of 4315
52 of 4315
53 of 4315
54 of 4315
55 of 4315
56 of 4315
57 of 4315
58 of 4315
59 of 4315
60 of 4315
61 of 4315
62 of 4315
63 of 4315
64 of 4315
65 of 4315
66 of 4315
67 of 4315
68 of 4315
69 of 4315
70 of 4315
71 of 4315
72 of 4315
73 of 4315
74 of 4315
75 of 4315
76 of 4315
77 of 4315
78 of 4315
79 of 4315
80 of 4315
81 of 4315
82 of 4315
83 of 4315
84 of 4315
85 of 4315
86 of 4315
87 of 4315
88 of 4315
89 of 4315
90 of 4315
91 of 431

# Deep Learning Based Classification (PyTorch)

https://colah.github.io/posts/2015-08-Understanding-LSTMs/

In [12]:
import torch

print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU count: {torch.cuda.device_count()}")
if torch.cuda.is_available():
    print(f"GPU name: {torch.cuda.get_device_name(0)}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

CUDA available: True
GPU count: 1
GPU name: Tesla T4


In [17]:
from torch.utils.data import DataLoader
from torch import Tensor, float32, LongTensor
import torch
from tqdm import tqdm

class PyTorchCTTE(ClassificationTrainTestEvaluate):
    def __init__(self, train_dataset: Dataset, test_dataset: Dataset, model, device='cpu', criterion=None, optimizer=None):
        super().__init__(train_dataset, test_dataset, model)
        self.device = device
        self.optimizer = optimizer
        self.criterion = criterion
        self.class_labels_dict = {label: idx for idx, label in enumerate(sorted(set(train_dataset.labels)))}
        self.class_labels_dict_reverse = {idx: label for label, idx in self.class_labels_dict.items()}

        self.train_dataset_mean = None
        self.train_dataset_std = None

    def prepare_data(self):
        self.train_dataloader = DataLoader(self.train_dataset, batch_size=64, shuffle=True)
        self.test_dataloader = DataLoader(self.test_dataset, batch_size=64, shuffle=False)

    def train(self, epochs: int, batch_size: int):
        self.model.to(self.device)
        self.class_names = sorted(set(self.train_dataset.labels))

        self.train_dataset_mean = np.mean(np.concatenate(self.train_dataset.data))
        self.train_dataset_std = np.std(np.concatenate(self.train_dataset.data))


        for epoch in range(epochs):
            self.model.train()
            epoch_loss = 0.0

            progress_bar = tqdm(self.train_dataloader, desc=f"Epoch {epoch+1}/{epochs}")

            for batch_X, batch_y in progress_bar:
                batch_X = (Tensor(batch_X.to(float32)) - self.train_dataset_mean) / self.train_dataset_std
                batch_X = batch_X.to(self.device)
                batch_y_num_labels = [self.class_labels_dict[label] for label in batch_y]
                batch_y = Tensor(batch_y_num_labels).to(self.device)

                self.optimizer.zero_grad()
                outputs = self.model(batch_X)
                loss = self.criterion(outputs, batch_y.type(LongTensor).to(self.device))
                loss.backward()
                self.optimizer.step()

                epoch_loss += loss.item()
                progress_bar.set_postfix(loss=loss.item())

            print(f"Epoch {epoch+1} avg loss: {epoch_loss/len(self.train_dataloader):.4f}")


    def evaluate(self):
        self.test_y = [self.class_labels_dict[label] for label in self.test_dataloader.dataset.labels]
        self.predictions = []
        self.model.eval()
        with torch.no_grad():
            for batch_X, batch_y in self.test_dataloader:
                batch_X = (Tensor(batch_X.to(float32)) - self.train_dataset_mean) / self.train_dataset_std
                batch_X = Tensor(batch_X.to(float32)).to(self.device)
                outputs = self.model(batch_X)
                _, predicted = torch.max(outputs.data, 1)
                self.predictions.extend(predicted.cpu().numpy().tolist())

In [23]:
from torch import nn
import torch
import torch.nn.functional as F

class LSTM1D_pt(nn.Module):
    def __init__(self, sequence_length=1024, hidden_size=128, num_layers=2, dropout_rate=0.3, num_classes=4):
        super(LSTM1D_pt, self).__init__()
        self.sequence_length = sequence_length
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size=1,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True,
                            dropout=dropout_rate,
                            bidirectional=False)

        self.bn = nn.BatchNorm1d(hidden_size)

        # Single output head for multi-class classification
        self.fc_type = nn.Linear(hidden_size, num_classes)

        self._init_weights()

    def _init_weights(self):
        for layer in [self.fc_type]:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        x = x.unsqueeze(-1)
        lstm_out, (h_n, _) = self.lstm(x)

        # Instead of only last hidden state:
        features = lstm_out.mean(dim=1)
        features = self.bn(features) # Apply BatchNorm1d

        out_type = self.fc_type(features)
        return out_type

In [36]:
from torch.nn import CrossEntropyLoss

train_dataset = BearingDataset(
    train_files,
    sampling_rate=SamplingRate.sr48K,
    fault_location=FaultLocation.DE,
    chunk_length=4800
)

test_dataset = BearingDataset(
    test_files,
    sampling_rate=SamplingRate.sr48K,
    fault_location=FaultLocation.DE,
    chunk_length=4800
)

lstm_model = LSTM1D_pt(
    sequence_length=4800,
    hidden_size=256,
    num_layers=3,
    dropout_rate=0.1,
    num_classes=len(set(train_dataset.labels))
)

optimizer = torch.optim.Adam(lstm_model.parameters(), lr=1e-3)

lstm_ctte = PyTorchCTTE(
    train_dataset,
    test_dataset,
    model=lstm_model,
    device=device,
    criterion=CrossEntropyLoss(),
    optimizer=optimizer
)

In [None]:
lstm_ctte.prepare_data()
lstm_ctte.train(epochs=25, batch_size=64)
lstm_ctte.evaluate()

Epoch 1/25: 100%|██████████| 68/68 [01:17<00:00,  1.14s/it, loss=1.55]


Epoch 1 avg loss: 1.8493


Epoch 2/25: 100%|██████████| 68/68 [01:20<00:00,  1.18s/it, loss=1.16]


Epoch 2 avg loss: 1.5548


Epoch 3/25: 100%|██████████| 68/68 [01:23<00:00,  1.23s/it, loss=1.5]


Epoch 3 avg loss: 1.4678


Epoch 4/25: 100%|██████████| 68/68 [01:24<00:00,  1.24s/it, loss=1.86]


Epoch 4 avg loss: 1.5544


Epoch 5/25:  21%|██        | 14/68 [00:17<01:07,  1.26s/it, loss=0.882]

In [35]:
lstm_ctte.classification_report()
