In [21]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score

import torch
from torch import tensor
from torch.nn import Conv2d, Parameter
from torch.utils.data import TensorDataset, Dataset, DataLoader, random_split

from torchvision.transforms import Resize

from gzip import GzipFile

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [13]:
x = tensor([3, 4, 5])
x[0:2] = tensor([5,5])
x

tensor([5, 5, 5])

In [16]:
from os import listdir
from gzip import GzipFile
folder = "datasets/figshare"

ds_len = \
    len(listdir(f"{folder}/meningioma")) + \
    len(listdir(f"{folder}/glioma")) + \
    len(listdir(f"{folder}/pituitary_tumor"))

imgs = np.empty((ds_len, 1, 256, 256), dtype=np.float32)
imgs = torch.tensor(imgs, device=device)
labels = np.empty(ds_len, dtype=int)
labels = torch.tensor(labels, device=device)
resizer = Resize((256, 256))

for label, tumor_type in enumerate(["meningioma", "glioma", "pituitary_tumor"]):
    for img in listdir(f"{folder}/{tumor_type}"):
        idx = int(img.split(".")[0])

        with GzipFile(f"{folder}/{tumor_type}/{idx}.npy.gz") as f:
            data = np.load(f)

        data = data / np.max(data)
        data = data.astype(np.float32)
        data = torch.tensor(data, dtype=torch.float32, device=device)
        data = data.reshape((1, *data.shape))
        data = resizer(data)

        imgs[idx-1, 0, :, :] = data
        labels[idx-1] = label

figshare = TensorDataset(imgs, labels)

In [69]:
from os import listdir
from gzip import GzipFile

class FigshareDataset(Dataset):
    label_to_string = [
        "meningioma",
        "glioma",
        "pituitary_tumor"
    ]

    def __init__(
        self,
        folder="datasets/figshare",
        device="cpu"
    ):
        self._folder = folder
        self._device = device
        self._length = \
            len(listdir(f"{folder}/meningioma")) + \
            len(listdir(f"{folder}/glioma")) + \
            len(listdir(f"{folder}/pituitary_tumor"))
        self._labels = np.empty(self._length, dtype=int)
        self._resizer = Resize((256, 256))

        for img in listdir(f"{folder}/meningioma"):
            idx = int(img.split(".")[0])
            self._labels[idx-1] = 0
        for img in listdir(f"{folder}/glioma"):
            idx = int(img.split(".")[0])
            self._labels[idx-1] = 1
        for img in listdir(f"{folder}/pituitary_tumor"):
            idx = int(img.split(".")[0])
            self._labels[idx-1] = 2

    def __len__(self):
        return self._length
    
    def __getitem__(self, idx):
        label = self._labels[idx]
        tumor_type = self.label_to_string[label]

        with GzipFile(f"{self._folder}/{tumor_type}/{idx+1}.npy.gz") as f:
            data = np.load(f)

        data = data / np.max(data)
        data = data.astype(np.float32)
        data = torch.tensor(data, dtype=torch.float32, device=self._device)
        data = data.reshape((1, *data.shape))
        data = self._resizer(data)

        return data, label

In [180]:
from sklearn.linear_model import RidgeClassifierCV

