# DTW-based Nearest Neighbor Classification for Time Series

This notebook implements Dynamic Time Warping (DTW) based nearest neighbor classification using the aeon library, following the experimental approach described in similarity measure research for time series data mining.

## Overview
We will:
1. Load time series datasets using aeon
2. Implement DTW-based 1-NN classification
3. Compare performance with baseline methods
4. Visualize results and DTW alignment behavior

## 1. Import Required Libraries

Import necessary libraries including aeon for time series analysis, numpy for numerical operations, matplotlib for visualization, and sklearn for metrics.

In [1]:
# Import Required Libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.neighbors import KNeighborsClassifier
import pandas as pd

# Aeon imports for time series analysis
from aeon.datasets import load_classification
from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier
from aeon.distances import dtw_distance, euclidean_distance

# Set plotting style
plt.style.use("seaborn-v0_8")
sns.set_palette("husl")
np.random.seed(42)

print("Libraries imported successfully!")
print(f"Aeon version available for time series analysis")

Libraries imported successfully!
Aeon version available for time series analysis


## 2. Load Time Series Dataset

Load a UCR time series dataset using aeon's load_classification function, splitting into training and test sets.

In [2]:
# Load Time Series Dataset
# Using a classic UCR dataset - Coffee (small dataset good for demonstration)
# You can change this to other datasets like 'GunPoint', 'TwoLeadECG', 'ArrowHead', etc.

dataset_name = "Coffee"
print(f"Loading {dataset_name} dataset...")

try:
    # Load the dataset
    X_train, y_train = load_classification(dataset_name, split="train", return_X_y=True)
    X_test, y_test = load_classification(dataset_name, split="test", return_X_y=True)

    print(f"Dataset loaded successfully!")
    print(f"Training set shape: {X_train.shape}")
    print(f"Test set shape: {X_test.shape}")
    print(f"Training labels shape: {y_train.shape}")
    print(f"Test labels shape: {y_test.shape}")

except Exception as e:
    print(f"Error loading dataset: {e}")
    print("Trying alternative dataset...")
    dataset_name = "GunPoint"
    X_train, y_train = load_classification(dataset_name, split="train", return_X_y=True)
    X_test, y_test = load_classification(dataset_name, split="test", return_X_y=True)
    print(f"Loaded {dataset_name} instead")

Loading Coffee dataset...
Error loading dataset: load_classification() got an unexpected keyword argument 'return_X_y'
Trying alternative dataset...


TypeError: load_classification() got an unexpected keyword argument 'return_X_y'

## 3. Explore Dataset Properties

Examine the dataset characteristics including number of samples, series length, number of classes, and visualize sample time series from different classes.

In [None]:
# Explore Dataset Properties
print("=== Dataset Properties ===")
print(f"Dataset name: {dataset_name}")
print(f"Number of training samples: {len(X_train)}")
print(f"Number of test samples: {len(X_test)}")
print(f"Time series length: {X_train.shape[1]}")
print(
    f"Number of features/dimensions: {X_train.shape[2] if len(X_train.shape) > 2 else 1}"
)

# Examine class distribution
unique_classes = np.unique(y_train)
print(f"Number of classes: {len(unique_classes)}")
print(f"Class labels: {unique_classes}")

print("\n=== Class Distribution ===")
train_class_counts = pd.Series(y_train).value_counts().sort_index()
test_class_counts = pd.Series(y_test).value_counts().sort_index()

print("Training set:")
for class_label in unique_classes:
    count = train_class_counts.get(class_label, 0)
    print(f"  Class {class_label}: {count} samples")

print("Test set:")
for class_label in unique_classes:
    count = test_class_counts.get(class_label, 0)
    print(f"  Class {class_label}: {count} samples")

In [None]:
# Visualize sample time series from different classes
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f"Sample Time Series from {dataset_name} Dataset", fontsize=16)

# Plot samples from each class
for i, class_label in enumerate(unique_classes[:4]):  # Show up to 4 classes
    row, col = divmod(i, 2)
    ax = axes[row, col] if len(unique_classes) > 1 else axes

    # Get samples from this class
    class_indices = np.where(y_train == class_label)[0]

    # Plot multiple samples from this class
    for j in range(min(5, len(class_indices))):  # Plot up to 5 samples
        idx = class_indices[j]
        if len(X_train.shape) == 3:
            time_series = X_train[idx, :, 0]  # Take first dimension if multivariate
        else:
            time_series = X_train[idx, :]

        ax.plot(time_series, alpha=0.7, linewidth=1.5)

    ax.set_title(f"Class {class_label} (n={len(class_indices)})")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.grid(True, alpha=0.3)

