In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import torchvision.transforms as transforms

from torch.nn import Module
from torch.nn import Conv1d
from torch.nn import Linear
from torch.nn import MaxPool2d
from torch.nn import ReLU
from torch.nn import LogSoftmax
from torch import flatten

import pandas as pd

from metrics.helper import metrics

In [25]:
def median_seq(seq, logits):
    """Collapse every run of ones in ``seq`` to a single one.

    Despite the name, the surviving position of a run is not its median: it
    is the index inside the run whose entry in ``logits`` is largest (the
    first such index on ties, as given by ``np.argmax``).

    Example: seq = [1,1,0,0,0,0,0,1,1,1,0,0,0,0,0,1,1,1,0,1,1] has four runs
    of ones, so the result contains exactly four ones.

    Args:
        seq: indexable sequence of labels; a value equal to 1 extends the
            current run and a value equal to 0 closes it. Any other value
            neither extends nor closes the run.
        logits: indexable scores, addressed with the same positions as ``seq``.

    Returns:
        A float numpy array of length ``len(seq)`` holding 1 at the selected
        position of each run and 0 everywhere else.
    """
    collapsed = np.zeros(len(seq))
    run = []

    def keep_peak(indices):
        # Mark the best-scoring position of a finished run; empty runs are ignored.
        if indices:
            scores = [logits[j] for j in indices]
            collapsed[indices[np.argmax(scores)]] = 1

    for pos in range(len(seq)):
        label = seq[pos]
        if label == 0:
            # A zero terminates whatever run was being collected.
            keep_peak(run)
            run = []
        if label == 1:
            run.append(pos)

    # The sequence may end while a run is still open.
    keep_peak(run)

    return collapsed


In [4]:
class CNN(Module):
    """Stack of five 1-D convolutions, each followed by a sigmoid.

    The channel count shrinks numChannels -> 15 -> 10 -> 5 -> 2 -> 1. Every
    convolution uses kernel_size=15 with padding=7 (and the default stride),
    so the length of the sequence axis is unchanged from input to output.
    The last sigmoid keeps every output value strictly between 0 and 1.
    """

    def __init__(self, numChannels=20):
        """Create the layers as ``conv1..conv5`` and ``sig1..sig5``.

        Args:
            numChannels: number of channels of the input passed to ``forward``.
        """
        super().__init__()
        # Channel count entering the first stage and leaving each stage.
        widths = (numChannels, 15, 10, 5, 2, 1)
        for stage, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            setattr(
                self,
                f"conv{stage}",
                Conv1d(in_channels=n_in, out_channels=n_out, kernel_size=15, padding=7),
            )
            setattr(self, f"sig{stage}", nn.Sigmoid())

    def forward(self, x):
        """Run ``x`` through the five conv + sigmoid stages in order.

        Args:
            x: tensor whose channel axis has ``numChannels`` entries, laid
                out the way ``Conv1d`` expects (channels before length).

        Returns:
            Tensor with a single channel and the same length axis as ``x``.
        """
        for stage in range(1, 6):
            convolve = getattr(self, f"conv{stage}")
            squash = getattr(self, f"sig{stage}")
            x = squash(convolve(x))
        return x

In [5]:
class SequenceDataset(Dataset):
    """Dataset of (input, label) pairs stored in a pickled pandas object.

    The loaded object is indexed with the keys ``'in'`` and ``'out'``; both
    must support positional ``.iloc`` access, and every ``'out'`` entry must
    support ``reshape``.
    """

    # Location used when no explicit path is given (the original hard-coded value).
    # NOTE(review): the file carries a ``.csv`` suffix but is read with
    # ``pd.read_pickle`` - presumably it is a pickle with a misleading name.
    DEFAULT_PATH = '../../data/cnn/one_hot/data.csv'

    def __init__(self, transform, path=None):
        """Load the pickled data once and remember the transform.

        Args:
            transform: callable applied to every input; element 0 of its
                result is what ``__getitem__`` returns. A falsy value
                (e.g. ``None``) returns the stored input untouched.
            path: pickle file to load; defaults to ``DEFAULT_PATH``.
        """
        # SECURITY: unpickling can execute arbitrary code - only load files
        # that come from a trusted source.
        self.data = pd.read_pickle(self.DEFAULT_PATH if path is None else path)
        self.transform = transform

    def __len__(self):
        """Return the length of the loaded object (one row per sample)."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(x, y)`` pair stored at positional ``index``.

        ``y`` is reshaped to ``(1, -1)``, i.e. a single leading row.
        """
        x = self.data['in'].iloc[index]
        y = self.data['out'].iloc[index]
        y = y.reshape((1, -1))

        if self.transform:
            # Only the first element of the transformed input is kept.
            # NOTE(review): with torchvision's ToTensor this presumably drops
            # the channel axis it prepends to a 2-D array - confirm.
            x = self.transform(x)[0]

        return x, y

In [6]:
# Seed torch's global RNG so that the train/test split, the weight
# initialisation and the loader shuffling below are repeatable on a fresh
# kernel (Restart & Run All).
SEED = 0
torch.manual_seed(SEED)

