In [None]:
# Download the file with this Google Drive ID (presumably the ECG5000.zip archive extracted in the next cell).
# NOTE(review): newer gdown releases mark `--id` as deprecated — confirm against the installed version.
!gdown --id 16MIleqoIr1vYxlGk4GKnGmrsCPuWkkpT

In [None]:
!unzip -qq ECG5000.zip

In [None]:
import torch

import copy
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split

from torch import nn, optim

import torch.nn.functional as F
#from arff2pandas import a2p


# Render matplotlib figures inline, using the 'retina' figure format.
%matplotlib inline
%config InlineBackend.figure_format='retina'

# Global seaborn theme for the plots in this notebook.
sns.set(style='whitegrid', palette='muted', font_scale=1.2)

# Custom six-colour palette, installed as the seaborn default below.
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]

sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

# Default figure size.
rcParams['figure.figsize'] = 12, 8

# Seed the NumPy and PyTorch global random number generators.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

In [None]:
from scipy.io import arff

# Load the TRAIN data
with open('ECG5000_TRAIN.arff', 'r') as f:
    data, meta = arff.loadarff(f)
train = pd.DataFrame(data)

# Load the TEST data
with open('ECG5000_TEST.arff', 'r') as f:
    data, meta = arff.loadarff(f)
test = pd.DataFrame(data)

In [None]:
# Stack both splits into one frame. DataFrame.append was removed in pandas 2.0;
# pd.concat performs the same row-wise concatenation and keeps the original indices.
df = pd.concat([train, test])
# Shuffle all rows; the explicit random_state gives the same order every time
# this cell is executed, independent of the global NumPy RNG state.
df = df.sample(frac=1.0, random_state=RANDOM_SEED)
df.shape

In [None]:
# Preview the first rows of the combined frame.
df.head()

In [None]:
# Marker for the normal class. The target labels are bytes objects, so this must
# be the bytes literal b'1' -- the str "b'1'" never compares equal to it.
CLASS_NORMAL = b'1'

# Display names for the classes, used as tick labels and subplot titles.
class_names = ['Normal','R on T','PVC','SP','UB']

In [None]:
# Rename the last column to 'target' so it can be accessed as df.target.
new_columns = list(df.columns)
new_columns[-1] = 'target'
df.columns = new_columns

In [None]:
# Display the column labels; the final one should now be 'target'.
df.columns

In [None]:
# Number of rows per target label.
df.target.value_counts()

In [None]:
# Fix the bar order explicitly. Without `order`, the bars follow the order in
# which labels first appear in the shuffled frame, so the tick labels written
# below could be attached to the wrong bars.
# Assumes class_names is listed in ascending label order (b'1' first) -- TODO confirm.
target_order = sorted(df.target.unique())
# Pass the series as `x=`: newer seaborn releases treat the first positional
# argument as `data`, not as the x variable.
ax = sns.countplot(x=df.target, order=target_order)
ax.set_xticklabels(class_names);

In [None]:
def plot_time_series_class(data, class_name, ax, n_steps=10):
  """Draw a rolling-mean curve with a shaded band of +/- 2 rolling standard deviations.

  Args:
    data: Values accepted by ``pd.DataFrame``; column ``0`` of the resulting
      frame supplies the limits of the shaded band.
    class_name: Text used as the axes title.
    ax: Axes-like object providing ``plot``, ``fill_between`` and ``set_title``.
    n_steps: Length of the rolling window.
  """
  series_frame = pd.DataFrame(data)
  window = series_frame.rolling(n_steps)

  rolling_mean = window.mean()
  band_width = 2 * window.std()

  # Band limits come from column 0 of the frame.
  lower_bound = (rolling_mean - band_width)[0]
  upper_bound = (rolling_mean + band_width)[0]

  ax.plot(rolling_mean, linewidth=2)
  ax.fill_between(band_width.index, lower_bound, upper_bound, alpha=.125)
  ax.set_title(class_name)

In [None]:
# Sort the labels so that position i always refers to the same class. unique()
# returns labels in order of first appearance, which depends on the shuffle and
# would pair class_names[i] with the wrong class.
# Assumes class_names is listed in ascending label order -- TODO confirm.
classes = sorted(df.target.unique())

# Just enough rows of three panels to hold every class (ceiling division).
n_cols = 3
n_rows = (len(classes) + n_cols - 1) // n_cols

fig, axs = plt.subplots(
  nrows=n_rows,
  ncols=n_cols,
  sharey=True,
  figsize=(14, 8)
)

for i, cls in enumerate(classes):
  ax = axs.flat[i]
  # Mean value per column over all rows of this class.
  data = df[df.target == cls] \
    .drop(labels='target', axis=1) \
    .mean(axis=0) \
    .to_numpy()
  plot_time_series_class(data, class_names[i], ax)

