In [1]:
%run "./1. Data Loading.ipynb"

X: (800, 360, 25)
Y: (800,)


In [10]:
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from sklearn.model_selection import train_test_split

In [7]:
# Taken from https://github.com/renqi1/Time_Series_Classification/blob/main/Rocket.py
# paper: https://arxiv.org/abs/1910.13051
# code: https://github.com/angus924/rocket

class Rocket(nn.Module):
    def __init__(self, n_features, seq_len, n_classes, n_kernels=100, kss=[7, 9, 11]):
        super(Rocket, self).__init__()
        kss = [ks for ks in kss if ks < seq_len]
        convs = nn.ModuleList()
        for i in range(n_kernels):
            ks = np.random.choice(kss)
            dilation = 2**np.random.uniform(0, np.log2((seq_len - 1) // (ks - 1)))
            padding = int((ks - 1) * dilation // 2) if np.random.randint(2) == 1 else 0
            weight = torch.randn(1, n_features, ks)
            weight -= weight.mean()
            bias = 2 * (torch.rand(1) - .5)
            layer = nn.Conv1d(n_features, 1, ks, padding=2 * padding, dilation=int(dilation), bias=True)
            layer.weight = torch.nn.Parameter(weight, requires_grad=False)
            layer.bias = torch.nn.Parameter(bias, requires_grad=False)
            convs.append(layer)
        self.convs = convs
        self.n_kernels = n_kernels
        self.feature_dim = 2 * n_kernels
        self.fc = nn.Linear(self.feature_dim, n_classes)

    def forward(self, x):
        # input x: [B, F, T],  where B = Batch size, F = features, T = Time sampels,
        _output = []
        for i in range(self.n_kernels):
            out = self.convs[i](x).cpu()
            _max = out.max(dim=-1)[0]
            _ppv = torch.gt(out, 0).sum(dim=-1).float() / out.shape[-1]
            _output.append(_max)
            _output.append(_ppv)
        output = torch.cat(_output, dim=1)#.cuda()      # [batch_size, feature_dim]
        output = self.fc(output)
        return output

In [8]:
combined = list(zip(X, Y))
random.shuffle(combined)
shuffled_x_values, shuffled_y_values = zip(*combined)

X_train, X_test, Y_train, Y_test = train_test_split(shuffled_x_values, shuffled_y_values, test_size=0.2, random_state=42)

# Define hyperparameters
num_epochs = 10
batch_size = 32

n_features = 360
seq_len = 25
n_classes = 25
n_kernels = 100
kss = [3, 5, 7, 9, 11]

# Create the Rocket model
model = Rocket(n_features, seq_len, n_classes, n_kernels, kss)

#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_train_samples = len(X_train)

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    
    # Mini-batch training
    for batch_start in range(0, num_train_samples, batch_size):
        batch_end = min(batch_start + batch_size, num_train_samples)
        inputs = torch.tensor(X_train[batch_start:batch_end], dtype=torch.float32)#.to(device)
        labels = torch.tensor(Y_train[batch_start:batch_end], dtype=torch.long)#.to(device)
        
        # Forward pass
        optimizer.zero_grad()
        outputs = model(inputs)
        
        # Compute loss
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item() * (batch_end - batch_start)
    
    # Compute average epoch loss
    epoch_loss /= num_train_samples
    
    # Print training loss for the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    
    # Evaluation
    model.eval()
    num_correct = 0
    num_total = 0
    
    with torch.no_grad():
        for batch_start in range(0, len(X_test), batch_size):
            batch_end = min(batch_start + batch_size, len(X_test))
            inputs = torch.tensor(X_test[batch_start:batch_end], dtype=torch.float32)#.to(device)
            labels = torch.tensor(Y_test[batch_start:batch_end], dtype=torch.long)#.to(device)
            
            # Forward pass
            outputs = model(inputs)
            
            # Compute predictions and accuracy
            _, predicted = torch.max(outputs.data, 1)
            num_correct += (predicted == labels).sum().item()
            num_total += labels.size(0)
    
    # Compute and print evaluation metrics
    accuracy = num_correct / num_total
    print(f"Epoch [{epoch+1}/{num_epochs}], Accuracy: {accuracy:.4f}")

# Save the trained model if desired
#torch.save(model.state_dict(), "rocket_model.pt")


Epoch [1/10], Loss: 709.0549
Epoch [1/10], Accuracy: 0.6500
Epoch [2/10], Loss: 278.7373
Epoch [2/10], Accuracy: 0.7000
Epoch [3/10], Loss: 222.3217
Epoch [3/10], Accuracy: 0.7562
Epoch [4/10], Loss: 179.5808
Epoch [4/10], Accuracy: 0.8063
Epoch [5/10], Loss: 151.0640
Epoch [5/10], Accuracy: 0.8250
Epoch [6/10], Loss: 132.5798
Epoch [6/10], Accuracy: 0.8625
Epoch [7/10], Loss: 114.9918
Epoch [7/10], Accuracy: 0.8688
Epoch [8/10], Loss: 100.3237
Epoch [8/10], Accuracy: 0.8812
Epoch [9/10], Loss: 91.5915
Epoch [9/10], Accuracy: 0.8875
Epoch [10/10], Loss: 79.0048
Epoch [10/10], Accuracy: 0.8938