class Rocket:
    def __init__(
        self,
        n_filters=10000,
        batch_size=128,
        device="cpu",
        seed=None
    ):
        self._n_filters = n_filters
        self._device = device
        self._classifier = RidgeClassifierCV()
        self._batch_size = batch_size
        self._kernels = []
        self._rng = np.random.default_rng(seed)

    def fit(self, dataset):
        _, height, width = dataset[0][0].shape
        assert height == width, "Input images must be square"

        for _ in range(self._n_filters):
            length = self._rng.choice([7, 9, 11])

            weights = np.empty((1, 1, length, length), dtype=np.float32)
            weights[0,0,:,:] = self._rng.normal(size=(length, length))
            weights[0,0,:,:] = weights[0,0,:,:] - np.mean(weights[0,0,:,:])
            weights = tensor(weights, dtype=torch.float32, device=self._device)
            weights = Parameter(weights, requires_grad=False)

            bias = self._rng.uniform(-1, 1, size=(1,))
            bias = tensor(bias, dtype=torch.float32, device=self._device)
            bias = Parameter(bias, requires_grad=False)

            # CHANGE: different pdilations for H and W
            # a_h = np.log2((height - 1) / (length - 1))
            # a_w = np.log2((width - 1) / (length - 1))
            # dilation_h = np.floor(2**self._rng.uniform(0, a_h)).astype(int)
            # dilation_w = np.floor(2**self._rng.uniform(0, a_w)).astype(int)

            max_exponent = np.log2((height - 1) / (length - 1))
            dilation = np.floor(2**self._rng.uniform(0, max_exponent)).astype(int)

            padding = self._rng.choice(["valid", "same"])

            kernel = Conv2d(
                1, 1, length,
                padding=padding,
                #dilation=(dilation_h, dilation_w),
                dilation=dilation,
                device=self._device
            )
            kernel.weight = weights
            kernel.bias = bias

            self._kernels.append(kernel)

        transformed_dataset, labels = self.transform(dataset)

        self._classifier.fit(transformed_dataset, labels)

        return self
            
    def predict(self, dataset, return_true_labels=False):
        transformed_dataset, real_labels = self.transform(dataset)

        if return_true_labels:
            return real_labels, self._classifier.predict(transformed_dataset)
        
        return self._classifier.predict(transformed_dataset)

    def transform(self, dataset):
        loader = DataLoader(
            dataset,
            batch_size=self._batch_size,
            shuffle=False,
        )
        transformed_dataset = np.empty((len(dataset), self._n_filters * 2))
        transformed_labels = np.empty(len(dataset), dtype=int)

        for batch_idx, (imgs, labels) in enumerate(loader):
            idx_start = batch_idx * self._batch_size
            idx_end = idx_start + labels.shape[0]
            data_idx = np.arange(idx_start, idx_end)

            for i, kernel in enumerate(self._kernels):
                convolutions = kernel(imgs)
                _, _, conv_h, conv_w = convolutions.shape

                max_pool = convolutions.amax(dim=(1,2,3))

                ppv_pool = convolutions.gt(0).sum(dim=(1,2,3)) / (conv_h * conv_w)

                transformed_dataset[data_idx, 2*i] = max_pool.numpy(force=True)
                transformed_dataset[data_idx, 2*i + 1] = ppv_pool.numpy(force=True)

            transformed_labels[data_idx] = labels.numpy(force=True)

        return transformed_dataset, transformed_labels

In [183]:
dataset = figshare
train_dataset, test_dataset = random_split(dataset, [0.7, 0.3])

model = Rocket(n_filters=100, device=device, seed=None)
model.fit(train_dataset)
y_true, y_pred = model.predict(test_dataset, return_true_labels=True)
accuracy_score(y_true, y_pred)

0.8607181719260065

In [166]:
# TODO: check license stuff
# Adapted from https://github.com/angus924/minirocket
from itertools import combinations
from sklearn.linear_model import RidgeClassifierCV