# Remove every panel that did not receive a class, not only the last one.
for unused_ax in axs.flat[len(classes):]:
  fig.delaxes(unused_ax)
fig.tight_layout();

In [None]:
# Target label of the normal class, stored as bytes (not str).
CLASS_NORMAL = b'1'

In [None]:
# Rows labelled as the normal class, with the label column removed.
# Uses the CLASS_NORMAL constant instead of repeating the literal.
normal_df = df[df.target == CLASS_NORMAL].drop(labels='target', axis=1)
normal_df.shape

In [None]:
# Rows of every class other than the normal one, with the label column removed.
# Uses the CLASS_NORMAL constant instead of repeating the literal.
anomaly_df = df[df.target != CLASS_NORMAL].drop(labels='target', axis=1)
anomaly_df.shape

In [None]:
# Hold out 15% of the normal rows; the remaining 85% become the training set.
train_df, val_df = train_test_split(
  normal_df,
  test_size=0.15,
  random_state=RANDOM_SEED
)

# Split the held-out rows again: 33% become the test set, the rest stay as validation.
val_df, test_df = train_test_split(
  val_df,
  test_size=0.33,
  random_state=RANDOM_SEED
)

In [None]:
def create_dataset(df):
  """Convert a frame of equal-length rows into a list of per-row tensors.

  Args:
    df: DataFrame whose values can be cast to float32; each row is one sequence.

  Returns:
    A tuple ``(dataset, seq_len, n_features)`` where ``dataset`` is a list of
    float32 tensors of shape ``(seq_len, 1)``, ``seq_len`` is the number of
    columns in ``df`` and ``n_features`` is always 1.
  """
  values = df.astype(np.float32).to_numpy()

  # torch.tensor copies each row, so the tensors do not share memory with df.
  dataset = [torch.tensor(row).unsqueeze(1) for row in values]

  # Read the dimensions from the array shape instead of stacking every tensor
  # into one large tensor; this also works when df has no rows.
  seq_len = values.shape[1]
  n_features = 1

  return dataset, seq_len, n_features

In [None]:
# Convert each split into a list of tensors; seq_len and n_features are taken from the training split.
train_dataset, seq_len, n_features = create_dataset(train_df)
val_dataset, _, _ = create_dataset(val_df)
test_normal_dataset, _, _ = create_dataset(test_df)
test_anomaly_dataset, _, _ = create_dataset(anomaly_df)

In [None]:
# Sequence length reported by create_dataset for the training split.
seq_len

In [None]:
class Encoder(nn.Module):
  """Two stacked LSTMs that compress a single sequence into one embedding vector.

  Args:
    seq_len: Number of time steps in each input sequence.
    n_features: Number of values per time step.
    embedding_dim: Size of the returned embedding; the first LSTM uses a
      hidden size of ``2 * embedding_dim``.
  """
  def __init__(self,seq_len,n_features,embedding_dim = 64):
    super(Encoder,self).__init__()

    self.seq_len,self.n_features = seq_len,n_features
    self.embedding_dim,self.hidden_dim = embedding_dim,2*embedding_dim

    self.rnn1 = nn.LSTM(
        input_size = self.n_features,
        hidden_size = self.hidden_dim,
        num_layers = 1,
        batch_first = True
    )

    self.rnn2 = nn.LSTM(
        input_size = self.hidden_dim,
        hidden_size = self.embedding_dim,
        num_layers = 1,
        batch_first = True
    )

  def forward(self,x):
    """Encode one sequence.

    Args:
      x: Tensor holding ``seq_len * n_features`` values; it is reshaped to a
        batch of one sequence.

    Returns:
      Tensor of shape ``(1, embedding_dim)``: the final hidden state of the
      second LSTM.
    """
    x = x.reshape(1,self.seq_len,self.n_features)
    x,_ = self.rnn1(x)
    _,(hidden_n,_) = self.rnn2(x)

    # hidden_n has shape (num_layers=1, batch=1, embedding_dim). The leading
    # dimension is the batch of one, not n_features, so reshaping with 1 keeps
    # the n_features == 1 result unchanged and also works for n_features > 1.
    return hidden_n.reshape((1,self.embedding_dim))

