In [3]:
import os
import math
import itertools
from torch.utils.data import Dataset, DataLoader

import numpy as np
from tqdm import tqdm
import torch
import torch.nn
import einops
from sklearn.linear_model import RidgeClassifier, RidgeClassifierCV
from sklearn.metrics import precision_recall_curve

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
os.environ['CUDA_VISIBLE_DEVICES'] = '2'
torch.set_default_tensor_type('torch.cuda.DoubleTensor')

In [5]:
def metrics(y_true, y_hat, n_classes):
    y_true_onehot = np.zeros((y_true.shape[0], n_classes))
    y_true_onehot[np.arange(y_true.shape[0]), y_true] = 1
    
    y_hat_onehot = np.zeros((y_hat.shape[0], n_classes))
    y_hat_onehot[np.arange(y_hat.shape[0]), y_hat] = 1
    w = [np.mean(y_true==i) for i in range(n_classes)]

    accuracy = np.mean(y_true == y_hat)
    precision = dict()
    recall = dict()
    for i in range(n_classes):
        precision[i], recall[i], _ = precision_recall_curve(y_true_onehot[:, i], y_hat_onehot[:, i])

    Fw, Fm = 0, 0
    for i in range(n_classes):
        prec = precision[i][1]
        rec = recall[i][1]
        Fw += 2 * w[i] * (prec * rec) / (prec + rec)
        Fm += 2 * (prec * rec) / (prec + rec) / n_classes
    return Fw, Fm, accuracy

## Rocket

In [6]:
class RandomFeatures(torch.nn.Module):
    kernel_length = torch.tensor([7, 9, 11], dtype=torch.float)
    weight = None
    bias = None
    dilation = None
    padding = None
    
    def __init__(self, n_kernels, ts_length, n_ts, random_seed=0):
        super(RandomFeatures, self).__init__()
        
        self.n_kernels = n_kernels
        self.ts_length = ts_length
        self.n_ts = n_ts
        self.random_seed = random_seed * n_kernels
        self.create_kernels(self.random_seed)
        
    def create_kernels(self, random_seed=0):
        torch.manual_seed(random_seed)
        kernel_length_indices = torch.multinomial(self.kernel_length, self.n_kernels, replacement=True)
        kernel_lengths = self.kernel_length[kernel_length_indices].to(torch.long)
        weight_distributions_ = [torch.normal(mean=0, std=1, size=(kernel_length,), dtype=torch.double) for kernel_length in kernel_lengths]
        weight_distributions = [weight_distribution - weight_distribution.mean() for weight_distribution in weight_distributions_]
        self.weight = [weight_distribution.unsqueeze(0).unsqueeze(0) for weight_distribution in weight_distributions]
        
        self.spatial_weight = torch.normal(mean=0, std=1, size=(self.n_kernels, self.n_ts), dtype=torch.double)
        self.spatial_weight = self.spatial_weight / torch.linalg.vector_norm(self.spatial_weight, dim=1, keepdim=True)
        
        self.bias = torch.rand(size=(self.n_kernels,), dtype=torch.double).reshape(-1,1) * 2 - 1
        A = math.log(self.ts_length-1) - torch.log(kernel_lengths-1)
        s = torch.rand(size=(self.n_kernels,)) * A
        self.dilation = torch.floor(2**s)
        self.padding = ((self.dilation * (kernel_lengths-1) / 2) * torch.randint(2, size=(self.n_kernels,)))
    
    
    def forward(self, x):
        batch_size = x.size(0)
        # x = einops.rearrange(x, 'b c t -> b 1 t')
        features = torch.empty(size=(batch_size, self.n_kernels, 3))
        
        for i in range(self.n_kernels):
            
            ts_collapsed = torch.einsum('bct,c->bt', x, self.spatial_weight[i])
            ts_collapsed = einops.rearrange(ts_collapsed, 'b t -> b 1 t')
            # print(ts_collapsed.shape)
            
            ts_convolved = torch.nn.functional.conv1d(
                ts_collapsed, weight=self.weight[i], bias=self.bias[i], 
                padding=int(self.padding[i].item()), dilation=int(self.dilation[i].item())
            )
            ts_convolved = ts_convolved.squeeze(1)

            ts_convolved_max = torch.max(ts_convolved, dim=1).values
            features[:,i,0] = ts_convolved_max

            ts_convolved_ppv = torch.mean((ts_convolved > 0).to(torch.float), dim=1)
            features[:,i,1] = ts_convolved_ppv
            
            ts_convolved_amp = torch.mean(torch.abs(ts_convolved), dim=1)
            features[:,i,2] = ts_convolved_amp
        
        features = einops.rearrange(features, 'b s f -> b (s f)')
        return features

In [7]:
class TimeSeriesDataset(Dataset):
    def __init__(self, samples, labels):
        self.samples = samples
        self.labels = labels

    def __getitem__(self, index):
        return self.samples[index], self.labels[index]

    def __len__(self):
        return len(self.samples)

