In [1]:
import torch
from torch import nn
import os
import glob as glob
from torch.utils.data import DataLoader, Dataset, Subset
import torch.optim as optim
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import pandas as pd
from tqdm import tqdm
import random
import time
from IPython import display
import h5py
import math

In [2]:
class AE_Dataset(Dataset):
    """Samples read from a comma-separated text file, one sample per line.

    Every field of a line except the last is parsed as a float feature; the
    last field is kept as the string label. Labels are one-hot encoded in
    order of first appearance in the file.

    Attributes:
        df: list of per-sample feature lists (floats).
        soln: list of per-sample label strings, parallel to ``df``.
        soln_list: distinct labels in order of first appearance; a label's
            position here is its index in the one-hot vector.
        one_hot_solns: dict mapping each label to its one-hot ``np.ndarray``.
        transform: optional callable applied to the feature array.
        target_transform: optional callable applied to the one-hot label.
    """

    def __init__(self, file, transform=None, target_transform=None):
        """Parse ``file`` and build the label encoding.

        Args:
            file: path of the text file to read.
            transform: optional callable applied to each feature array in
                ``__getitem__``.
            target_transform: optional callable applied to each one-hot label
                in ``__getitem__``.

        Raises:
            OSError: if ``file`` cannot be opened.
            ValueError: if a feature field cannot be parsed as a float.
        """
        self.df = []
        self.soln = []

        with open(file, 'r') as f:
            for raw_line in f:
                # Strip only the line terminator. Slicing off the last
                # character would corrupt the label of a final line that has
                # no trailing newline.
                line = raw_line.rstrip('\r\n')
                if not line.strip():
                    # A blank line would otherwise become a sample with no
                    # features and an empty-string label.
                    continue
                *features, label = line.split(',')
                self.df.append([float(x) for x in features])
                self.soln.append(label)

        # dict.fromkeys keeps first-appearance order while dropping repeats.
        self.soln_list = list(dict.fromkeys(self.soln))

        self.one_hot_solns = {}
        for position, s in enumerate(self.soln_list):
            z = np.zeros(len(self.soln_list))
            z[position] = 1
            self.one_hot_solns[s] = z

        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of samples that were read."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(features, one_hot_label)`` for sample ``idx``.

        Without transforms, ``features`` is a new ``np.ndarray`` and the label
        is the per-label array stored in ``one_hot_solns`` (the same object for
        every sample of that label, so callers should not mutate it in place).
        """
        data = np.array(self.df[idx])
        one_hot_label = self.one_hot_solns[self.soln[idx]]

        if self.transform:
            data = self.transform(data)
        if self.target_transform:
            # Transform the label that is actually returned.
            one_hot_label = self.target_transform(one_hot_label)

        return data, one_hot_label

In [3]:
# Read every sample from the text file into memory once; later cells index into it.
dataset = AE_Dataset(file='data_file.txt')

In [4]:
# Spot-check: draw one random index and display that (features, one-hot label) pair.
idx = np.random.randint(0, len(dataset))

dataset[idx]

(array([0.14217606, 0.        , 0.00093796, 0.00217202, 0.0003608 ,
        0.00047538, 0.05378675, 0.06140351, 0.14838068, 0.        ,
        0.        , 0.        , 0.        ]),
 array([1., 0., 0.]))

In [5]:
# 70/30 split by position in the dataset: the first 70% of the rows train,
# the remainder test. Nothing is shuffled before the cut.
split_at = int(len(dataset)*0.7)
trainset = Subset(dataset, range(0, split_at))
testset = Subset(dataset, range(split_at, len(dataset)))

In [6]:
# Both loaders share one batch size; only the training loader reshuffles each epoch.
BATCH_SIZE = 128
train_loader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)

In [7]:
# Total number of samples loaded.
len(dataset)

160603

In [8]:
# use LeakyReLU to enable negative values
class Classifier_Leaky(nn.Module):
    """Fully connected classifier with LeakyReLU activations and a sigmoid output.

    Layer widths are input_shape -> 64 -> 256 -> 1024 -> output_shape. Each
    hidden Linear is followed by LeakyReLU(0.2); the final Linear is followed
    by Sigmoid, so every output element lies in (0, 1).

    Keyword Args:
        input_shape: number of input features per sample (after flattening).
        output_shape: number of output units.

    Raises:
        KeyError: if either keyword argument is missing.
    """

    def __init__(self, **kwargs):
        super().__init__()

        # Build the stack in order so the Sequential indices (Linear at
        # 0/2/4/6, Sigmoid at 7) and therefore the state_dict keys stay fixed.
        hidden_widths = (64, 256, 1024)
        layers = []
        fan_in = kwargs["input_shape"]
        for width in hidden_widths:
            layers.append(nn.Linear(fan_in, width))
            layers.append(nn.LeakyReLU(0.2))
            fan_in = width
        layers.append(nn.Linear(fan_in, kwargs["output_shape"]))
        #nn.Softmax(dim=1))
        layers.append(nn.Sigmoid())

        self.linear_lrelu_stack = nn.Sequential(*layers)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Flatten each sample and return the sigmoid-activated outputs."""
        flat = self.flatten(x)
        # The stack ends in Sigmoid, so these are activations, not raw logits.
        return self.linear_lrelu_stack(flat)

In [9]:
# Binary cross-entropy is applied element-wise to the model's sigmoid outputs.
criterion = nn.BCELoss() # nn.CrossEntropyLoss()
#criterion = nn.MSELoss()

# 13 input features per sample, 3 output units.
model = Classifier_Leaky(input_shape=13, output_shape=3)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [10]:
# Train for a fixed number of epochs and overwrite the checkpoint after each one.
epochs = 5
path = './Classifier_MkII.pth'

