In [1]:
import os
import zipfile
import urllib.request
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader

# Root folder (relative to the working directory) that holds every
# downloaded archive and its extracted contents
DATA_DIR = 'datasets'

# Create the folder up front; exist_ok avoids an error when it is already there
os.makedirs(name=DATA_DIR, exist_ok=True)

def download_dataset(dataset_name, url, force=False):
    """
    Download a zip archive and extract it into ``DATA_DIR/<dataset_name>``.

    When the target folder already exists and is non-empty the download is
    skipped, so re-running the notebook does not hit the network again.
    Pass ``force=True`` to download and extract regardless.

    Args:
        dataset_name: Name used for the temporary zip file and for the
            extraction folder inside ``DATA_DIR``.
        url: Location of the zip archive to download.
        force: If True, download and extract even when an extracted copy
            is already present.

    Returns:
        Path of the folder the archive was (or had already been) extracted to.

    Raises:
        Any exception raised by ``urllib.request.urlretrieve`` or
        ``zipfile.ZipFile`` is propagated after temporary files are cleaned up.
    """
    import shutil  # local import: only needed for failure cleanup

    zip_path = os.path.join(DATA_DIR, f"{dataset_name}.zip")
    extract_path = os.path.join(DATA_DIR, dataset_name)

    # Reuse a previous extraction instead of downloading again
    if not force and os.path.isdir(extract_path) and os.listdir(extract_path):
        print(f"Dataset {dataset_name} already present at {extract_path}; skipping download.")
        return extract_path

    # Remember whether the folder pre-dates this call so a failed run only
    # removes what it created itself
    already_existed = os.path.isdir(extract_path)

    print(f"Downloading {dataset_name} from {url}...")
    try:
        urllib.request.urlretrieve(url, zip_path)

        print(f"Extracting {dataset_name}...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
    except BaseException:
        # Do not leave a half-extracted folder behind: the skip check above
        # would otherwise treat it as a complete dataset on the next run
        if not already_existed:
            shutil.rmtree(extract_path, ignore_errors=True)
        raise
    finally:
        # The zip is only a temporary artefact; remove it on success and failure
        if os.path.exists(zip_path):
            os.remove(zip_path)

    print(f"Dataset {dataset_name} extracted to {extract_path}.")
    return extract_path

def _parse_ts_sample(line):
    """
    Parse one data line of a .ts file.

    The line is split on ':'; the last field is the integer label and every
    other non-empty field is a comma-separated series for one dimension.

    Returns:
        (series, label) where series has shape (time_points, n_dimensions).

    Raises:
        ValueError: the line has fewer than two fields, a value or the label
            cannot be parsed, no series is present, or the dimensions of the
            sample do not all have the same length.
    """
    parts = line.split(':')
    if len(parts) < 2:
        raise ValueError("insufficient parts")

    # Last part is the label
    label = int(parts[-1])

    # One list of floats per dimension; empty fields are ignored
    channels = [
        [float(value) for value in part.split(',')]
        for part in parts[:-1]
        if part.strip()
    ]
    if not channels:
        raise ValueError("no valid time series data")

    # Reject ragged samples explicitly instead of relying on how the
    # installed NumPy version treats inhomogeneous nested lists
    lengths = {len(channel) for channel in channels}
    if len(lengths) != 1:
        raise ValueError(f"dimensions have differing lengths {sorted(lengths)}")

    # Transpose (n_dimensions, time_points) -> (time_points, n_dimensions)
    return np.array(channels).T, label


def load_japanese_vowels_ts(file_path):
    """
    Load JapaneseVowels .ts file which has a specific multivariate format.

    Every line before the "@data" marker is treated as metadata and skipped.
    After it, each non-empty line holds one sample:
    series1:series2:...:seriesN:label
    where each series is a comma-separated list of floats for one dimension.
    Malformed lines are reported (with their 1-based line number) and skipped.
    Samples shorter than the longest one are padded at the end by repeating
    their last time step.

    Args:
        file_path: Path of the .ts file to read.

    Returns:
        (data_array, labels_array): data_array has shape
        (n_samples, max_time_length, n_dimensions); labels_array has shape
        (n_samples,).

    Raises:
        ValueError: no valid sample was found, or a sample has a different
            number of dimensions than the first valid sample.
    """
    samples = []
    labels = []

    with open(file_path, 'r', encoding='utf-8') as file:
        in_header = True
        # start=1 so reported numbers match what an editor shows
        for line_num, raw_line in enumerate(file, start=1):
            line = raw_line.strip()

            # Skip metadata until @data
            if in_header:
                if line.lower() == "@data":
                    in_header = False
                continue

            # Skip empty lines
            if not line:
                continue

            try:
                series, label = _parse_ts_sample(line)
            except ValueError as e:
                print(f"Skipping line {line_num}: {e}")
                continue

            # All samples must agree on the number of dimensions, otherwise
            # they cannot be stacked into one array
            if samples and series.shape[1] != samples[0].shape[1]:
                raise ValueError(
                    f"Line {line_num}: expected {samples[0].shape[1]} dimensions, "
                    f"found {series.shape[1]}"
                )

            samples.append(series)
            labels.append(label)

    if not samples:
        raise ValueError("No valid data found in .ts file")

    # Find the maximum time length for padding
    max_time_length = max(series.shape[0] for series in samples)
    n_dimensions = samples[0].shape[1]

    print(f"Found {len(samples)} samples")
    print(f"Maximum time length: {max_time_length}")
    print(f"Number of dimensions: {n_dimensions}")

    # Fill a preallocated (n_samples, time_points, n_dimensions) array;
    # positions past a sample's own length repeat its last time step
    data_array = np.empty((len(samples), max_time_length, n_dimensions), dtype=float)
    for index, series in enumerate(samples):
        length = series.shape[0]
        data_array[index, :length] = series
        data_array[index, length:] = series[-1]

    labels_array = np.array(labels)

    print(f"Final data shape: {data_array.shape}")
    print(f"Final labels shape: {labels_array.shape}")

    return data_array, labels_array

# Dataset information
dataset_name = 'JapaneseVowels'
dataset_url = 'https://timeseriesclassification.com/aeon-toolkit/JapaneseVowels.zip'

# Download and extract the dataset
extract_path = download_dataset(dataset_name, dataset_url)

# Check available files
files = os.listdir(extract_path)
print(f"Available files: {files}")

# Candidate file names, in order of preference
possible_train_files = [
    'JapaneseVowels_TRAIN.ts',
    'JapaneseVowels_eq_TRAIN.ts'
]

possible_test_files = [
    'JapaneseVowels_TEST.ts',
    'JapaneseVowels_eq_TEST.ts'
]

# Pick the first candidate of each kind that is actually present
train_name = next((fname for fname in possible_train_files if fname in files), None)
test_name = next((fname for fname in possible_test_files if fname in files), None)

train_file = None
test_file = None

if train_name is not None:
    train_file = os.path.join(extract_path, train_name)
    print(f"Using training file: {train_name}")

if test_name is not None:
    test_file = os.path.join(extract_path, test_name)
    print(f"Using test file: {test_name}")

# Raise instead of calling exit(): an exception stops the cell (and a
# "Run All") with a clear message, whereas exit() in a notebook targets the
# kernel itself rather than reporting a problem with the data
if train_file is None or test_file is None:
    raise FileNotFoundError(
        f"Could not find appropriate .ts files in {extract_path}. "
        f"Available files: {files}"
    )

def _pad_time_axis(X, target_length):
    """Extend axis 1 of a 3-D array to target_length by repeating the last time step."""
    missing = target_length - X.shape[1]
    if missing <= 0:
        return X
    tail = np.repeat(X[:, -1:, :], missing, axis=1)
    return np.concatenate([X, tail], axis=1)


# Load the train and test datasets
X_train, y_train = load_japanese_vowels_ts(train_file)
X_test, y_test = load_japanese_vowels_ts(test_file)

n_train_samples, train_time_points, n_dimensions = X_train.shape
n_test_samples, test_time_points, n_test_dimensions = X_test.shape

# Both splits must have the same number of feature dimensions; the scaler
# and the later reshapes are driven by the training split's n_dimensions
if n_test_dimensions != n_dimensions:
    raise ValueError(
        f"Train has {n_dimensions} dimensions but test has {n_test_dimensions}"
    )

print(f"Train time points: {train_time_points}, Test time points: {test_time_points}")

# Each file is padded to its own longest sample, so the two splits can end
# up with different lengths; bring both to the longer one
time_points = max(train_time_points, test_time_points)
if train_time_points != test_time_points:
    print("Different time lengths detected. Padding to match the maximum length.")
    max_time_points = time_points

    # Pad with last values (a no-op for the split that is already long enough)
    X_train = _pad_time_axis(X_train, max_time_points)
    X_test = _pad_time_axis(X_test, max_time_points)

    print(f"Padded to {time_points} time points")

# Sanity check: everything but the sample count now matches
assert X_train.shape[1:] == X_test.shape[1:] == (time_points, n_dimensions)

n_train_samples = X_train.shape[0]
n_test_samples = X_test.shape[0]

print(f"Final shapes - Train: {X_train.shape}, Test: {X_test.shape}")

# Standardise the features using statistics from the training split only.
# Samples and time steps are flattened into rows first:
# (n_samples * time_points, n_dimensions)
X_train_reshaped = X_train.reshape(-1, n_dimensions)
X_test_reshaped = X_test.reshape(-1, n_dimensions)

# Fit on train, then apply the same transform to both splits
scaler = StandardScaler().fit(X_train_reshaped)
X_train_scaled_flat = scaler.transform(X_train_reshaped)
X_test_scaled_flat = scaler.transform(X_test_reshaped)

# Restore the (n_samples, time_points, n_dimensions) layout
train_shape = (n_train_samples, time_points, n_dimensions)
test_shape = (n_test_samples, time_points, n_dimensions)
X_train_scaled = X_train_scaled_flat.reshape(train_shape)
X_test_scaled = X_test_scaled_flat.reshape(test_shape)

# Halve the original test split: one half becomes validation, the other
# stays as the test set (note that X_test_scaled and y_test are rebound here)
X_valid_scaled, X_test_scaled, y_valid, y_test = train_test_split(
    X_test_scaled,
    y_test,
    test_size=0.50,
    random_state=42,
)

# Features become float32 tensors, labels int64 tensors.
# NOTE(review): labels are passed through exactly as parsed from the file;
# if they are not 0-based, a classifier head sized n_classes may need them
# shifted — confirm against the code that consumes these loaders.
X_train_tensor, X_valid_tensor, X_test_tensor = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train_scaled, X_valid_scaled, X_test_scaled)
)
y_train_tensor, y_valid_tensor, y_test_tensor = (
    torch.tensor(targets, dtype=torch.int64)
    for targets in (y_train, y_valid, y_test)
)