dataset = SequenceDataset(transforms.ToTensor())

# Reuse the data already held by the dataset instead of unpickling the file a
# second time just to count its rows.
length = len(dataset)
# 90 % of the samples go to training, the remainder to testing.
train_len = (length * 9) // 10
test_len = length - train_len
train_set, test_set = torch.utils.data.random_split(dataset, [train_len, test_len])

# NOTE(review): batch_size=1 presumably because samples differ in length and
# cannot be stacked by the default collate function - confirm.
train_loader = DataLoader(dataset=train_set, batch_size=1, shuffle=True)
test_loader = DataLoader(dataset=test_set, batch_size=1, shuffle=True)

# NOTE(review): `device` is selected here but the model is never moved to it
# in this cell, so the forward pass runs wherever the model was created.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# .double() switches the parameters to float64.
# NOTE(review): presumably to match the dtype of the stored data - confirm.
model = CNN().double()

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [7]:
# One pass over the training set; the mean loss of each group of 2000
# mini-batches is printed as training progresses.
for epoch in range(1):

    running_loss = 0.0
    for i, (inputs, target) in enumerate(train_loader):

        # Flatten prediction and label to 1-D so the criterion compares them
        # element by element.
        outputs = model(inputs).reshape(-1)
        target = target.reshape(-1)

        # Usual optimisation step: clear stale gradients, backpropagate the
        # loss, then update the weights.
        optimizer.zero_grad()
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        # Accumulate, report after every 2000th mini-batch, then start over.
        running_loss += loss.item()
        if (i + 1) % 2000 == 0:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

[1,  2000] loss: 0.669
[1,  4000] loss: 0.666
[1,  6000] loss: 0.660
[1,  8000] loss: 0.658
[1, 10000] loss: 0.654
[1, 12000] loss: 0.652
Finished Training


In [8]:
# # test
def test(model, threshhold=0.5, loader=None):
    """Evaluate ``model`` and print/return the metrics averaged over batches.

    Each batch is passed through the model, flattened, binarised with
    ``outputs > threshhold`` and scored by ``metrics`` (imported from
    ``metrics.helper``), which is unpacked as
    ``(accuracy, precision, recall, f1)``. The four values are summed over
    all batches and divided by the number of batches.

    Args:
        model: the network to evaluate. It is put in eval mode for the
            duration of the call and restored to its previous mode afterwards.
        threshhold: outputs strictly greater than this value count as
            positive predictions.
        loader: iterable of ``(input, target)`` batches supporting ``len``;
            defaults to the notebook-level ``test_loader``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` of per-batch averages.

    Raises:
        ValueError: if the loader yields no batches (previously this surfaced
            as an unexplained ZeroDivisionError).
    """
    if loader is None:
        loader = test_loader

    n_batches = len(loader)
    if n_batches == 0:
        raise ValueError('test(): the evaluation loader is empty, nothing to average')

    accuracy = 0
    precision = 0
    recall = 0
    f1 = 0

    # Evaluate in eval mode, but leave the model in whatever mode the caller
    # had it in, even if evaluation raises.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, target in loader:
                outputs = model(inputs)
                outputs = outputs.reshape(-1).to(device)
                target = target.reshape(-1).to(device)
                # Turn the model's scores into hard 0/1 predictions.
                outputs = (outputs > threshhold).float()
                accuracy_, precision_, recall_, f1_ = metrics(outputs, target)
                accuracy += accuracy_
                precision += precision_
                f1 += f1_
                recall += recall_

            accuracy = accuracy / n_batches
            precision = precision / n_batches
            recall = recall / n_batches
            f1 = f1 / n_batches

            print(f'Accuracy: {accuracy}')
            print(f'Precision: {precision}')
            print(f'Recall: {recall}')
            print(f'F1: {f1}')
    finally:
        model.train(was_training)

    return (accuracy, precision, recall, f1)


In [9]:
# Sweep the decision threshold to see how the averaged metrics react to it.
threshholds = [0.3, 0.4, 0.5, 0.6, 0.7]
for t in threshholds:
    print(f"Threshhold: {t}")
    test(model, threshhold=t)
    # Blank line between the reports of consecutive thresholds.
    print()

Threshhold: 0.3
Accuracy: 0.38765762530392006
Precision: 0.38765762530392006
Recall: 1.0
F1: 0.5434704150216114

Threshhold: 0.4
Accuracy: 0.6771696790656383
Precision: 0.8859814331422823
Recall: 0.20043380234170646
F1: 0.30214252537713693

Threshhold: 0.5
Accuracy: 0.6320278421061355
Precision: 0.9257893838538999
Recall: 0.05734024226333336
F1: 0.10469450157570968

Threshhold: 0.6
Accuracy: 0.6123423746960799
Precision: 0.0
Recall: 0.0
F1: 0.0

Threshhold: 0.7
Accuracy: 0.6123423746960798
Precision: 0.0
Recall: 0.0
F1: 0.0