model.train()
for epoch in range(epochs):
    with tqdm(train_loader, unit='batch') as tepoch:
        tepoch.set_description(f'Epoch {epoch+1}')
        total = 0    # true labels seen so far in this epoch
        correct = 0  # of those, how many had a model output >= 0.5
        for data, target in tepoch:
            optimizer.zero_grad()

            outputs = model(data.float())
            train_loss = criterion(outputs, target.float())
            train_loss.backward()

            #nn.utils.clip_grad_norm(model.parameters(), max_norm=2.0)
            optimizer.step()

            # Running accuracy over EVERY sample in the batch: a true label
            # counts as correct when its own output is >= 0.5. Scoring only
            # sample 0 of each batch would make the figure a noisy estimate.
            is_actual = target >= 0.5
            is_hit = (outputs.detach() >= 0.5) & is_actual
            correct += int(is_hit.sum())
            total += int(is_actual.sum())

            # max(..., 1) guards the division if no label has reached 0.5 yet.
            tepoch.set_postfix(loss=round(train_loss.item(),4), accuracy=100*correct//max(total, 1))

        torch.save(model.state_dict(), path)

Epoch 1: 100%|██████████| 879/879 [00:04<00:00, 193.04batch/s, accuracy=97, loss=0.0228]
Epoch 2: 100%|██████████| 879/879 [00:04<00:00, 191.48batch/s, accuracy=98, loss=0.038] 
Epoch 3: 100%|██████████| 879/879 [00:04<00:00, 189.98batch/s, accuracy=99, loss=0.0015] 
Epoch 4: 100%|██████████| 879/879 [00:04<00:00, 193.42batch/s, accuracy=99, loss=0.0184] 
Epoch 5: 100%|██████████| 879/879 [00:04<00:00, 191.03batch/s, accuracy=98, loss=0.0141]


In [11]:
# eval one at a time
# NOTE: this rebinds test_loader to a batch-size-1, shuffled loader over the test split.
test_loader = DataLoader(testset, batch_size=1, shuffle=True)
dataiter = iter(test_loader)
# Use the builtin next(): it is the iterator protocol and works on every
# PyTorch version, whereas the iterator's .next() method is not available
# on newer releases.
data, labels = next(dataiter)

# Fresh model instance restored from the checkpoint written during training.
model = Classifier_Leaky(input_shape=13, output_shape=3)
model.load_state_dict(torch.load(path))
model.eval()
predicted = []
actual = []

with torch.no_grad():

    outputs = model(data.float())

    # Batch size is 1, so row 0 is the only sample. A solution is reported
    # when its output (or its label) is at least 0.5.
    for name, out_value, label_value in zip(dataset.soln_list, outputs[0], labels[0]):
        if out_value >= 0.5:
            predicted.append(name)
        if label_value >= 0.5:
            actual.append(name)

    print('Predicted: ', *predicted)
    print('Acutal: ',*actual)

Predicted:  Background
Acutal:  Background


In [12]:
# eval each soln
# Per-solution accuracy (argmax of the outputs vs. the one-hot label).
# This iterates the full dataset, i.e. both the training and the test rows.
model = Classifier_Leaky(input_shape=13, output_shape=3)
model.load_state_dict(torch.load(path))
model.eval()

for soln in dataset.soln_list:
    if soln == 'Background':
        continue

    # Indices of every sample whose label is this solution.
    idxs = [d for d, name in enumerate(dataset.soln) if name == soln]

    tc = 0  # correctly classified samples for this solution
    with torch.no_grad():
        print(soln)
        for j in idxs:
            data, labels = dataset[j]

            # Wrap the sample in a batch of one before the forward pass.
            outputs = model(torch.from_numpy(np.array([data])).float())

            predicted = int(torch.argmax(outputs, dim=1))
            actual = int(np.argmax(labels))
            tc += int(predicted == actual)

    # Every entry of soln_list occurs in dataset.soln at least once, so t > 0.
    t = len(idxs)
    print(f'{soln} Accuracy: {tc*100//t} %')

Baby Powder
Baby Powder Accuracy: 81 %
Riboflavin
Riboflavin Accuracy: 99 %


In [13]:
# create confusion matrix
# soln_cm[actual][predicted] -> number of samples, counted over the full
# dataset (training and test rows alike).
model = Classifier_Leaky(input_shape=13, output_shape=3)
model.load_state_dict(torch.load(path))
model.eval()

soln_cm = {}
with torch.no_grad():
    for idx in tqdm(range(len(dataset)), desc='Progress'):
        data, labels = dataset[idx]
        # Wrap the sample in a batch of one before the forward pass.
        outputs = model(torch.from_numpy(np.array([data])).float())

        # argmax returns the first maximal index, matching list.index(max(...))
        # without building Python lists of 0-dim tensors for every sample.
        pred_soln = dataset.soln_list[int(torch.argmax(outputs[0]))]
        act_soln = dataset.soln_list[int(np.argmax(labels))]

        row = soln_cm.setdefault(act_soln, {})
        row[pred_soln] = row.get(pred_soln, 0) + 1

print(soln_cm)
            
            

Progress: 100%|██████████| 160603/160603 [03:03<00:00, 872.85it/s]

{'Background': {'Background': 147190}, 'Baby Powder': {'Baby Powder': 4098, 'Riboflavin': 958, 'Background': 1}, 'Riboflavin': {'Riboflavin': 8293, 'Baby Powder': 63}}





In [None]:
# Display the nested counts: soln_cm[actual solution][predicted solution] -> sample count.
soln_cm