# Wrap each split in a dataset, then in a loader
batch_size = 64
train_dataset, valid_dataset, test_dataset = (
    TensorDataset(features, targets)
    for features, targets in (
        (X_train_tensor, y_train_tensor),
        (X_valid_tensor, y_valid_tensor),
        (X_test_tensor, y_test_tensor),
    )
)

# Only the training loader shuffles; evaluation order stays fixed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Number of distinct labels seen in the training split
n_classes = np.unique(y_train).size

# Summary of the prepared data
print(f"\nFinal Results:")
print(f"Number of classes: {n_classes}")
print(f"X_train shape: {X_train_tensor.shape}, y_train shape: {y_train_tensor.shape}")
print(f"X_valid shape: {X_valid_tensor.shape}, y_valid shape: {y_valid_tensor.shape}")
print(f"X_test shape: {X_test_tensor.shape}, y_test shape: {y_test_tensor.shape}")
print(f"Number of dimensions: {n_dimensions}")
print(f"Time points: {time_points}")

Downloading JapaneseVowels from https://timeseriesclassification.com/aeon-toolkit/JapaneseVowels.zip...
Extracting JapaneseVowels...
Dataset JapaneseVowels extracted to datasets/JapaneseVowels.
Available files: ['JapaneseVowels_eq_TEST.ts', 'JapaneseVowels_eq_TRAIN.ts', 'JapaneseVowels_TRAIN.ts', 'JapaneseVowels_TEST.ts']
Using training file: JapaneseVowels_TRAIN.ts
Using test file: JapaneseVowels_TEST.ts
Found 270 samples
Maximum time length: 26
Number of dimensions: 12
Final data shape: (270, 26, 12)
Final labels shape: (270,)
Found 370 samples
Maximum time length: 29
Number of dimensions: 12
Final data shape: (370, 29, 12)
Final labels shape: (370,)
Train time points: 26, Test time points: 29
Different time lengths detected. Padding to match the maximum length.
Padded to 29 time points
Final shapes - Train: (270, 29, 12), Test: (370, 29, 12)

Final Results:
Number of classes: 9
X_train shape: torch.Size([270, 29, 12]), y_train shape: torch.Size([270])
X_valid shape: torch.Size([185,