In [None]:
# default_exp models.InceptionTimePlus

# InceptionTimePlus

> This is an unofficial PyTorch implementation of InceptionTime (Fawaz, 2019) created by Ignacio Oguiza.

**References:**
* Fawaz, H. I., Lucas, B., Forestier, G., Pelletier, C., Schmidt, D. F., Weber, J., ... & Petitjean, F. (2020). [Inceptiontime: Finding alexnet for time series classification. Data Mining and Knowledge Discovery, 34(6), 1936-1962.](https://arxiv.org/pdf/1909.04939)
* Official InceptionTime tensorflow implementation: https://github.com/hfawaz/InceptionTime

In [None]:
#export
from tsai.imports import *
from tsai.utils import *
from tsai.models.layers import *
from tsai.models.utils import *
torch.set_num_threads(cpus)

In [None]:
#export
# This is an unofficial PyTorch implementation by Ignacio Oguiza - oguiza@gmail.com modified from:

# Fawaz, H. I., Lucas, B., Forestier, G., Pelletier, C., Schmidt, D. F., Weber, J., ... & Petitjean, F. (2019). 
# InceptionTime: Finding AlexNet for Time Series Classification. arXiv preprint arXiv:1909.04939.
# Official InceptionTime tensorflow implementation: https://github.com/hfawaz/InceptionTime

    
class InceptionModulePlus(Module):
    def __init__(self, ni, nf, ks=40, bottleneck=True, padding='same', coord=False, separable=False, dilation=1, stride=1, conv_dropout=0., sa=False, se=None,
                 norm='Batch', zero_norm=False, bn_1st=True, act=nn.ReLU, act_kwargs={}):
        if isinstance(ks, Integral): ks = [ks // (2**i) for i in range(3)]
        ks = [ksi if ksi % 2 != 0 else ksi - 1 for ksi in ks]  # ensure odd ks for padding='same'
        bottleneck = False if ni == nf else bottleneck
        self.bottleneck = Conv(ni, nf, 1, coord=coord, bias=False) if bottleneck else noop # 
        self.convs = nn.ModuleList()
        for i in range(len(ks)): self.convs.append(Conv(nf if bottleneck else ni, nf, ks[i], padding=padding, coord=coord, separable=separable,
                                                         dilation=dilation**i, stride=stride, bias=False))
        self.mp_conv = nn.Sequential(*[nn.MaxPool1d(3, stride=1, padding=1), Conv(ni, nf, 1, coord=coord, bias=False)])
        self.concat = Concat()
        self.norm = Norm(nf * 4, norm=norm, zero_norm=zero_norm)
        self.conv_dropout = nn.Dropout(conv_dropout) if conv_dropout else noop
        self.sa = SimpleSelfAttention(nf * 4) if sa else noop
        self.act = act(**act_kwargs) if act else noop
        self.se = nn.Sequential(SqueezeExciteBlock(nf * 4, reduction=se), BN1d(nf * 4)) if se else noop
        
        self._init_cnn(self)
    
    def _init_cnn(self, m):
        if getattr(self, 'bias', None) is not None: nn.init.constant_(self.bias, 0)
        if isinstance(self, (nn.Conv1d,nn.Conv2d,nn.Conv3d,nn.Linear)): nn.init.kaiming_normal_(self.weight)
        for l in m.children(): self._init_cnn(l)

    def forward(self, x):
        input_tensor = x
        x = self.bottleneck(x)
        x = self.concat([l(x) for l in self.convs] + [self.mp_conv(input_tensor)])
        x = self.norm(x)
        x = self.conv_dropout(x)
        x = self.sa(x)
        x = self.act(x)
        x = self.se(x)
        return x


@delegates(InceptionModulePlus.__init__)
class InceptionBlockPlus(Module):
    def __init__(self, ni, nf, residual=True, depth=6, coord=False, norm='Batch', zero_norm=False, act=nn.ReLU, act_kwargs={}, sa=False, se=None, 
                 keep_prob=1., **kwargs):
        self.residual, self.depth = residual, depth
        self.inception, self.shortcut, self.act = nn.ModuleList(), nn.ModuleList(), nn.ModuleList()
        for d in range(depth):
            self.inception.append(InceptionModulePlus(ni if d == 0 else nf * 4, nf, coord=coord, norm=norm, 
                                                      zero_norm=zero_norm if d % 3 == 2 else False,
                                                      act=act if d % 3 != 2 else None, act_kwargs=act_kwargs, 
                                                      sa=sa if d % 3 == 2 else False,
                                                      se=se if d % 3 != 2 else None,
                                                      **kwargs))
            if self.residual and d % 3 == 2:
                n_in, n_out = ni if d == 2 else nf * 4, nf * 4
                self.shortcut.append(Norm(n_in, norm=norm) if n_in == n_out else ConvBlock(n_in, n_out, 1, coord=coord, bias=False, norm=norm, act=None))
                self.act.append(act(**act_kwargs))
        self.add = Add()
        self.keep_prob = keep_prob

    def forward(self, x):
        res = x
        for i in range(self.depth):
            if self.training and self.keep_prob[i//3] < 1. and self.keep_prob[i//3] < random.random() and self.residual and i % 3 == 2: 
                res = x = self.act[i//3](self.shortcut[i//3](res))
            else: 
                x = self.inception[i](x)
                if self.residual and i % 3 == 2: res = x = self.act[i//3](self.add(x, self.shortcut[i//3](res)))
        return x


@delegates(InceptionModulePlus.__init__)
class InceptionTimePlus(Module):
    def __init__(self, c_in, c_out, seq_len=None, nf=32, nb_filters=None, concat_pool=False, fc_dropout=0., depth=6, stoch_depth=1., y_range=None, 
                 flatten=False, custom_head=None, **kwargs):
        
        nf = ifnone(nf, nb_filters) # for compatibility
        self.fc_dropout, self.c_out, self.y_range = fc_dropout, c_out, y_range
        self.c_out = c_out
        
        if stoch_depth is not 0: keep_prob = np.linspace(1, stoch_depth, depth // 3)
        else: keep_prob = np.array([1] * depth // 3)
        self.inceptionblock = InceptionBlockPlus(c_in, nf, depth=depth, keep_prob=keep_prob, **kwargs)
        
        self.head_nf = nf * 4
        self.flatten = None
        if flatten:  self.head_nf *= seq_len
        self.flatten = Flatten() if flatten else None
        if custom_head: self.head = custom_head(self.head_nf, c_out)
        else: self.head = self.create_head(self.head_nf, c_out, concat_pool=concat_pool, fc_dropout=fc_dropout, y_range=y_range)
        
    def create_head(self, nf, c_out, concat_pool=False, fc_dropout=0., y_range=None, **kwargs):
        if concat_pool: nf = nf * 2
        layers = [GACP1d(1) if concat_pool else GAP1d(1)]
        if fc_dropout: layers += [nn.Dropout(fc_dropout)]
        layers += [nn.Linear(nf, c_out)]
        if y_range: layers += [SigmoidRange(*y_range)]
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.inceptionblock(x)
        if self.flatten is not None: x = self.flatten(x)
        x = self.head(x)
        return x


class InCoordTime(InceptionTimePlus):
    def __init__(self, *args, coord=True, zero_norm=True, **kwargs):
        super().__init__(*args, coord=coord, zero_norm=zero_norm, **kwargs)


class XCoordTime(InceptionTimePlus):
    def __init__(self, *args, coord=True, separable=True, zero_norm=True, **kwargs):
        super().__init__(*args, coord=coord, separable=separable, zero_norm=zero_norm, **kwargs)
        
InceptionTimePlus17x17 = partial(InceptionTimePlus, nf=17, depth=3)
setattr(InceptionTimePlus17x17, '__name__', 'InceptionTimePlus17x17')
InceptionTimePlus32x32 = InceptionTimePlus
InceptionTimePlus47x47 = partial(InceptionTimePlus, nf=47, depth=9)
setattr(InceptionTimePlus47x47, '__name__', 'InceptionTimePlus47x47')
InceptionTimePlus62x62 = partial(InceptionTimePlus, nf=62, depth=9)
setattr(InceptionTimePlus62x62, '__name__', 'InceptionTimePlus62x62')

In [None]:
#export
@delegates(InceptionTimePlus.__init__)
class MultiInceptionTimePlus(Module):
    _arch = InceptionTimePlus
    def __init__(self, feat_mask, c_out, seq_len=None, **kwargs):
        r"""
        MultiInceptionTimePlus is a class that allows you to create a model with multiple branches of InceptionTimePlus.
        
        Args:
            - feat_mask: list with number of features that will be passed to each body.
        """
        self.feat_mask = [feat_mask] if isinstance(feat_mask, int) else feat_mask 
        self.c_out = c_out
        
        # Body
        self.branches = nn.ModuleList()
        self.head_nf = 0
        for feat in self.feat_mask:
            m = create_model(self._arch, c_in=feat, c_out=c_out, seq_len=seq_len, **kwargs)
            self.head_nf += m.head_nf
            m.head = Noop
            self.branches.append(m)
        
        # Head
        self.head = self._arch.create_head(self, self.head_nf, c_out, **kwargs)

        
    def forward(self, x):
        x = torch.split(x, self.feat_mask, dim=1)
        for i, branch in enumerate(self.branches):
            out = branch(x[i]) if i == 0 else torch.cat([out, branch(x[i])], dim=1)
        return self.head(out)

In [None]:
bs = 16
n_vars = 3
seq_len = 12
c_out = 2
xb = torch.rand(bs, n_vars, seq_len)

test_eq(InceptionTimePlus(n_vars,c_out)(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out,concat_pool=True)(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out, bottleneck=False)(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out, residual=False)(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out, conv_dropout=.5)(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out, coord=True, separable=True, 
                          norm='Instance', zero_norm=True, bn_1st=False, fc_dropout=.5, sa=True, se=True, act=nn.PReLU, act_kwargs={})(xb).shape, [bs, c_out])
test_eq(InceptionTimePlus(n_vars,c_out, coord=True, separable=True,
                          norm='Instance', zero_norm=True, bn_1st=False, act=nn.PReLU, act_kwargs={})(xb).shape, [bs, c_out])
test_eq(total_params(InceptionTimePlus(3, 2))[0], 455490)
test_eq(total_params(InceptionTimePlus(6, 2, **{'coord': True, 'separable': True, 'zero_norm': True}))[0], 77204)
test_eq(total_params(InceptionTimePlus(3, 2, ks=40))[0], total_params(InceptionTimePlus(3, 2, ks=[9, 19, 39]))[0])

In [None]:
InceptionTimePlus(n_vars, c_out, seq_len=seq_len, zero_norm=True, flatten=True, custom_head=create_mlp_head)(xb).shape

torch.Size([16, 2])

In [None]:
test_eq(check_bias(InceptionTimePlus(2,3, zero_norm=True), is_conv)[0].sum(), 0)
test_eq(check_weight(InceptionTimePlus(2,3, zero_norm=True), is_bn)[0].sum(), 6)
test_eq(check_weight(InceptionTimePlus(2,3), is_bn)[0], np.array([1., 1., 1., 1., 1., 1., 1., 1.]))

In [None]:
test_eq(count_parameters(MultiInceptionTimePlus([1,1,1], c_out)) > count_parameters(MultiInceptionTimePlus(3, c_out)), True)
test_eq(MultiInceptionTimePlus([1,1,1], c_out)(xb).shape, MultiInceptionTimePlus(3, c_out)(xb).shape)

In [None]:
for i in range(10): InceptionTimePlus(n_vars,c_out,stoch_depth=0.8,depth=9,zero_norm=True)(xb)

In [None]:
net = InceptionTimePlus(2,3,**{'coord': True, 'separable': True, 'zero_norm': True})
test_eq(check_weight(net, is_bn)[0], np.array([1., 1., 0., 1., 1., 0., 1., 1.]))
net

InceptionTimePlus(
  (inceptionblock): InceptionBlockPlus(
    (inception): ModuleList(
      (0): InceptionModulePlus(
        (bottleneck): ConvBlock(
          (0): AddCoords1d()
          (1): Conv1d(3, 32, kernel_size=(1,), stride=(1,), bias=False)
        )
        (convs): ModuleList(
          (0): ConvBlock(
            (0): AddCoords1d()
            (1): SeparableConv1d(
              (depthwise_conv): Conv1d(33, 33, kernel_size=(39,), stride=(1,), padding=(19,), groups=33, bias=False)
              (pointwise_conv): Conv1d(33, 32, kernel_size=(1,), stride=(1,), bias=False)
            )
          )
          (1): ConvBlock(
            (0): AddCoords1d()
            (1): SeparableConv1d(
              (depthwise_conv): Conv1d(33, 33, kernel_size=(19,), stride=(1,), padding=(9,), groups=33, bias=False)
              (pointwise_conv): Conv1d(33, 32, kernel_size=(1,), stride=(1,), bias=False)
            )
          )
          (2): ConvBlock(
            (0): AddCoords1d()
   

In [None]:
bs = 16
n_vars = 3
seq_len = 12
c_out = 2
xb = torch.rand(bs, n_vars, seq_len)
net = MultiInceptionTimePlus(n_vars, c_out)
change_model_head(net, create_pool_plus_head, concat_pool=False)
print(net(xb).shape)
net.head

torch.Size([16, 2])


Sequential(
  (0): AdaptiveAvgPool1d(output_size=1)
  (1): Flatten(full=False)
  (2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): Linear(in_features=128, out_features=512, bias=False)
  (4): ReLU(inplace=True)
  (5): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (6): Linear(in_features=512, out_features=2, bias=False)
)

In [None]:
#hide
out = create_scripts()
beep(out)

<IPython.core.display.Javascript object>

Converted 000_utils.ipynb.
Converted 000b_data.validation.ipynb.
Converted 001_data.external.ipynb.
Converted 002_data.core.ipynb.
Converted 003_data.preprocessing.ipynb.
Converted 003b_data.transforms.ipynb.
Converted 003c_data.image.ipynb.
Converted 005_data.tabular.ipynb.
Converted 006_data.mixed.ipynb.
Converted 007_metrics.ipynb.
Converted 008_learner.ipynb.
Converted 009_optimizer.ipynb.
Converted 010_callback.core.ipynb.
Converted 011_callback.semi_supervised.ipynb.
Converted 100_models.utils.ipynb.
Converted 100b_models.layers.ipynb.
Converted 101_models.ResNet.ipynb.
Converted 101b_models.ResNetPlus.ipynb.
Converted 102_models.InceptionTime.ipynb.
Converted 102b_models.InceptionTimePlus.ipynb.
Converted 103_models.MLP.ipynb.
Converted 103b_models.FCN.ipynb.
Converted 103c_models.FCNPlus.ipynb.
Converted 104_models.ResCNN.ipynb.
Converted 105_models.RNN.ipynb.
Converted 105_models.RNNPlus.ipynb.
Converted 106_models.XceptionTime.ipynb.
Converted 106b_models.XceptionTimePlus.ipy