# Note:
#### The models has been evaluated with two different accuracies

1- as used in paper (TP+TN/TP+TN+FN+FP)

2- Overall Accuray (TP/N)

# Implmentation of the RPCNet Paper

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as f
from torch.utils.data import TensorDataset, DataLoader

import sys
import numpy as np
from numpy import load
from tqdm import tqdm

from pycm import *

# Description of the Dataset

In [2]:
# Decription of the dataset
data = load('dataset.npz')
lst = data.files

print("x: Sequences \ny: Labels \n")

for item in lst:
    print(item)

print('Sequences: ',data['x'].shape,'|  Labels', data['y'].shape)


x: Sequences 
y: Labels 

x
y
Sequences:  (8920, 1182) |  Labels (8920,)


In [3]:
# temp = torch.zeros((16,16), dtype=torch.int)
# temp

# One Hot Encoding Conversion

In [4]:
'''
    One hot encoding function
    args: 'data'
            Type: a single np array (1182, 16)
    return: 'New Tensor'
            Takes the np array, finds the one hot encoding char
            Creates a new tensor and returns it (1182, 16, 16)
'''
def one_hot_encoded(data): #1182*16 to 1182*16*16
  new_tensor = torch.zeros((1182, 16,16))
  for i in range(data.shape[0]):
    row,col=data[i],data[i]
    new_tensor[i,row,col]=1
  return new_tensor

In [5]:
# Convert all the sequences to one hot encoded matrices of size 16*16

# Load the sequences
new_data = data['x'].astype(int)
sequences_n = torch.zeros(new_data.shape[0], new_data.shape[1], 16,16)

# Pass the sequences to one hot encoding function
for i in tqdm(range(new_data.shape[0])):
    sequences_n[i]=one_hot_encoded(new_data[i])
print(sequences_n.size())

# Load and convert the labels to torch tensor
labels = torch.from_numpy(data['y'].astype(np.int64))

100%|███████████████████████████████████████| 8920/8920 [01:43<00:00, 86.08it/s]

torch.Size([8920, 1182, 16, 16])





# Creation of Dataloaders

In [6]:
# Create the dataset
full_dataset = TensorDataset(sequences_n, labels)

# Split the dataset
train_ds, test_ds = torch.utils.data.random_split(full_dataset, (6320, 2600))

# Print and confirm the dataset
# print(train_ds, test_ds)
# print(len(train_ds.indices), len(test_ds))
# print(train_ds.indices, test_ds.indices)

# Create the train data loader
# Ignore if K-Fold training 
# train_dataloader = DataLoader(train_ds, batch_size=32, shuffle=False, sampler=None,
#            batch_sampler=None, num_workers=0, collate_fn=None,
#            pin_memory=False, drop_last=False, timeout=0,
#            worker_init_fn=None)

# Create the test data loader
test_dataloader = DataLoader(test_ds, batch_size=64, shuffle=False, sampler=None,
           batch_sampler=None, num_workers=0, collate_fn=None,
           pin_memory=False, drop_last=False, timeout=0,
           worker_init_fn=None)


# Creation of the Model

In [7]:


'''
    Type: RCPClass
    The class conatains the layers of the Dense and Transition blocks
    
    Args: Takes the dataset (batch size, sequence_size, w, h)
    return: Logits (Probabilities of the all 13 classes)
'''