In [8]:
class Rocket():
    def __init__(self, n_features, ts_length, n_ts, n_classes, random_seed=0):
        self.n_classes = n_classes
        self.random_seed = random_seed
        self.batch_size = 1024
        assert n_features % 3 == 0, 'n_features must be even'
        n_kernels = n_features // 3
        self.rf = RandomFeatures(n_kernels, ts_length, n_ts, random_seed)
        self.csf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 20))

    def fit(self, X, Y):
        dataset = TimeSeriesDataset(X, Y)
        dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, generator=torch.Generator(device='cuda'))
        
        features, labels = [], []
        for x, y in tqdm(dataloader):
            features.append(self.rf(x).detach().cpu().numpy().astype(np.float32))
            labels.append(y.detach().cpu().numpy())
        features = np.concatenate(features, axis=0)
        labels = np.concatenate(labels, axis=0)
        self.csf.fit(features, labels)

    def predict(self, X, Y=None):
        dataset = TimeSeriesDataset(X, Y)
        dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=False, generator=torch.Generator(device='cuda'))
        
        features = []
        for x, _ in tqdm(dataloader):
            features.append(self.rf(x).detach().cpu().numpy().astype(np.float32))
        features = np.concatenate(features, axis=0)

        Y_hat = self.csf.predict(features)
        return Y_hat

## UCI

In [133]:
path_load = 'datasets/HAR/'
train_x = np.load(path_load + 'train_x.npy')
train_y = np.load(path_load + 'train_y.npy')
test_x = np.load(path_load + 'test_x.npy')
test_y = np.load(path_load + 'test_y.npy')

ts_length = train_x.shape[2]
n_ts = train_x.shape[1]
n_classes = np.unique(train_y).shape[0]

In [134]:
n_features = 90000
rocket = Rocket(n_features, ts_length, n_ts, n_classes, random_seed=0)

In [135]:
rocket.fit(train_x, train_y)

100%|██████████| 8/8 [23:24<00:00, 175.61s/it]


In [136]:
test_y_hat = rocket.predict(test_x, test_y)

100%|██████████| 3/3 [09:18<00:00, 186.04s/it]


In [137]:
Fw, Fm, accuracy = metrics(test_y, test_y_hat, n_classes)
print(Fw, Fm, accuracy)

0.9717398821825951 0.9726660054194296 0.9718357651849339


## OPPOR

In [138]:
path_load = 'datasets/OPPOR/'
train_x = np.load(path_load + 'train_x.npy').astype(np.float64)
train_y = np.load(path_load + 'train_y.npy')
test_x = np.load(path_load + 'test_x.npy').astype(np.float64)
test_y = np.load(path_load + 'test_y.npy')

ts_length = train_x.shape[2]
n_ts = train_x.shape[1]
n_classes = np.unique(train_y).shape[0]

In [149]:
n_features = 12000
rocket = Rocket(n_features, ts_length, n_ts, n_classes, random_seed=0)

In [150]:
rocket.fit(train_x, train_y)

100%|██████████| 34/34 [15:58<00:00, 28.18s/it]


In [151]:
test_y_hat = rocket.predict(test_x, test_y)

100%|██████████| 15/15 [06:53<00:00, 27.58s/it]


In [152]:
Fw, Fm, accuracy = metrics(test_y, test_y_hat, n_classes)
print(Fw, Fm, accuracy)

0.5059436684850275 0.5070361930295703 0.5178185745140389


## NinaPro

In [9]:
path_load = 'datasets/NinaPro/'
train_x = np.load(path_load + 'train_x.npy').astype(np.float64)
train_y = np.load(path_load + 'train_y.npy')
test_x = np.load(path_load + 'test_x.npy').astype(np.float64)
test_y = np.load(path_load + 'test_y.npy')

ts_length = train_x.shape[2]
n_ts = train_x.shape[1]
n_classes = np.unique(train_y).shape[0]

In [10]:
n_features = 30000
rocket = Rocket(n_features, ts_length, n_ts, n_classes, random_seed=0)

In [None]:
rocket.fit(train_x, train_y)
del train_x, train_y

100%|██████████| 31/31 [40:11<00:00, 77.78s/it]


In [None]:
test_y_hat = rocket.predict(test_x, test_y)

In [None]:
Fw, Fm, accuracy = metrics(test_y, test_y_hat, n_classes)
print(Fw, Fm, accuracy)

## BCICIV2a

In [7]:
Fws, Fms, accuracys = [], [], []

for i in range(9):
    path_load = f'datasets/BCICIV2a/subject_left_{i}/'
    train_x = np.load(path_load + 'train_x.npy').astype(np.float64)
    train_y = np.load(path_load + 'train_y.npy')
    test_x = np.load(path_load + 'test_x.npy').astype(np.float64)
    test_y = np.load(path_load + 'test_y.npy')

    ts_length = train_x.shape[2]
    n_ts = train_x.shape[1]
    n_classes = np.unique(train_y).shape[0]

    n_features = 4500
    rocket = Rocket(n_features, ts_length, n_ts, n_classes, random_seed=0)
    rocket.fit(train_x, train_y)
    del train_x, train_y
    test_y_hat = rocket.predict(test_x, test_y)
    Fw, Fm, accuracy = metrics(test_y, test_y_hat, n_classes)
    print(Fw, Fm, accuracy)
    del rocket

    Fws.append(Fw)
    Fms.append(Fm)
    accuracys.append(accuracy)

100%|██████████| 121/121 [56:27<00:00, 27.99s/it]
100%|██████████| 16/16 [07:30<00:00, 28.15s/it]


0.28201840023953084 0.28184236783408734 0.3


 71%|███████   | 86/121 [40:33<16:30, 28.29s/it]


KeyboardInterrupt: 

In [None]:
print(np.mean(Fws), np.mean(Fms), np.mean(accuracys))