class MiniRocket:
    def __init__(
        self,
        n_features=10000,
        max_dilations_per_kernel=32,
        batch_size=128,
        device="cpu",
        seed=None
    ):
        self._n_features = n_features
        self._max_dilations_per_kernel = max_dilations_per_kernel
        self._device = device
        self._classifier = RidgeClassifierCV()
        self._batch_size = batch_size
        self._rng = np.random.default_rng(seed)

    def _build_kernels(self, dataset, input_size, dilations, n_features_per_dilation, quantiles):
        n_examples = len(dataset)
        indices = np.array([_ for _ in combinations(np.arange(9), 3)], dtype=np.int32)

        n_kernels = len(indices)
        n_dilations = len(dilations)
        self._n_features = n_kernels * np.sum(n_features_per_dilation)

        paddings = ["valid", "same"]
        padding_i = 0
        feature_index_start = 0
        for dilation_index in range(n_dilations):
            dilation = dilations[dilation_index]
            n_features_this_dilation = n_features_per_dilation[dilation_index]

            for kernel_index in range(n_kernels):
                feature_index_end = feature_index_start + n_features_this_dilation

                weights = np.repeat(-1., 9).astype(np.float32)
                weights[indices[kernel_index]] = -2
                weights = weights.reshape((1, 1, 3, 3))
                weights = tensor(weights, dtype=torch.float32, device=self._device)
                weights = Parameter(weights, requires_grad=False)

                kernel = Conv2d(
                    in_channels=1,
                    out_channels=1,
                    kernel_size=3,
                    padding=paddings[padding_i],
                    dilation=dilation,
                    bias=False,
                    device=self._device
                )
                kernel.weight = weights

                random_example, _ = dataset[self._rng.integers(n_examples)]
                random_conv = kernel(random_example)

                this_quantiles = quantiles[feature_index_start:feature_index_end]
                this_quantiles = tensor(this_quantiles, dtype=torch.float32, device=self._device)
                bias = torch.quantile(random_conv, this_quantiles)

                self._kernels.append(kernel)
                self._biases.append(bias)

                feature_index_start = feature_index_end
                padding_i = (padding_i + 1) % 2             

    def fit(self, dataset):
        _, height, width = dataset[0][0].shape
        assert height == width, "Input images must be square"

        n_kernels = 84
        n_features_per_kernel = self._n_features // n_kernels
        true_max_dilations_per_kernel = min(n_features_per_kernel, self._max_dilations_per_kernel)
        multiplier = n_features_per_kernel / true_max_dilations_per_kernel

        max_exponent = np.log2((height - 1) / (9 - 1))
        dilations, n_features_per_dilation = \
        np.unique(np.logspace(0, max_exponent, true_max_dilations_per_kernel, base = 2).astype(np.int32), return_counts = True)
        n_features_per_dilation = (n_features_per_dilation * multiplier).astype(np.int32) # this is a vector

        remainder = n_features_per_kernel - np.sum(n_features_per_dilation)
        i = 0
        while remainder > 0:
            n_features_per_dilation[i] += 1
            remainder -= 1
            i = (i + 1) % len(n_features_per_dilation)

        n_features_per_kernel = np.sum(n_features_per_dilation)

        n_quantiles = n_kernels * n_features_per_kernel
        quantiles = np.array([(_ * ((np.sqrt(5) + 1) / 2)) % 1 for _ in range(1, n_quantiles + 1)], dtype = np.float32)

        self._kernels = []
        self._biases = []
        self._build_kernels(dataset, height, dilations, n_features_per_dilation, quantiles)

        transformed_dataset, real_labels = self.transform(dataset)

        self._classifier.fit(transformed_dataset, real_labels)

        return self
            
    def predict(self, dataset, return_true_labels=False):
        transformed_dataset, real_labels = self.transform(dataset)

        if return_true_labels:
            return real_labels, self._classifier.predict(transformed_dataset)
        
        return self._classifier.predict(transformed_dataset)

    def transform(self, dataset):
        loader = DataLoader(
            dataset,
            batch_size=self._batch_size,
            shuffle=False,
        )
        transformed_dataset = np.empty((len(dataset), self._n_features))
        transformed_labels = np.empty(len(dataset), dtype=int)

        for batch_idx, (imgs, labels) in enumerate(loader):
            idx_start = batch_idx * self._batch_size
            idx_end = idx_start + labels.shape[0]
            data_idx = np.arange(idx_start, idx_end)
            feature_idx = 0

            for kernel, biases in zip(self._kernels, self._biases):
                convolutions = kernel(imgs)
                _, _, conv_h, conv_w = convolutions.shape
                for i in range(biases.shape[0]):
                    results = convolutions - biases[i]

                    ppv_pool = results.gt(0).sum(dim=(1,2,3)) / (conv_h * conv_w)

                    transformed_dataset[data_idx, feature_idx] = ppv_pool.numpy(force=True)

                    feature_idx += 1

            transformed_labels[data_idx] = labels.numpy(force=True)

        return transformed_dataset, transformed_labels

In [174]:
dataset = figshare
train_dataset, test_dataset = random_split(dataset, [0.7, 0.3])

model = MiniRocket(n_features=10000, device=device, seed=None)
model.fit(train_dataset)
y_true, y_pred = model.predict(test_dataset, return_true_labels=True)
accuracy_score(y_true, y_pred)


0.9390642002176278