In [None]:
!pip install ConfigFactory

# Prepare Data

### Create train-test split

In [91]:
# Load both class files; every line of a file is one sample.
# Entries from neg_A0201.txt get label 0, entries from pos_A0201.txt get label 1.
with open('neg_A0201.txt', 'r') as f:
    neg_text = f.read().splitlines()
    neg_label = [0 for _ in neg_text]
    print(f"number of neg labels: {len(neg_label)}")

with open('pos_A0201.txt', 'r') as f:
    pos_text = f.read().splitlines()
    pos_label = [1 for _ in pos_text]
    print(f"number of pos labels: {len(pos_label)}")


# Negatives first, then positives; texts and labels stay aligned by position.
data_text = [*neg_text, *pos_text]
data_label = [*neg_label, *pos_label]

# print(data_text)
# print(data_label)

number of neg labels: 24492
number of pos labels: 2991


In [106]:
import string

def one_hot_encoder(text):
    """One-hot encode a string over the 26 uppercase ASCII letters.

    Args:
        text: string (or sequence of single characters) to encode.

    Returns:
        A float32 tensor of shape ``(len(text), 26)``. Row ``i`` has a single
        1 in the column of ``text[i]``; characters outside ``A``-``Z``
        (including lowercase letters) produce an all-zero row.
    """
    alphabet = string.ascii_uppercase
    column_of = {letter: col for col, letter in enumerate(alphabet)}

    # Allocating the full matrix up front keeps the shape (0, 26) for empty
    # input; building from a nested list collapses it to shape (0,).
    vector = torch.zeros(len(text), len(alphabet))
    for row, letter in enumerate(text):
        col = column_of.get(letter)
        if col is not None:
            vector[row, col] = 1.0
    return vector

In [92]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(data_text, data_label, test_size=0.10, random_state=42)

