This notebook shows the implementation of a skorch wrapper class containing the InceptionTime model. Code for the model itself was based on https://github.com/okrasolar/pytorch-timeseries

Author: Christos C. Papadopoulos
https://github.com/Christosc96

In [5]:
!pip install skorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting skorch
  Downloading skorch-0.11.0-py3-none-any.whl (155 kB)
[K     |████████████████████████████████| 155 kB 4.9 MB/s 
Installing collected packages: skorch
Successfully installed skorch-0.11.0


In [6]:

import torch
from torch import nn
import torch.nn.functional as F

from sklearn.datasets import make_classification
from sklearn.preprocessing import MinMaxScaler
import numpy as np


from typing import cast, Union, List
from skorch import NeuralNetClassifier

In [7]:
'''
Define conv1d with same padding
'''



class Conv1dSamePadding(nn.Conv1d):
    """Represents the "Same" padding functionality from Tensorflow.
    See: https://github.com/pytorch/pytorch/issues/3867
    Note that the padding argument in the initializer doesn't do anything now
    """
    def forward(self, input):
        return conv1d_same_padding(input, self.weight, self.bias, self.stride,
                                   self.dilation, self.groups)


def conv1d_same_padding(input, weight, bias, stride, dilation, groups):
    # stride and dilation are expected to be tuples.
    kernel, dilation, stride = weight.size(2), dilation[0], stride[0]
    l_out = l_in = input.size(2)
    padding = (((l_out - 1) * stride) - l_in + (dilation * (kernel - 1)) + 1)
    if padding % 2 != 0:
        input = F.pad(input, [0, 1])

    return F.conv1d(input=input, weight=weight, bias=bias, stride=stride,
                    padding=padding // 2,
                    dilation=dilation, groups=groups)


class ConvBlock(nn.Module):

    def __init__(self, in_channels: int, out_channels: int, kernel_size: int,
                 stride: int) -> None:
        super().__init__()

        self.layers = nn.Sequential(
            Conv1dSamePadding(in_channels=in_channels,
                              out_channels=out_channels,
                              kernel_size=kernel_size,
                              stride=stride),
            nn.BatchNorm1d(num_features=out_channels),
            nn.ReLU(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # type: ignore

        return self.layers(x)

Define the _InceptionModel class. Code is based on https://github.com/okrasolar/pytorch-timeseries

In [8]:
class _InceptionModel(nn.Module):
    """A PyTorch implementation of the InceptionTime model.
    From https://arxiv.org/abs/1909.04939
    Attributes
    ----------
    num_blocks:
        The number of inception blocks to use. One inception block consists
        of 3 convolutional layers, (optionally) a bottleneck and (optionally) a residual
        connector
    in_channels:
        The number of input channels (i.e. input.shape[-1])
    out_channels:
        The number of "hidden channels" to use. Can be a list (for each block) or an
        int, in which case the same value will be applied to each block
    bottleneck_channels:
        The number of channels to use for the bottleneck. Can be list or int. If 0, no
        bottleneck is applied
    kernel_sizes:
        The size of the kernels to use for each inception block. Within each block, each
        of the 3 convolutional layers will have kernel size
        `[kernel_size // (2 ** i) for i in range(3)]`
    num_pred_classes:
        The number of output classes
    """

    def __init__(self, num_blocks: int, in_channels: int, out_channels: Union[List[int], int],
                 bottleneck_channels: Union[List[int], int], kernel_sizes: Union[List[int], int],
                 use_residuals: Union[List[bool], bool, str] = 'default',
                 num_pred_classes: int = 1
                 ) -> None:
        super().__init__()

        # for easier saving and loading
        self.input_args = {
            'num_blocks': num_blocks,
            'in_channels': in_channels,
            'out_channels': out_channels,
            'bottleneck_channels': bottleneck_channels,
            'kernel_sizes': kernel_sizes,
            'use_residuals': use_residuals,
            'num_pred_classes': num_pred_classes
        }

        channels = [in_channels] + cast(List[int], self._expand_to_blocks(out_channels,
                                                                          num_blocks))
        bottleneck_channels = cast(List[int], self._expand_to_blocks(bottleneck_channels,
                                                                     num_blocks))
        kernel_sizes = cast(List[int], self._expand_to_blocks(kernel_sizes, num_blocks))
        if use_residuals == 'default':
            use_residuals = [True if i % 3 == 2 else False for i in range(num_blocks)]
        use_residuals = cast(List[bool], self._expand_to_blocks(
            cast(Union[bool, List[bool]], use_residuals), num_blocks)
        )

        self.blocks = nn.Sequential(*[
            _InceptionBlock(in_channels=channels[i], out_channels=channels[i + 1],
                           residual=use_residuals[i], bottleneck_channels=bottleneck_channels[i],
                           kernel_size=kernel_sizes[i]) for i in range(num_blocks)
        ])

        # a global average pooling (i.e. mean of the time dimension) is why
        # in_features=channels[-1]
        self.linear = nn.Linear(in_features=channels[-1], out_features=num_pred_classes)

    @staticmethod
    def _expand_to_blocks(value: Union[int, bool, List[int], List[bool]],
                          num_blocks: int) -> Union[List[int], List[bool]]:
        if isinstance(value, list):
            assert len(value) == num_blocks, \
                f'Length of inputs lists must be the same as num blocks, ' \
                f'expected length {num_blocks}, got {len(value)}'
        else:
            value = [value] * num_blocks
        return value

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # type: ignore
        x = self.blocks(x).mean(dim=-1)  # the mean is the global average pooling
        return self.linear(x)


class _InceptionBlock(nn.Module):
    """An inception block consists of an (optional) bottleneck, followed
    by 3 conv1d layers. Optionally residual
    """

    def __init__(self, in_channels: int, out_channels: int,
                 residual: bool, stride: int = 1, bottleneck_channels: int = 32,
                 kernel_size: int = 41) -> None:
        assert kernel_size > 3, "Kernel size must be strictly greater than 3"
        super().__init__()

        self.use_bottleneck = bottleneck_channels > 0
        if self.use_bottleneck:
            self.bottleneck = Conv1dSamePadding(in_channels, bottleneck_channels,
                                                kernel_size=1, bias=False)
        kernel_size_s = [kernel_size // (2 ** i) for i in range(3)]
        start_channels = bottleneck_channels if self.use_bottleneck else in_channels
        channels = [start_channels] + [out_channels] * 3
        self.conv_layers = nn.Sequential(*[
            Conv1dSamePadding(in_channels=channels[i], out_channels=channels[i + 1],
                              kernel_size=kernel_size_s[i], stride=stride, bias=False)
            for i in range(len(kernel_size_s))
        ])

        self.batchnorm = nn.BatchNorm1d(num_features=channels[-1])
        self.relu = nn.ReLU()

        self.use_residual = residual
        if residual:
            self.residual = nn.Sequential(*[
                Conv1dSamePadding(in_channels=in_channels, out_channels=out_channels,
                                  kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(out_channels),
                nn.ReLU()
            ])

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # type: ignore
        org_x = x
        if self.use_bottleneck:
            x = self.bottleneck(x)
        x = self.conv_layers(x)

        if self.use_residual:
            x = x + self.residual(org_x)
        return x

In [9]:
'''
wrapper scorch class
'''


class InceptionTimeClassifier(NeuralNetClassifier):
  def __init__(self, num_blocks=2, in_channels=1, out_channels=2,
                           bottleneck_channels=2, kernel_sizes=41, use_residuals=True,
                           num_pred_classes=1, learning_rate=0.05, batch_size=1000, criterion = nn.BCEWithLogitsLoss, max_epochs = 50):
    
    self.inception_model = _InceptionModel(num_blocks=2, in_channels=1, out_channels=2,
                           bottleneck_channels=2, kernel_sizes=41, use_residuals=True,
                           num_pred_classes=1)
    
    super(InceptionTimeClassifier, self).__init__(
            module=self.inception_model,
            max_epochs=max_epochs,
            lr=learning_rate,
            batch_size=batch_size,
            criterion=criterion ,
            # Shuffle training data on each epoch
            iterator_train__shuffle=False,
            device='cuda',
            )

Generate data for training the model. The data used originates from the FordA dataset which is included in the InceptionTime paper as well

In [10]:
def readucr(filename):
    data = np.loadtxt(filename, delimiter="\t")
    y = data[:, 0]
    x = data[:, 1:]
    return x, y.astype(int)


root_url = "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"

x_train, y_train = readucr(root_url + "FordA_TRAIN.tsv")
x_test, y_test = readucr(root_url + "FordA_TEST.tsv")

x_train = x_train.reshape((x_train.shape[0], 1, x_train.shape[1], ))
x_test = x_test.reshape((x_test.shape[0], 1, x_test.shape[1]))
y_train[y_train == -1] = 0
y_test[y_test == -1] = 0
x_train = torch.from_numpy(x_train)
y_train = torch.from_numpy(y_train)
y_train=y_train.unsqueeze(1)

x_train=x_train.to(torch.float32)
y_train=y_train.to(torch.float32)

To train the model, since we are using a Scorch wrapper, simply call fit after creating it. 

In [18]:
model = InceptionTimeClassifier(out_channels=10, bottleneck_channels=10, batch_size=500, max_epochs=20, learning_rate=0.5)
model.fit(x_train, y_train)

  epoch    train_loss    valid_acc    valid_loss     dur
-------  ------------  -----------  ------------  ------
      1        [36m0.6988[0m       [32m0.4868[0m        [35m0.6935[0m  0.1272
      2        [36m0.6930[0m       [32m0.5132[0m        [35m0.6927[0m  0.1280
      3        [36m0.6921[0m       0.5132        [35m0.6926[0m  0.1027
      4        [36m0.6904[0m       0.5132        [35m0.6922[0m  0.0912
      5        [36m0.6824[0m       0.5132        [35m0.6902[0m  0.0862
      6        [36m0.6707[0m       [32m0.5520[0m        [35m0.6787[0m  0.0742
      7        [36m0.6656[0m       [32m0.6491[0m        [35m0.6657[0m  0.0755
      8        [36m0.6593[0m       [32m0.6602[0m        [35m0.6566[0m  0.0759
      9        [36m0.6535[0m       0.6519        [35m0.6483[0m  0.0713
     10        [36m0.6462[0m       [32m0.6727[0m        [35m0.6389[0m  0.0725
     11        [36m0.6353[0m       [32m0.6935[0m        [35m0.6223[0m  0.074

<class '__main__.InceptionTimeClassifier'>[initialized](
  module_=_InceptionModel(
    (blocks): Sequential(
      (0): _InceptionBlock(
        (bottleneck): Conv1dSamePadding(1, 2, kernel_size=(1,), stride=(1,), bias=False)
        (conv_layers): Sequential(
          (0): Conv1dSamePadding(2, 2, kernel_size=(41,), stride=(1,), bias=False)
          (1): Conv1dSamePadding(2, 2, kernel_size=(20,), stride=(1,), bias=False)
          (2): Conv1dSamePadding(2, 2, kernel_size=(10,), stride=(1,), bias=False)
        )
        (batchnorm): BatchNorm1d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU()
        (residual): Sequential(
          (0): Conv1dSamePadding(1, 2, kernel_size=(1,), stride=(1,), bias=False)
          (1): BatchNorm1d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU()
        )
      )
      (1): _InceptionBlock(
        (bottleneck): Conv1dSamePadding(2, 2, kernel_size=(1,), stride=(1,), bia