In [None]:
class Decoder(nn.Module):
  """Two stacked LSTMs plus a linear layer that expand an embedding into a sequence.

  Args:
    seq_len: Number of time steps to generate.
    input_dim: Size of the embedding vector passed to ``forward``; the second
      LSTM uses a hidden size of ``2 * input_dim``.
    n_features: Number of values produced per time step.
  """
  def __init__(self,seq_len,input_dim = 64,n_features = 1):
    super(Decoder,self).__init__()

    self.seq_len,self.input_dim = seq_len,input_dim
    self.hidden_dim,self.n_features = 2*input_dim,n_features

    self.rnn1 = nn.LSTM(
        input_size = input_dim,
        hidden_size = input_dim,
        num_layers = 1,
        batch_first = True
    )

    self.rnn2 = nn.LSTM(
        input_size = input_dim,
        hidden_size = self.hidden_dim,
        num_layers = 1,
        batch_first = True
    )

    self.output_layer = nn.Linear(self.hidden_dim,n_features)

  def forward(self,x):
    """Decode one embedding.

    Args:
      x: Tensor holding exactly ``input_dim`` values (for example shape
        ``(1, input_dim)``).

    Returns:
      Tensor of shape ``(seq_len, n_features)``.
    """
    # Feed the same embedding at every time step of a batch of one sequence.
    # n_features only sets the width of the output layer; it is not a batch
    # dimension, so it is not used in the repeat/reshape here.
    x = x.reshape(1,1,self.input_dim).repeat(1,self.seq_len,1)

    x,_ = self.rnn1(x)
    x,_ = self.rnn2(x)

    # Drop the batch dimension so the linear layer maps each time step.
    x = x.reshape((self.seq_len,self.hidden_dim))

    return self.output_layer(x)

In [None]:
class RecurrentAutoencoder(nn.Module):
  """Autoencoder that chains an ``Encoder`` and a ``Decoder``.

  Args:
    seq_len: Number of time steps per sequence.
    n_features: Number of values per time step.
    embedding_dim: Size of the embedding passed from encoder to decoder.
    device: Device the sub-modules are moved to. When omitted, CUDA is used if
      available and the CPU otherwise, so the class no longer relies on a
      module-level ``device`` variable being defined before it is instantiated.
  """
  def __init__(self,seq_len,n_features,embedding_dim=64,device=None):
    super(RecurrentAutoencoder, self).__init__()

    if device is None:
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    self.encoder = Encoder(seq_len,n_features,embedding_dim).to(device)
    self.decoder = Decoder(seq_len,embedding_dim,n_features).to(device)

  def forward(self,x):
    """Encode ``x`` and return the decoder's reconstruction of it."""
    x = self.encoder(x)
    x = self.decoder(x)

    return x

In [None]:
# Use CUDA when it is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Build the autoencoder with an embedding size of 128 and move it to the chosen device.
model = RecurrentAutoencoder(seq_len, n_features, 128)
model = model.to(device)

In [None]:
def train_model(model,train_dataset,val_data_set,n_epochs,lr=0.001):
  """Train ``model`` one sequence at a time and keep the best validation weights.

  Args:
    model: Module mapping a sequence tensor to a reconstruction of the same shape.
    train_dataset: Iterable of sequence tensors used for optimisation.
    val_data_set: Iterable of sequence tensors used for validation only.
    n_epochs: Number of passes over ``train_dataset``.
    lr: Learning rate for the Adam optimizer.

  Returns:
    A tuple ``(model, history)``: the model in eval mode, loaded with the
    weights of the epoch that had the lowest mean validation loss, and a dict
    with per-epoch mean losses under the keys ``'train'`` and ``'val'``.
  """
  # Work on the device the model already lives on rather than relying on a
  # module-level `device` variable.
  device = next(model.parameters()).device

  optimizer = torch.optim.Adam(model.parameters(),lr=lr)
  criterion = nn.L1Loss(reduction = 'sum').to(device)
  history = dict(train = [],val = [])

  best_model_wts = copy.deepcopy(model.state_dict())
  # Start from +inf so the first finite validation loss is always recorded. A
  # fixed threshold would keep the initial weights whenever the summed loss
  # never falls below it.
  best_loss = float('inf')

  for epoch in range(1,n_epochs+1):
    print('training')
    model = model.train()

    train_losses = []
    for seq_true in train_dataset:
      optimizer.zero_grad()

      seq_true = seq_true.to(device)
      seq_pred = model(seq_true)

      # Loss arguments follow the (input, target) convention.
      loss = criterion(seq_pred,seq_true)

      loss.backward()
      optimizer.step()

      train_losses.append(loss.item())

    val_losses = []
    model = model.eval()
    with torch.no_grad():
      for seq_true in val_data_set:
        seq_true = seq_true.to(device)
        seq_pred = model(seq_true)

        loss = criterion(seq_pred,seq_true)
        val_losses.append(loss.item())

    train_loss = np.mean(train_losses)
    val_loss = np.mean(val_losses)

    history['train'].append(train_loss)
    history['val'].append(val_loss)

    # Snapshot the weights whenever the validation loss improves.
    if val_loss < best_loss:
      best_loss = val_loss
      best_model_wts = copy.deepcopy(model.state_dict())

    print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')

  model.load_state_dict(best_model_wts)

  return model.eval(),history





In [None]:
# Train for 30 epochs; train_model returns the model in eval mode together with the per-epoch loss history.
model,history = train_model(
    model,
    train_dataset,
    val_dataset,
    n_epochs = 30
)

In [None]:
# Per-epoch mean training and validation losses recorded by train_model.
history