## **Mount Drive**

In [None]:
from google.colab import drive
drive.mount('/gdrive')

## **Change directory**

In [None]:
cd /gdrive/My Drive/IIITH/CharCNN

## **Data Loader**

In [None]:
import torch
from torch.utils.data import Dataset,DataLoader
import csv
import json

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class AGNEWS_Dataset(Dataset):
    """
    Defines the AG News dataset
    """

    def __init__(self,csv_path,alphabet_path,max_seq_len):
        """
        Initializes the dataset
        @params csv_path (str): Path to csv file that contains train/test data
        @params alphabet_path (str): Path to json file that contains the
        characters considered
        @params max_seq_len (int): Maximum number of characters considered]
        for input
        """
        self.max_seq_len = max_seq_len
        with open(alphabet_path) as f:
            self.alphabet = json.load(f)
        with open(csv_path) as f:
            self.data = csv.reader(f,delimiter=',')
            self.data = list(self.data)

    def __getitem__(self,idx):
        """
        Returns a text, class pair
        @params idx (int): Index into the dataset
        @returns (self.seq,self.cls) tuple(torch.Tensor,int): Returns a tensor
        of shape (num_characters,max_seq_len) representing the input text and an
        integer representing the class index
        """
        self.cls = int(self.data[idx][0])
        self.seq = torch.zeros(len(self.alphabet),self.max_seq_len)
        seq_len = 0
        sequence = "".join(self.data[idx][1:])
        sequence = sequence[::-1]
        for char in sequence:
            if seq_len > self.max_seq_len:
                break
            try:
                self.seq[self.alphabet.index(char)][seq_len] = 1
            except:
                pass
            seq_len += 1
        return self.seq,self.cls

    def __len__(self):
        """
        Returns the dataset length
        """
        return len(list(self.data))

def one_hot(data,alphabet):
    """
    Converts a character to its one-hot vector representation
    @params data (char): The character that is input to the CNN
    @params alphabet (list): The list of characters considered.
    NOTE: Characters outside the alphabet are considered to be a zero vector
    @returns t (torch.Tensor): Tensor of shape (len(alphabet))
    """
    t = torch.zeros(len(alphabet))
    try:
        t[alphabet.index(data)] = 1
    except:
        return t
    return t


## **Model Architecture**

In [8]:
import torch.nn as nn