# Create the RCPNet Class
class RPCNet(nn.Module):
  def __init__(self, in_channels):
    super(RPCNet, self).__init__()
    
    self.in_channels = in_channels
    self.conv1 = nn.Conv2d(in_channels=1182, out_channels=128, kernel_size=(16,17), padding=(4,6))
    
    # First Dense Net Block
    self.DenseBlock1 = DenseBlock(128)
    
    # Second Dense Net Block
    self.DenseBlock2 = DenseBlock(256)
    
    # Third Dense Net Block
    self.DenseBlock3 = DenseBlock(512)
    
    # Transition Block 1
    self.pos_dense1 = nn.Sequential(
          nn.BatchNorm2d(128),
          nn.ReLU(inplace=True),
          nn.Conv2d(in_channels=128,out_channels=256,kernel_size=(1,1),padding=(2,2)),
          nn.MaxPool2d((1,4)))
    
    # Transition Block 2
    self.pos_dense2 = nn.Sequential(
          nn.BatchNorm2d(256),
          nn.ReLU(inplace=True),
          nn.Conv2d(in_channels=256,out_channels=512,kernel_size=(1,1),padding=(2,2), stride=1),
          nn.MaxPool2d((1,4)))
    
    # Transition Block 3
    self.pos_dense3 = nn.Sequential(
          nn.BatchNorm2d(512),
          nn.ReLU(inplace=True),
          nn.Conv2d(in_channels=512,out_channels=1024,kernel_size=(1,1),padding=(2,2),stride=1),
          nn.MaxPool2d((1,4)))
    
    # Final linear layer, can be used with a Flatten Layer 
    self.f= nn.Flatten()
    self.Linear = nn.Linear(611328, 13)
      
  def forward(self, input):
    out = self.conv1(input)
    

    out= self.DenseBlock1(out)
    out= self.pos_dense1(out)

    out= self.DenseBlock2(out)
    out = self.pos_dense2(out)

    out= self.DenseBlock3(out)
    out = self.pos_dense3(out)
    
    logits = self.f(out)
    logits= self.Linear(logits)


    return logits

'''
    Type: Class DenseBlock
          Contains Batch Normalization, ReLU, Conv2D
          
    Input: Declaration: Num of input channels 
           Forward: Output of previous layer
    
    Can be used before transition blocks or other layers
'''
class DenseBlock(nn.Module):
  def __init__(self, num_features):
    super(DenseBlock, self).__init__()

    self.Block1 = Block(num_features) # applies relu and BN
    self.Block2 = Block(num_features)

  def forward(self, o_input):

    block1_res = self.Block1(o_input)
    block2_res = self.Block2((torch.cat((block1_res, o_input), dim=2))) # o_inpu (concat) block1res

    return  torch.cat((block1_res, block2_res, o_input), dim=2) # concat, block1_res, block2_res, o_input

'''
    Type: Block Class
    
    Input: Declaration: Num of input channels 
           Forward: Output of the previous layer
           
    return: Concatenated input and output of the BN, ReLU, Conv2D
'''

class Block(nn.Module):
  def __init__(self, num_features):
    super(Block, self).__init__()
    self.num_features = num_features

    self.BN = nn.BatchNorm2d(self.num_features) # try 2d
    self.conv1 = nn.Conv2d(in_channels=self.num_features, out_channels=num_features, kernel_size=(16,17), padding=(7,8))
  
  def forward(self, input):
    res_relu = f.relu(self.BN(input))
    res_relu_conv = self.conv1(res_relu)

    return res_relu_conv

# Initializaiton of Model, Optimizer, Loss function 

In [10]:
device = torch.device("cuda:3" if torch.cuda.is_available() else "cpu")

In [11]:
model = RPCNet(1182)