# Remove empty subplots if fewer than 4 classes
if len(unique_classes) < 4:
    for i in range(len(unique_classes), 4):
        row, col = divmod(i, 2)
        fig.delaxes(axes[row, col])

plt.tight_layout()
plt.show()

## 4. Implement DTW-based Classification

Create a 1-nearest neighbor classifier using DTW distance metric with aeon's KNeighborsTimeSeriesClassifier, exploring different DTW parameters like window constraints.

In [None]:
# Implement DTW-based Classification
print("=== Setting up DTW-based Classifiers ===")

# DTW without window constraint (full DTW)
dtw_classifier_full = KNeighborsTimeSeriesClassifier(
    n_neighbors=1, distance="dtw", distance_params=None  # No window constraint
)

# DTW with window constraint (Sakoe-Chiba band)
# Window size as percentage of series length
window_size = int(0.1 * X_train.shape[1])  # 10% of series length
print(f"Using window constraint of size: {window_size}")

dtw_classifier_constrained = KNeighborsTimeSeriesClassifier(
    n_neighbors=1, distance="dtw", distance_params={"window": window_size}
)

print("DTW classifiers created successfully!")
print(f"Full DTW: No window constraint")
print(f"Constrained DTW: Window size = {window_size}")

In [None]:
# Train DTW classifiers
print("Training DTW classifiers...")

# Train full DTW classifier
print("Training full DTW classifier...")
dtw_classifier_full.fit(X_train, y_train)
print("Full DTW classifier trained!")

# Train constrained DTW classifier
print("Training constrained DTW classifier...")
dtw_classifier_constrained.fit(X_train, y_train)
print("Constrained DTW classifier trained!")

# Make predictions
print("Making predictions...")
y_pred_dtw_full = dtw_classifier_full.predict(X_test)
y_pred_dtw_constrained = dtw_classifier_constrained.predict(X_test)

print("Predictions completed!")

## 5. Evaluate Classification Performance

Train the DTW classifier and evaluate its performance on the test set using accuracy, confusion matrix, and classification report.

In [None]:
# Evaluate Classification Performance
print("=== DTW Classification Results ===")

# Calculate accuracies
accuracy_dtw_full = accuracy_score(y_test, y_pred_dtw_full)
accuracy_dtw_constrained = accuracy_score(y_test, y_pred_dtw_constrained)

print(f"Full DTW Accuracy: {accuracy_dtw_full:.4f} ({accuracy_dtw_full*100:.2f}%)")
print(
    f"Constrained DTW Accuracy: {accuracy_dtw_constrained:.4f} ({accuracy_dtw_constrained*100:.2f}%)"
)

# Classification reports
print("\n=== Full DTW Classification Report ===")
print(classification_report(y_test, y_pred_dtw_full))

print("\n=== Constrained DTW Classification Report ===")
print(classification_report(y_test, y_pred_dtw_constrained))

In [None]:
# Confusion Matrices
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Full DTW confusion matrix
cm_full = confusion_matrix(y_test, y_pred_dtw_full)
sns.heatmap(
    cm_full,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=unique_classes,
    yticklabels=unique_classes,
    ax=axes[0],
)
axes[0].set_title(f"Full DTW Confusion Matrix\nAccuracy: {accuracy_dtw_full:.4f}")
axes[0].set_xlabel("Predicted")
axes[0].set_ylabel("Actual")

# Constrained DTW confusion matrix
cm_constrained = confusion_matrix(y_test, y_pred_dtw_constrained)
sns.heatmap(
    cm_constrained,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=unique_classes,
    yticklabels=unique_classes,
    ax=axes[1],
)
axes[1].set_title(
    f"Constrained DTW Confusion Matrix\nAccuracy: {accuracy_dtw_constrained:.4f}"
)
axes[1].set_xlabel("Predicted")
axes[1].set_ylabel("Actual")

plt.tight_layout()
plt.show()