class charCNN(nn.Module):
    """
    Defines the character level CNN architecture
    """
    def __init__(self,num_features,conv_channel_size,fc_size,num_class):
        """
        Initializes the model
        @params num_features (int): The number of features (the number of
         characters) considered
        @params conv_channel_size (int): Number of 1D Convolutional kernels used
        @params fc_size (int): Number of units in the fully-connected layers
        @num_class (int): Number of classes in the dataset
        @returns object of this class when implicitly called
        """
        super(charCNN, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=num_features,out_channels=conv_channel_size,kernel_size=7,stride=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3,stride=3)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=conv_channel_size,out_channels=conv_channel_size,kernel_size=7,stride=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3,stride=3)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(in_channels=conv_channel_size,out_channels=conv_channel_size,kernel_size=3,stride=1),
            nn.ReLU(),
        )
        self.conv4 = nn.Sequential(
            nn.Conv1d(in_channels=conv_channel_size,out_channels=conv_channel_size,kernel_size=3,stride=1),
            nn.ReLU(),
        )
        self.conv5 = nn.Sequential(
            nn.Conv1d(in_channels=conv_channel_size,out_channels=conv_channel_size,kernel_size=3,stride=1),
            nn.ReLU(),
        )
        self.conv6 = nn.Sequential(
            nn.Conv1d(in_channels=conv_channel_size,out_channels=conv_channel_size,kernel_size=3,stride=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3,stride=3)
        )
        self.fc1 = nn.Sequential(
            nn.Linear(in_features=8704,out_features=fc_size),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.fc2 = nn.Sequential(
            nn.Linear(in_features=fc_size,out_features=fc_size),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.fc3 = nn.Linear(in_features=fc_size,out_features=num_class)

        self.log_softmax = nn.LogSoftmax(dim=1)
        
        self._create_weights() # weight initialization

    def forward(self,inputs):
        """
        Forward pass through CNN
        @params inputs (torch.Tensor): Tensor of shape 
        (batch_size,num_characters,max_seq_len) representing a batch of
        sentences
        @returns x (torch.Tensor): Tensor of shape (batch_size,num_cls) that
        contains a batch of vectors having unnormalized log probabilities for
        each class computed according to the input text
        """
        x = self.conv1(inputs)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.conv6(x)
        x = x.view(x.size(0),-1)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.log_softmax(x)
        return x

    def _create_weights(self, mean=0.0, std=0.05):
        """
        Initialization of weights using a Gaussian distribution
        @params mean (int): Mean of the distribution
        @params std (int): Standard deviation of the distribution
        """
        for module in self.modules():
            if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
                module.weight.data.normal_(mean, std)

## **Weights & Biases for visualization**

In [None]:
!pip install --upgrade wandb
!wandb login 26fd22ecbe5d0d2e53f656dbee7cedad16503a06

## **Train - Eval loop**

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import os
import wandb

def get_lr(optimizer):
    """
    Gets the current learning rate from the optimizer during training
    @params optimizer (torch.optim.Optimizer): Optimizer object
    @returns param_group['lr'] (int): Current learning rate
    """
    for param_group in optimizer.param_groups:
        return param_group['lr']

wandb.init(project="charcnn")

train_path = './ag_news_csv/train.csv'
test_path = './ag_news_csv/test.csv'
alpha_path = 'alphabet.json' 
max_seq_len = 1014
batch_size = 128
num_classes = 4
fc_size = 1024
conv_channel_size = 256
num_characters = 70
milestones = [3,6,9,12,15,18,21,24,27,30]
total_epochs = 200
resume = 0
model_path = './models/'
model_name = 'model_22_.ckpt'
start_epoch = 1
save_every = 4
print_every = 1
loss_history = []
max_norm = 400

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ag_dataset_train = AGNEWS_Dataset(train_path,alpha_path,max_seq_len)
dataloader_train = DataLoader(ag_dataset_train,batch_size=128,shuffle=True,num_workers=4)
ag_dataset_test = AGNEWS_Dataset(test_path,alpha_path,max_seq_len)
dataloader_test = DataLoader(ag_dataset_test,batch_size=128,shuffle=True,num_workers=4)


model = charCNN(num_characters,conv_channel_size,fc_size,num_classes)
wandb.watch(model)

model = model.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


if resume:
    checkpoint = torch.load(os.path.join(model_path,model_name))
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    #scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    start_epoch = checkpoint['epoch']
    loss_history = checkpoint['loss']
    print('Loaded model from checkpoint ...')

scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones, gamma=0.5, last_epoch=-1)

acc_history = [0]

for epoch in range(start_epoch,total_epochs+1):
    batch_loss_history = []
    for i,(char_seq,cls) in enumerate(dataloader_train):

        cls = torch.LongTensor(cls)
        
        char_seq = char_seq.to(device)
        cls = cls.to(device)
        optimizer.zero_grad()

        outputs = model(char_seq)
        loss = F.nll_loss(outputs,cls-1)
        batch_loss_history.append(loss.item())
        loss_history.append(loss.item())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        optimizer.step()
    
    scheduler.step()

    acc = 0
    batches = 0
    for i,(char_seq,cls) in enumerate(dataloader_test):
        batches += 1
        cls = torch.LongTensor(cls)
        
        char_seq = char_seq.to(device)
        cls = cls.to(device)
        
        outputs = model(char_seq)
        predicted_cls = outputs.max(1)[1]
        acc += metrics.accuracy_score((cls-1).cpu().numpy(),predicted_cls.cpu().numpy())
    avg_accuracy = acc/batches
    

    if epoch == start_epoch or avg_accuracy > max(acc_history):
        torch.save(
            {
                'model_state_dict':model.state_dict(),
                'optimizer_state_dict':optimizer.state_dict(),
                'scheduler_state_dict':scheduler.state_dict(),
                'epoch':epoch+1,
                'loss_history':loss_history
            },
            os.path.join(model_path,'model_'+str(epoch)+'_.ckpt')
            )
        print('Saved model ...')

    acc_history.append(avg_accuracy)

    if epoch % print_every == 0:
        mean_loss_per_epoch = sum(batch_loss_history)/len(batch_loss_history)
        print('[{}/{}] Loss: {}'.format(epoch,total_epochs,mean_loss_per_epoch))
        wandb.log({"Train Loss": mean_loss_per_epoch,"Learning Rate": get_lr(optimizer)})
        print('Accruacy: ',avg_accuracy)
        print('Test error: ',1-avg_accuracy)
        wandb.log({"Test Accuracy": avg_accuracy,"Test Error": 1-avg_accuracy})



print('Training complete')

## **Accruacy:  0.8794270833333333**
## **Test error:  0.1205729166666667**

## **Find best model** (not used)

In [None]:
import torch
import torch.nn as nn
import os
from sklearn import metrics


alpha_path = 'alphabet.json' 
max_seq_len = 1014
batch_size = 128
num_classes = 4
fc_size = 1024
conv_channel_size = 256
num_characters = 70
model_path = './models/'
model_name = ['model_24_.ckpt','model_16_.ckpt','model_12_.ckpt','model_8_.ckpt','model_4_.ckpt']

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ag_dataset_test = AGNEWS_Dataset(test_path,alpha_path,max_seq_len)
dataloader_test = DataLoader(ag_dataset_test,batch_size=128,shuffle=True,num_workers=4)

model = charCNN(num_characters,conv_channel_size,fc_size,num_classes)


model = model.to(device)

for mname in model_name:

    checkpoint = torch.load(os.path.join(model_path,mname))
    model.load_state_dict(checkpoint['model_state_dict'])
    print('Loaded model from checkpoint ...')

    acc = 0
    batches = 0
    for i,(char_seq,cls) in enumerate(dataloader_test):
        batches += 1
        cls = torch.LongTensor(cls)
        
        char_seq = char_seq.to(device)
        cls = cls.to(device)
        
        outputs = model(char_seq)
        predicted_cls = outputs.max(1)[1]
        acc += metrics.accuracy_score((cls-1).cpu().numpy(),predicted_cls.cpu().numpy())
    avg_accuracy = acc/batches
    print('Model: ',mname)
    print('Accruacy: ',avg_accuracy)
    print('Test error: ',1-avg_accuracy)
    print('-----------------------------')