In [93]:
import torch 
import string
from torch.utils.data import DataLoader, Dataset

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing raw text entries with encodings and labels.

    The base class is referenced by its fully qualified name so that
    re-running this cell does not make the class inherit from itself.

    Args:
        data_text: sequence of strings, one per sample.
        data_labels: sequence of numeric labels aligned with ``data_text``.
    """

    def __init__(self, data_text, data_labels):
        self.data_text = data_text
        self.data_labels = data_labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.data_text)

    @staticmethod
    def one_hot_encoder(text):
        """One-hot encode ``text`` over the 26 uppercase ASCII letters.

        Returns:
            A float32 tensor of shape ``(len(text), 26)``; characters outside
            ``A``-``Z`` produce an all-zero row.
        """
        alphabet = string.ascii_uppercase
        column_of = {letter: col for col, letter in enumerate(alphabet)}
        encoding = torch.zeros(len(text), len(alphabet))
        for row, letter in enumerate(text):
            col = column_of.get(letter)
            if col is not None:
                encoding[row, col] = 1.0
        return encoding

    def __getitem__(self, index: int):
        """Return one sample as a dict.

        Keys:
            gene: the raw string at ``index``.
            encoded_gene: its one-hot encoding (see ``one_hot_encoder``).
            label: the label as a 0-dim float32 tensor.
        """
        text = self.data_text[index]
        return dict(gene=text,
                    encoded_gene=self.one_hot_encoder(text),
                    # torch.tensor stores the VALUE; torch.FloatTensor(n) would
                    # allocate an uninitialised tensor of n elements instead.
                    label=torch.tensor(self.data_labels[index], dtype=torch.float32))
  

### Create data loaders

In [94]:
# Number of samples per mini-batch; passed as batch_size to the DataLoaders created below.
BATCH_SIZE = 64

In [95]:
from torch.utils.data import sampler, WeightedRandomSampler
import torch

# Oversample the minority class: each training sample is drawn with a
# probability inversely proportional to the size of its class.
#
# The sampler drives the training DataLoader, so the per-sample weights must be
# aligned index-for-index with y_train (the shuffled training split), not with
# the unsplit data_label list.
train_targets = torch.as_tensor(y_train, dtype=torch.long)

# Counts per class label, indexed by label (0 = negative, 1 = positive).
class_sample_count = torch.bincount(train_targets, minlength=2)
weights = 1. / class_sample_count.float()

# Look up each sample's class weight in a single indexing operation.
samples_weights = weights[train_targets]

# replacement=True is what makes oversampling possible: minority samples must
# be drawable more than once per epoch.
sampler = WeightedRandomSampler(weights=samples_weights,
                                num_samples=len(samples_weights),
                                replacement=True)


In [97]:
from torch.utils.data import DataLoader

# make Dataset
train_dataset = Dataset(X_train, y_train)
test_dataset = Dataset(X_test, y_test)

# make DataLoader
# Only the training loader uses the class-balancing sampler.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, sampler=sampler)
# Evaluation must visit every test sample exactly once, in its natural class
# proportions, so the test loader is sequential and takes no sampler.
test_dataloader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Model

### Hyperparameters

In [None]:
# Non-linearity to apply between layers. ReLU is the starting point; swap in
# torch.sigmoid (or another callable) when comparing architectures.
activation_function = torch.relu

# Set up a multi-layer perceptron network to accept this data and output the proper prediction
# (detect / not detect). Try different architectural changes (e.g., a different number of layers, neurons per layer, etc.)
# and non-linearities (ReLU, sigmoid), and pick the one achieving the highest accuracy on the test set.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """Small convolutional network: two conv + max-pool stages, then three linear layers.

    The first linear layer takes 16 * 5 * 5 features, which is what a
    single-channel 32x32 input yields after the two conv/pool stages.
    The final layer emits 10 values per sample.
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 1 -> 6 -> 16 channels, 5x5 kernels.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Classifier head: affine maps y = Wx + b.
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Run a batch through the network and return the raw outputs of ``fc3``."""
        # Each stage: convolution -> ReLU -> 2x2 max pooling.
        features = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2)
        features = F.max_pool2d(F.relu(self.conv2(features)), kernel_size=2)
        # Keep the batch dimension, collapse everything else.
        flat = features.flatten(start_dim=1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


# Train

In [None]:
!pip install -q pyhocon

In [50]:
import argparse
import os
from pyhocon import ConfigFactory

In [None]:
import argparse
import os

import ConfigFactory as ConfigFactory
import numpy as np
import torch
from torch import optim

from data_iterator import get_train_test_data
from models import Net
from stats import Stats

# Index i names the class with integer label i. The data-preparation cell
# labels negatives 0 and positives 1, so 'negative' must come first.
classes = ['negative', 'positive']

# Loss shared by the training loop and the evaluation helpers below. Predictions
# are taken as the arg-max over one output per class, which matches
# cross-entropy over integer class labels.
criterion = torch.nn.CrossEntropyLoss()


def save_test_eval_to_tensorboard(stats, total_loss, correct, total, epoch, class_total, class_correct):
    """Log test-set metrics for one epoch to TensorBoard and print a summary.

    Args:
        stats: object exposing ``summary_writer.add_scalar(tag, value, step)``.
        total_loss: loss accumulated over the test set.
        correct: number of correctly classified test samples.
        total: number of test samples evaluated.
        epoch: epoch index, used as the TensorBoard step.
        class_total: per-class sample counts, indexed like ``classes``.
        class_correct: per-class correct counts, indexed like ``classes``.

    Accuracies are percentages. A zero denominator (empty test set, or a class
    with no test samples) is reported as 0.0 instead of raising
    ZeroDivisionError.
    """
    def _percent(hits, count):
        # Guard the division so a missing class cannot abort the whole run.
        return 100 * hits / count if count else 0.0

    acc = _percent(correct, total)
    class_acc = [_percent(class_correct[i], class_total[i]) for i in range(len(classes))]
    # Unweighted mean of the per-class accuracies.
    w_acc = np.average(class_acc)

    writer = stats.summary_writer
    writer.add_scalar('test/loss', total_loss, epoch)
    writer.add_scalar('test/acc', acc, epoch)
    writer.add_scalar('test/correct', correct, epoch)
    writer.add_scalar('test/weight_acc', w_acc, epoch)
    print('Test Accuracy of the model: {} % after {} epochs'.format(acc, epoch))
    print('weight Accuracy of the model: {} % after {} epochs'.format(w_acc, epoch))

    for i, name in enumerate(classes):
        writer.add_scalar('test/correct_%s' % name, class_correct[i], epoch)
        print(class_total[i], i)
        writer.add_scalar('test/acc_%s' % name, class_acc[i], epoch)


def calculate_model_results(test_labels, outputs, total, correct, total_loss, class_total, class_correct):
    """Fold one evaluated batch into the running test statistics.

    Args:
        test_labels: integer class labels of the batch.
        outputs: model outputs for the batch, one score per class along dim 1.
        total: number of samples seen so far.
        correct: number of correct predictions so far.
        total_loss: loss accumulated so far.
        class_total: per-class sample counts; updated in place.
        class_correct: per-class correct counts; updated in place.

    Returns:
        The updated ``(total, correct, total_loss)``. These are plain numbers,
        so the caller only sees the new values through this return value.
    """
    _, predicted = torch.max(outputs.detach(), 1)
    test_labels_tensor = torch.as_tensor(test_labels)
    hits = predicted == test_labels_tensor

    total += test_labels_tensor.size(0)
    correct += hits.sum().item()
    # .item() keeps the accumulator a Python float rather than a tensor.
    total_loss += criterion(outputs, test_labels_tensor).item()

    # Iterating the 1-D results directly also works for a batch of one sample.
    for label, hit in zip(test_labels_tensor.tolist(), hits.tolist()):
        class_correct[int(label)] += int(hit)
        class_total[int(label)] += 1

    return total, correct, total_loss


def eval_test_data(net, test_loader, stats, epoch):
    """Evaluate ``net`` on every batch of ``test_loader`` and log the metrics.

    Args:
        net: model to evaluate; switched to eval mode for the duration of the
            pass and returned to its previous mode afterwards.
        test_loader: iterable yielding ``(inputs, labels)`` batches.
        stats: passed through to ``save_test_eval_to_tensorboard``.
        epoch: epoch index used as the logging step.
    """
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            correct, total, total_loss = 0, 0, 0
            class_correct, class_total = [0] * len(classes), [0] * len(classes)

            for test_inputs, test_labels in test_loader:
                # Same float cast the training loop applies to its inputs.
                outputs = net(test_inputs.float())
                total, correct, total_loss = calculate_model_results(
                    test_labels, outputs, total, correct, total_loss, class_total, class_correct)
    finally:
        net.train(was_training)

    # save to tensorboard object
    save_test_eval_to_tensorboard(stats, total_loss, correct, total, epoch, class_total, class_correct)


def train(model_path, config):
    """Train a ``Net`` and evaluate it on the test set after every epoch.

    Args:
        model_path: directory for TensorBoard logs and ``<epoch>.ckpt`` files.
        config: mapping with the keys ``learning_rate``, ``epochs``,
            ``batch_size``, ``optimizer`` (``'adam'`` selects Adam, anything
            else SGD) and ``checkpoint_every`` (checkpoint period in epochs).
    """
    net = Net()

    lr = config['learning_rate']
    epochs = config['epochs']
    batch_size = config['batch_size']

    # choose the optimizer
    if config['optimizer'] == 'adam':
        optimizer = optim.Adam(params=net.parameters(), lr=lr)
    else:
        optimizer = optim.SGD(net.parameters(), lr=lr)

    train_loader, test_loader = get_train_test_data(batch_size)
    print('train_loader len is {}'.format(len(train_loader.dataset)))
    print('test_loader len is {}'.format(len(test_loader.dataset)))

    stats_keys = ['loss']
    print_step = 1
    stats = Stats(stats_keys, log_dir=model_path, print_step=print_step, prefix='train/')
    step = 0
    for epoch in range(epochs):
        # Evaluation at the end of the previous epoch left the model in eval
        # mode; switch back before any further weight updates.
        net.train()

        for inputs, inputs_labels in train_loader:
            step += 1

            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = net(inputs.float())

            loss = criterion(outputs, inputs_labels)
            loss.backward()
            optimizer.step()

            # print statistics of train
            stats.summary_writer.add_scalar('train/loss', loss.item(), step)

        # Checkpointing and evaluation are keyed on the epoch, so they run once
        # per epoch rather than once per batch.
        net.eval()
        if epoch % config['checkpoint_every'] == 0:
            checkpoint_path = os.path.join(model_path, '%d.ckpt' % epoch)
            # A plain nn.Module has no save(); use the model's own save() only
            # when it provides one, otherwise store the state dict.
            save = getattr(net, 'save', None)
            if callable(save):
                save(checkpoint_path)
            else:
                torch.save(net.state_dict(), checkpoint_path)

        eval_test_data(net, test_loader, stats, epoch)


if __name__ == '__main__':
    # parse_file is provided by pyhocon's ConfigFactory class. Import it here
    # so the name is bound to that class regardless of what the module-level
    # `import ConfigFactory` bound it to.
    from pyhocon import ConfigFactory

    parser = argparse.ArgumentParser()
    parser.add_argument('config', help='config args for train model')
    parser.add_argument('--config-file', default=os.path.join(os.getcwd(), 'config.conf'))
    args = parser.parse_args()

    # The positional argument names both the section of the config file to use
    # and the output directory (relative to the current working directory).
    model_path = os.path.join(os.getcwd(), args.config)
    config = ConfigFactory.parse_file(args.config_file)[args.config]
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(model_path, exist_ok=True)
    train(model_path, config)


# Stats

In [None]:
import numpy as np
from time import time
from torch.utils.tensorboard import SummaryWriter


class Stats:
    """Accumulate named scalar metrics and periodically log their averages.

    Values passed to ``update`` are summed per key. ``summarize`` writes the
    per-key averages to the TensorBoard ``summary_writer`` and reports the
    average wall-clock time per update since the last ``clear``.
    """

    def __init__(self, keys, log_dir=None, print_step=100, prefix=None):
        self.keys = keys
        self.summary_writer = SummaryWriter(log_dir)
        self.print_step = print_step
        self.prefix = prefix
        self.sums = dict.fromkeys(keys, .0)
        self.start_time = time()
        self.count = 0

    def clear(self):
        """Reset all running sums, the update counter and the timer."""
        self.sums.update(dict.fromkeys(self.sums, .0))
        self.start_time = time()
        self.count = 0

    def update(self, *args):
        """Add one value per key (matched positionally with ``keys``) and count the update."""
        for name, value in zip(self.keys, args):
            self.sums[name] += float(value)
        self.count += 1

    def summarize(self, step):
        """Write per-key averages to TensorBoard at ``step``.

        Returns:
            ``(averages, time_ms)``: a dict of per-key averages and the
            average number of milliseconds per update since the last clear.
        """
        averages = dict.fromkeys(self.sums)
        for name, running_sum in self.sums.items():
            averages[name] = running_sum / self.count
            tag = name if self.prefix is None else self.prefix + name
            self.summary_writer.add_scalar(tag, averages[name], step)
        elapsed_ms = np.round(1e3 * (time() - self.start_time))
        return averages, int(elapsed_ms / self.count)

    def pretty_print(self, step, stats, time_ms):
        """Print one aligned line: step, each metric to four decimals, time per update."""
        fields = [f"{str(step) + ')':<8}"]
        fields.extend(f"{name}: {stats[name]:<9.4f}" for name in self.keys)
        fields.append(f"{'(' + str(time_ms) + ' msec)':>10}")
        print(' '.join(fields))

    def __call__(self, step, *args):
        """Record one update; every ``print_step`` steps, summarize, reset and print."""
        self.update(*args)
        if (step + 1) % self.print_step != 0:
            return
        averages, time_ms = self.summarize(step)
        self.clear()
        self.pretty_print(step + 1, averages, time_ms)