In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split

%matplotlib inline
sns.set_style('darkgrid')

In [2]:
mit_train = pd.read_csv('./data/mitbih_train.csv', header=None)
# mit_test = pd.read_csv('./data/mitbih_test.csv', header=None)

In [3]:
mit_train = mit_train.sample(2048)

In [4]:
# Separate target from data
y_train = mit_train[187]
X_train = mit_train.loc[:, :186]

# y_test = mit_test[187]
# X_test = mit_test.loc[:, :186]

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from torchdiffeq import odeint_adjoint as odeint

In [6]:
X_train, y_train = map(
    torch.from_numpy, (X_train.values, y_train.values)
)

In [7]:
# Convert to 3D tensor
X_train = X_train.unsqueeze(1)

# X_test = X_test.unsqueeze(1)

In [8]:
# Batch size
bs = 128

train_ds = TensorDataset(X_train, y_train)
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True)

# test_ds = TensorDataset(X_test, y_test)
# test_dl = DataLoader(test_ds, batch_size=bs * 2)

In [9]:
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv1d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv1d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


def norm(dim):
    return nn.GroupNorm(min(32, dim), dim)

In [10]:
class ConcatConv1d(nn.Module):

    def __init__(self, dim_in, dim_out, ksize=3, stride=1, padding=0, dilation=1, groups=1, bias=True, transpose=False):
        super(ConcatConv1d, self).__init__()
        module = nn.ConvTranspose1d if transpose else nn.Conv1d
        self._layer = module(
            dim_in + 1, dim_out, kernel_size=ksize, stride=stride, padding=padding, dilation=dilation, groups=groups,
            bias=bias
        )

    def forward(self, t, x):
        tt = torch.ones_like(x[:, :1, :]) * t
        ttx = torch.cat([tt, x], 1)
        return self._layer(ttx)

In [11]:
class ODEfunc(nn.Module):

    def __init__(self, dim):
        super(ODEfunc, self).__init__()
        self.norm1 = norm(dim)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm2 = norm(dim)
        self.conv2 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm3 = norm(dim)
        self.nfe = 0

    def forward(self, t, x):
        self.nfe += 1
        out = self.norm1(x)
        out = self.relu(out)
        out = self.conv1(t, out)
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv2(t, out)
        out = self.norm3(out)
        return out

In [12]:
class ODEBlock(nn.Module):

    def __init__(self, odefunc):
        super(ODEBlock, self).__init__()
        self.odefunc = odefunc
        self.integration_time = torch.tensor([0, 1]).float()

    def forward(self, x):
        self.integration_time = self.integration_time.type_as(x)
        out = odeint(self.odefunc, x, self.integration_time, rtol=1e-1, atol=1e-4)
        return out[1]

    @property
    def nfe(self):
        return self.odefunc.nfe

    @nfe.setter
    def nfe(self, value):
        self.odefunc.nfe = value

In [13]:
class Flatten(nn.Module):

    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, x):
        shape = torch.prod(torch.tensor(x.shape[1:])).item()
        return x.view(-1, shape)

In [14]:
downsampling_layers = [
            nn.Conv1d(1, 64, 3, 1),
            norm(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, 3, 2, 1),
            norm(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, 3, 2, 1)
        ]

feature_layers = [ODEBlock(ODEfunc(64))]

fc_layers = [norm(64), nn.ReLU(inplace=True), nn.AdaptiveAvgPool1d(1), Flatten(), nn.Linear(64, 5)]

In [15]:
model = nn.Sequential(*downsampling_layers, *feature_layers, *fc_layers)

In [16]:
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters())

In [18]:
count = 0
for xb, yb in train_dl:
    count += 1
    print(f"Batch {count}")
    opt.zero_grad()
    logits = model(xb.float())
    loss = criterion(logits, yb.long())
    
    nfe_forward = feature_layers[0].nfe
    feature_layers[0].nfe = 0
    
    loss.backward()
    opt.step()
    
    nfe_backward = feature_layers[0].nfe
    feature_layers[0].nfe = 0
    

Batch 1
Batch 2
Batch 3
Batch 4
Batch 5
Batch 6
Batch 7
Batch 8
Batch 9
Batch 10
Batch 11
Batch 12
Batch 13
Batch 14
Batch 15
Batch 16


In [20]:
with torch.no_grad():
    logits = model(X_train.float())
    

In [21]:
criterion(logits, y_train.long())

tensor(0.6731)