model.cuda()
if torch.cuda.device_count() > 1:
    #  print("Let's use", torch.cuda.device_count(), "GPUs!")
    model = torch.nn.DataParallel(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_function = nn.CrossEntropyLoss()

In [12]:
model

DataParallel(
  (module): RPCNet(
    (conv1): Conv2d(1182, 128, kernel_size=(16, 17), stride=(1, 1), padding=(4, 6))
    (DenseBlock1): DenseBlock(
      (Block1): Block(
        (BN): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv1): Conv2d(128, 128, kernel_size=(16, 17), stride=(1, 1), padding=(7, 8))
      )
      (Block2): Block(
        (BN): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv1): Conv2d(128, 128, kernel_size=(16, 17), stride=(1, 1), padding=(7, 8))
      )
    )
    (DenseBlock2): DenseBlock(
      (Block1): Block(
        (BN): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv1): Conv2d(256, 256, kernel_size=(16, 17), stride=(1, 1), padding=(7, 8))
      )
      (Block2): Block(
        (BN): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv1): Conv2d(256, 256, kernel_size=(16, 17), s

# Train Loop 
### (Please ignore this part if K-Fold is of interest)
### (To run it please uncomment the train data loader that has been created in the dataset preprocessing part)

In [13]:
# total_loss_e = 0.0

# plot_loss = []
# for epoch in range(35):
#     total_loss_i = 0
#     total=0.0
#     correct=0.0
    
#     for i, batch in enumerate(train_dataloader):
#         x,y=batch
#         logits = model(x.cuda())
#         loss = loss_function(logits,y.cuda())
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
#         total_loss_i += loss.item()
#         _, predicted = torch.max(logits.data, 1)
#         total += y.size(0)
#         correct += (predicted.cpu() == y).sum().item()
#         if i%50 ==0:
#             print("Iter : {}  Train Loss : {}".format(i,loss.item()))
#     total_loss_e+=total_loss_i/len(train_dataloader)
#     print("Epoch  {} :   Train Loss : {}  Accuracy : {}".format(epoch, total_loss_e, (100 * (correct / total))))
#     plot_loss.append(total_loss_e)
#     total_loss_e=0
    

In [14]:
# torch.save(model.state_dict(), "rpcnet.pt")

In [15]:
# import matplotlib.pyplot as plt
# plt.title("Train Loss")
# plt.xlabel("Epochs")
# plt.ylabel("Loss")
# plt.plot(plot_loss)
# plt.show()

# K-Fold Training

In [16]:
'''
    Type: Parameter reset function
    Return: None
    
    The funciton is used to reset the parameters of the model in K-Fold manner
'''

def reset_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        m.reset_parameters()

In [20]:
from sklearn.metrics import accuracy_score
validation_accuracy=[]
test_accuracy=[]

'''
    Type: Train function
    Return: None
    
    The funciton is used to train the model in K-Fold manner
'''
def train(fold, model, train_loader, epoch):
    model.train()
    total=0.0
    correct=0.0
    for batch_idx, batch in enumerate(train_loader):
        data, target = batch 
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, target)
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(output.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        
        if batch_idx % 100 == 0:
            cm = ConfusionMatrix(actual_vector = target.cpu().numpy().tolist(), predict_vector =predicted.cpu().numpy().tolist())
            print("Accuracy as in paper: ",(sum(cm.TP.values())+ sum(cm.TN.values()))/ (sum(cm.TP.values())+ sum(cm.TN.values())+sum(cm.FP.values())+sum(cm.FN.values())))
            print('Train Fold/Epoch: {}/{} [{}/{} ({:.0f}%)]\tLoss: {:.6f} , Overall Accuracy : {}'.format(
                fold,epoch, batch_idx * len(train_loader), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item(), (100 * (correct / total))))
'''
    Type: Validation function
    Return: Accuracies
    
    The funciton is used to validate the model in K-Fold manner
'''

def validate(fold, model, train_loader):
    model.eval()
    total=0.0
    correct=0.0
    for batch_idx, batch in enumerate(train_loader):
        data, target = batch 
        data, target = data.cuda(), target.cuda()
        output = model(data)
        loss = loss_function(output, target)
        _, predicted = torch.max(output.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
        if batch_idx % 100 == 0:
            cm = ConfusionMatrix(actual_vector = target.cpu().numpy().tolist(), predict_vector =predicted.cpu().numpy().tolist())
            print("Accuracy as in paper: ",(sum(cm.TP.values())+ sum(cm.TN.values()))/ (sum(cm.TP.values())+ sum(cm.TN.values())+sum(cm.FP.values())+sum(cm.FN.values())))
            print('Validation Fold: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f} , Overall Accuracy : {}'.format(
                fold, batch_idx * len(train_loader), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item(), (100 * (correct / total))))
    print('Overall Accuracy: ',100 * (correct / total))
    cm = ConfusionMatrix(actual_vector = target.cpu().numpy().tolist(), predict_vector =predicted.cpu().numpy().tolist())
    validation_accuracy.append((sum(cm.TP.values())+ sum(cm.TN.values()))/ (sum(cm.TP.values())+ sum(cm.TN.values())+sum(cm.FP.values())+sum(cm.FN.values())) )
    
'''
    Type: Test function
    Return: None
    
    The funciton is used to test  the model, it prints the test accuracy

'''
           
def test(model, test_loader):
    total_loss_i = 0
    total=0.0
    correct=0.0
    model.eval()
    for i, batch in enumerate(test_dataloader):
        x,target=batch
        logits = model(x.cuda())
        loss = loss_function(logits,target.cuda())
        total_loss_i += loss.item()
        _, predicted = torch.max(logits.data, 1)
        total += target.size(0)
        correct += (predicted.cpu() == target).sum().item()

    # total_loss_e+=total_loss_i/len(test_dataloader)
    print("Overall Test Accuracy : {}".format( (100 * (correct / total))))
    cm = ConfusionMatrix(actual_vector = target.cpu().numpy().tolist(), predict_vector =predicted.cpu().numpy().tolist())
    print("Accuracy as in paper: ",((sum(cm.TP.values())+ sum(cm.TN.values()))/ (sum(cm.TP.values())+ sum(cm.TN.values())+sum(cm.FP.values())+sum(cm.FN.values())) ))
    

In [21]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score

kfolds = 5

kfold = KFold(n_splits=kfolds, shuffle = True)
val_id = 4
for i in range(1,kfolds+1):
    for fold,(train_idx, test_idx) in enumerate(kfold.split(train_ds),1 ):   

        print('------------fold no---------{}----------------------'.format(fold))
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
        train_dataloader = DataLoader(train_ds, batch_size=64, sampler=train_subsampler)
        

        if fold == i:
            continue
        for epoch in range(1, 9+1):
            train(fold, model, train_dataloader, epoch)
    print("validate",i)
    validate(i, model, train_dataloader)
    test(model, test_dataloader)
    model.apply(reset_weights)

------------fold no---------1----------------------
------------fold no---------2----------------------
Accuracy as in paper:  0.8653846153846154
Accuracy as in paper:  0.8822115384615384
Accuracy as in paper:  0.8701923076923077
Accuracy as in paper:  0.8846153846153846
Accuracy as in paper:  0.9038461538461539
Accuracy as in paper:  0.9038461538461539
Accuracy as in paper:  0.8942307692307693
Accuracy as in paper:  0.8990384615384616
Accuracy as in paper:  0.9038461538461539
------------fold no---------3----------------------
Accuracy as in paper:  0.9086538461538461
Accuracy as in paper:  0.9206730769230769
Accuracy as in paper:  0.9278846153846154
Accuracy as in paper:  0.9495192307692307
Accuracy as in paper:  0.9471153846153846
Accuracy as in paper:  0.9326923076923077
Accuracy as in paper:  0.9375
Accuracy as in paper:  0.9567307692307693
Accuracy as in paper:  0.9471153846153846
------------fold no---------4----------------------
Accuracy as in paper:  0.9567307692307693
Accura

# Testing (Already done with Validation)

In [None]:
# total_loss_i = 0
# total=0.0
# correct=0.0
# model.eval()
# for i, batch in enumerate(test_dataloader):
#     x,y=batch
#     logits = model(x.to(device))
#     loss = loss_function(logits,y.cuda())
#     total_loss_i += loss.item()
#     _, predicted = torch.max(logits.data, 1)
#     total += y.size(0)
#     correct += (predicted.cpu() == y).sum().item()
#     print("Iter : {}  Test Loss : {}".format(i,loss.item()))
    
# # total_loss_e+=total_loss_i/len(test_dataloader)
# print(" Accuracy : {}".format( (100 * (correct / total))))


# Test the Model

In [18]:
# x,y = next(iter(test_dataloader))
# print("Ground Truth:", y)
# new_arr = x.to(device)
# preds = model(new_arr)
# print("Predictions", torch.max(preds, dim =1 ))


('Ground Truth:', tensor([ 6,  1,  5,  8,  2,  7,  3, 12,  1, 10,  0,  8,  0, 11,  4, 11,  4,  5,
         0,  9,  5,  8, 10, 11,  5,  3,  0, 10,  3,  8,  5,  5]))
('Predictions', torch.return_types.max(
values=tensor([ 7.1839, 10.2467, 26.1294,  9.4398,  7.1561,  9.4496, 10.6262, 17.2464,
        19.4904,  3.7054, 18.4795,  6.4708, 26.0225, 11.1400,  6.3326, 14.0131,
         7.4746,  6.4471, 16.5906,  8.3950,  8.9410,  5.3150, 12.0303,  5.2834,
        22.6050,  7.6941, 22.2037, 11.0065,  9.8128, 12.8329, 21.3922, 24.8926],
       device='cuda:0', grad_fn=<MaxBackward0>),
indices=tensor([ 6,  7,  5,  8,  8,  7,  3, 12,  1,  3,  0,  8,  0, 11,  4, 11,  8,  8,
         0,  8,  5,  8, 10,  7,  5,  7,  0,  5,  2,  2,  5,  5],
       device='cuda:0')))