## 6. Compare with Baseline Methods

Compare DTW performance against Euclidean distance-based nearest neighbor classification and analyze the differences in accuracy.

In [None]:
# Compare with Baseline Methods
print("=== Implementing Baseline Methods ===")

# Euclidean distance-based 1-NN classifier using aeon
euclidean_classifier = KNeighborsTimeSeriesClassifier(
    n_neighbors=1, distance="euclidean"
)

# Traditional sklearn 1-NN classifier (requires flattening the data)
# Flatten the time series data for sklearn
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)

sklearn_classifier = KNeighborsClassifier(n_neighbors=1, metric="euclidean")

print("Training baseline classifiers...")

# Train euclidean classifier
euclidean_classifier.fit(X_train, y_train)
y_pred_euclidean = euclidean_classifier.predict(X_test)

# Train sklearn classifier
sklearn_classifier.fit(X_train_flat, y_train)
y_pred_sklearn = sklearn_classifier.predict(X_test_flat)

print("Baseline classifiers trained and predictions made!")

In [None]:
# Compare all methods
print("=== Performance Comparison ===")

methods = {
    "Full DTW": y_pred_dtw_full,
    "Constrained DTW": y_pred_dtw_constrained,
    "Euclidean (aeon)": y_pred_euclidean,
    "Euclidean (sklearn)": y_pred_sklearn,
}

results = []
for method_name, predictions in methods.items():
    accuracy = accuracy_score(y_test, predictions)
    results.append({"Method": method_name, "Accuracy": accuracy})
    print(f"{method_name:20s}: {accuracy:.4f} ({accuracy*100:.2f}%)")

# Create comparison DataFrame
results_df = pd.DataFrame(results)
results_df = results_df.sort_values("Accuracy", ascending=False)

print("\n=== Ranked Results ===")
for i, row in results_df.iterrows():
    print(f"{row['Method']:20s}: {row['Accuracy']:.4f}")

In [None]:
# Visualize performance comparison
plt.figure(figsize=(12, 6))

# Bar plot of accuracies
bars = plt.bar(
    results_df["Method"],
    results_df["Accuracy"],
    color=["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"],
)

# Add value labels on bars
for bar, accuracy in zip(bars, results_df["Accuracy"]):
    plt.text(
        bar.get_x() + bar.get_width() / 2,
        bar.get_height() + 0.005,
        f"{accuracy:.3f}",
        ha="center",
        va="bottom",
        fontweight="bold",
    )

plt.title(f"Classification Accuracy Comparison on {dataset_name} Dataset", fontsize=14)
plt.ylabel("Accuracy", fontsize=12)
plt.xlabel("Method", fontsize=12)
plt.ylim(0, 1.1)
plt.xticks(rotation=45)
plt.grid(axis="y", alpha=0.3)
plt.tight_layout()
plt.show()

# Calculate improvement of best DTW over Euclidean
best_dtw_acc = max(accuracy_dtw_full, accuracy_dtw_constrained)
euclidean_acc = accuracy_score(y_test, y_pred_euclidean)
improvement = ((best_dtw_acc - euclidean_acc) / euclidean_acc) * 100

print(f"\nBest DTW improvement over Euclidean: {improvement:.2f}%")

## 7. Visualize Results

Create visualizations showing classification results, distance matrices, and alignment paths for selected time series pairs to demonstrate DTW behavior.

In [None]:
# Visualize Results - Sample Alignments
print("=== Visualizing DTW Alignments ===")

# Select samples for demonstration
# Find a correctly classified and a misclassified sample by DTW
correct_predictions = np.where(y_pred_dtw_full == y_test)[0]
incorrect_predictions = np.where(y_pred_dtw_full != y_test)[0]

if len(incorrect_predictions) > 0:
    sample_indices = [correct_predictions[0], incorrect_predictions[0]]
    titles = ["Correctly Classified", "Misclassified"]
else:
    sample_indices = [correct_predictions[0], correct_predictions[1]]
    titles = ["Sample 1", "Sample 2"]

fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle("DTW Alignment Examples", fontsize=16)

