In [1]:
import pandas as pd
import numpy as np

In [2]:
mit_train = pd.read_csv("mitbih_train.csv", header=None)
mit_test = pd.read_csv("mitbih_test.csv", header=None)

In [3]:
# seperate x, y
x_train_df = mit_train.loc[:, 0: 186]
y_train_df = mit_train.loc[:, 187]

x_test_df = mit_test.loc[:, 0: 186]
y_test_df = mit_test.loc[:, 187]

In [4]:
x_train_np = x_train_df.values
y_train_np = y_train_df.values
x_test_np = x_test_df.values
y_test_np = y_test_df.values

In [5]:
# 调整类别数量
# 0类降采样
len_train_0 = len(y_train_np[y_train_np==0])
index_train_0 = int(0.1*len_train_0)

len_test_0 = len(y_test_np[y_test_np==0])
index_test_0 = int(0.1*len_test_0)
# 重新取样
x_train_np_re = np.copy(x_train_np[y_train_np==0])
np.random.shuffle(x_train_np_re)
x_train_slected = x_train_np_re[0: index_train_0, :]

x_test_np_re = np.copy(x_test_np[y_test_np==0])
np.random.shuffle(x_test_np_re)
x_test_selected = x_test_np_re[0: index_test_0, :]

# 数据重新组合
X_train = np.concatenate([x_train_slected, x_train_np[y_train_np!=0]])
Y_train = np.concatenate([np.zeros(index_train_0), y_train_np[y_train_np!=0]])

X_test = np.concatenate([x_test_selected, x_test_np[y_test_np!=0]])
Y_test = np.concatenate([np.zeros(index_test_0), y_test_np[y_test_np!=0]])
# 数量向量
num = np.array([index_train_0, len(y_train_np[y_train_np==1]), len(y_train_np[y_train_np==2]), len(y_train_np[y_train_np==3]), len(y_train_np[y_train_np==4])])

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [7]:
X_Train, Y_Train, X_Test, Y_Test = map(
    torch.from_numpy,
    (X_train, Y_train, X_test, Y_test)
)

In [8]:
X_Train = X_Train.unsqueeze(1)
X_Test = X_Test.unsqueeze(1)
# Data Set
bs = 128
train_ds = TensorDataset(X_Train, Y_Train)
test_ds = TensorDataset(X_Test, Y_Test)
# Data Loader
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=False)
test_dl = DataLoader(test_ds, batch_size=bs*2)

In [9]:
from torchdiffeq import odeint_adjoint

In [81]:
# model
def norm(dim):
    return nn.GroupNorm(min(32, dim), dim) # input_size = output_size Normalize

class ConcatConv1d(nn.Module):

    def __init__(self, dim_in, dim_out, kernel_size=3, stride=1, padding=0, bias=True):
        super(ConcatConv1d, self).__init__()
        module = nn.Conv1d
        self._layer = module(
            dim_in + 1, dim_out, kernel_size=kernel_size, stride=stride, padding=padding,
            bias=bias
        )

    def forward(self, t, x):
        tt = torch.ones_like(x[:, :1, :]) * t
        ttx = torch.cat([tt, x], 1)
        return self._layer(ttx)

class ODEfunc(nn.Module):

    def __init__(self, dim):
        super(ODEfunc, self).__init__()
        self.norm1 = norm(dim)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm2 = norm(dim)
        self.conv2 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm3 = norm(dim)
        self.nfe = 0

    def forward(self, t, x):
        self.nfe += 1
        out = self.norm1(x)
        out = self.relu(out)
        out = self.conv1(t, out)
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv2(t, out)
        out = self.norm3(out)
        return out
    
class ODEnet(nn.Module): # Solver

    def __init__(self, odefunc, rtol=1e-3, atol=1e-3):
        super(ODEnet, self).__init__()
        self.odefunc = odefunc
        self.forward_time = torch.tensor([0, 1]).float()
        self.rtol = rtol
        self.atol = atol

    def forward(self, x):
        self.forward_time = self.forward_time.type_as(x)
        out = odeint_adjoint(self.odefunc, x, self.forward_time, rtol=self.rtol, atol=self.atol)
        return out[1]
    
    @property
    def nfe(self):
        return self.odefunc.nfe
    
    @nfe.setter
    def nfe(self, value):
        self.odefunc.nfe = value

class Flatten(nn.Module): #  3D->2D linear input
    def _init_(self):
        super(Flatten, self)._init_()

    def forward(self, x):
        shape = torch.prod(torch.tensor(x.shape[1:])).item()
        return x.view(-1, shape)

In [84]:
def model_building(dim=64, **kwargs):
    # ODE net building
    downsampling_layers = [
        nn.Conv1d(1, dim, 3, 1),
        norm(dim),
        nn.ReLU(inplace=True),
        nn.Conv1d(dim, dim, 4, 2, 1),
        norm(dim),
        nn.ReLU(inplace=True),
        nn.Conv1d(dim, dim, 4, 2, 1)
    ]
    # feature extraction layers
    feature_layers = [ODEnet(ODEfunc(dim), **kwargs)]
    # fc_layers
    fc_layers = [
        norm(dim), 
        nn.ReLU(inplace=True),
        nn.AdaptiveAvgPool1d(1),
        Flatten(),
        nn.Linear(dim, 5)
        ]
    
    model = nn.Sequential(*downsampling_layers, *feature_layers, *fc_layers)
    opt = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
    return model, opt

# loss function
def criterion_bulid(weight_vector,weight=False):
    if weight:
        criterion = nn.CrossEntropyLoss(weight=torch.from_numpy(weight_vector).float())
    else:
        criterion = nn.CrossEntropyLoss()
    return criterion

def fit(epochs, model, criterion, opt, train_dl, valid_dl):
    num_batches = len(train_dl)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    criterion.to(device)
    batch_count = 0
    for epoch in range(epochs):
        print(f"Training epoch{epoch + 1}")
        model.train() #training mode
        for xt, yt in train_dl:
            batch_count+=1
            xt = xt.to(device)
            yt = yt.to(device)

            output_train = model(xt.float())
            loss = criterion(output_train, yt.long())
            # 反向传播
            loss.backward()
            opt.step()
            opt.zero_grad()
        
        model.eval() # validation mode
        predicted_labels = []
        actual_labels = []
        for xv, yv in valid_dl:
            xv = xv.to(device)
            yv=  yv.to(device)
            output_valid = model(xv.float())
            _, predicted = torch.max(output_valid, 1)
            predicted_labels.extend(predicted.tolist())
            actual_labels.extend(yv.tolist())
        
        correct_prediction = sum(p == a for p, a in zip(predicted_labels, actual_labels))
        accuracy = correct_prediction / len(actual_labels)

        print(f'validation accuracy: {accuracy * 100:.2f}%')
        return predicted_labels, actual_labels

In [None]:
criterion = criterion_bulid(1/num, weight=True)
odenet, odeopt = model_building(dim=64, rtol=1e-3, atol=1e-3)
fit(epochs=5, model=odenet, criterion=criterion, opt=odeopt, train_dl=train_dl, valid_dl=test_dl)