for idx, (sample_idx, title) in enumerate(zip(sample_indices, titles)):
    # Get test sample
    if len(X_test.shape) == 3:
        test_sample = X_test[sample_idx, :, 0]
    else:
        test_sample = X_test[sample_idx, :]

    true_label = y_test[sample_idx]
    predicted_label = y_pred_dtw_full[sample_idx]

    # Find nearest neighbor in training set
    min_distance = float("inf")
    nearest_idx = 0

    for train_idx in range(len(X_train)):
        if len(X_train.shape) == 3:
            train_sample = X_train[train_idx, :, 0]
        else:
            train_sample = X_train[train_idx, :]

        distance = dtw_distance(test_sample, train_sample)
        if distance < min_distance:
            min_distance = distance
            nearest_idx = train_idx

    # Get nearest neighbor
    if len(X_train.shape) == 3:
        nearest_sample = X_train[nearest_idx, :, 0]
    else:
        nearest_sample = X_train[nearest_idx, :]

    nearest_label = y_train[nearest_idx]

    # Plot the time series
    ax1 = axes[idx, 0]
    ax1.plot(test_sample, label=f"Test (True: {true_label})", linewidth=2)
    ax1.plot(
        nearest_sample,
        label=f"Nearest (Label: {nearest_label})",
        linewidth=2,
        alpha=0.7,
    )
    ax1.set_title(f"{title}\nPred: {predicted_label}, DTW Distance: {min_distance:.3f}")
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Create a simple alignment visualization
    ax2 = axes[idx, 1]

    # Create a heatmap showing the distance matrix (simplified)
    len1, len2 = len(test_sample), len(nearest_sample)
    dist_matrix = np.zeros((len1, len2))

    for i in range(len1):
        for j in range(len2):
            dist_matrix[i, j] = abs(test_sample[i] - nearest_sample[j])

    im = ax2.imshow(dist_matrix, cmap="viridis", aspect="auto")
    ax2.set_title("Distance Matrix")
    ax2.set_xlabel("Nearest Neighbor Index")
    ax2.set_ylabel("Test Sample Index")
    plt.colorbar(im, ax=ax2)

plt.tight_layout()
plt.show()

In [None]:
# Final Summary and Analysis
print("=== Final Analysis Summary ===")
print(f"Dataset: {dataset_name}")
print(f"Training samples: {len(X_train)}")
print(f"Test samples: {len(X_test)}")
print(f"Time series length: {X_train.shape[1]}")
print(f"Number of classes: {len(unique_classes)}")

print("\n=== Performance Summary ===")
for i, row in results_df.iterrows():
    print(f"{row['Method']:20s}: {row['Accuracy']:.4f} ({row['Accuracy']*100:.2f}%)")

print(f"\n=== Key Findings ===")
best_method = results_df.iloc[0]["Method"]
best_accuracy = results_df.iloc[0]["Accuracy"]
print(f"• Best performing method: {best_method} with {best_accuracy:.4f} accuracy")

dtw_methods = results_df[results_df["Method"].str.contains("DTW")]
if len(dtw_methods) > 1:
    full_dtw_acc = results_df[results_df["Method"] == "Full DTW"]["Accuracy"].iloc[0]
    const_dtw_acc = results_df[results_df["Method"] == "Constrained DTW"][
        "Accuracy"
    ].iloc[0]

    if full_dtw_acc > const_dtw_acc:
        print(
            f"• Full DTW outperformed constrained DTW by {(full_dtw_acc - const_dtw_acc)*100:.2f}%"
        )
    else:
        print(
            f"• Constrained DTW outperformed full DTW by {(const_dtw_acc - full_dtw_acc)*100:.2f}%"
        )

euclidean_acc = results_df[results_df["Method"] == "Euclidean (aeon)"]["Accuracy"].iloc[
    0
]
best_dtw_acc = max(accuracy_dtw_full, accuracy_dtw_constrained)

if best_dtw_acc > euclidean_acc:
    improvement = ((best_dtw_acc - euclidean_acc) / euclidean_acc) * 100
    print(f"• DTW showed {improvement:.1f}% improvement over Euclidean distance")
else:
    decline = ((euclidean_acc - best_dtw_acc) / euclidean_acc) * 100
    print(f"• Euclidean distance outperformed DTW by {decline:.1f}%")

print(f"\n=== Experiment Complete ===")
print("This notebook demonstrates DTW-based nearest neighbor classification")
print("for time series data, following experimental methodologies from")
print("similarity measure research in time series data mining.")