In [None]:
from functools import partial
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch
from matplotlib import pyplot as plt
from tqdm import tqdm
from transformers import logging, AutoTokenizer, AutoModel
import math
import torch.nn.functional as F
from torch import nn


**Load the data**

In [None]:
train_data = pd.read_csv('/content/drive/MyDrive/Copy of train_data.csv')
test_data = pd.read_csv('/content/drive/MyDrive/Copy of test_data.csv')

In [None]:
# Make MyDataset
class MyDataset(Dataset):
    def __init__(self, sentences, labels):
        # Initialize the MyDataset class with input sentences, labels, method_name, and model_name
        self.sentences = sentences
        self.labels = labels

        # Create a list to hold the tokenized dataset
        dataset = list()
        index = 0

        # Iterate through each data point (sentence) in the input sentences
        for data in sentences:
            # Tokenize the data into individual tokens
            tokens = data.split(' ')

            # Retrieve the corresponding label for the current data point
            labels_id = labels[index]

            # Increment the index for the next iteration
            index += 1

            # Append a tuple containing tokens and labels_id to the dataset list
            dataset.append((tokens, labels_id))

        # Store the tokenized dataset internally
        self._dataset = dataset

    def __getitem__(self, index):
        # Retrieve and return the data point (tokens, labels_id) at the given index
        return self._dataset[index]

    def __len__(self):
        # Return the total number of data points in the dataset
        return len(self.sentences)

In [None]:
# Make tokens for every batch
def my_collate(batch, tokenizer):
    # Extract tokens and label_ids from the input batch
    tokens, label_ids = map(list, zip(*batch))

    # Tokenize the input tokens using the provided tokenizer
    text_ids = tokenizer(tokens,
                         padding=True,
                         truncation=True,
                         max_length=320,
                         is_split_into_words=True,
                         add_special_tokens=True,
                         return_tensors='pt')

    # Return the tokenized text_ids and corresponding label_ids as PyTorch tensors
    return text_ids, torch.tensor(label_ids)

In [None]:
# Load dataset
def load_dataset(tokenizer, train_batch_size, test_batch_size):
    # Read data from 'datasets.csv' file using pandas
    data = pd.read_csv('/content/drive/MyDrive/datasets.csv', sep=None, header=0, encoding='utf-8', engine='python')

    # Take a subset (10%) of the data for faster testing, assuming 'labels' and 'sentences' columns exist
    len1 = int(len(list(data['labels'])) * 0.1)
    labels = list(data['labels'])[0:len1]
    sentences = list(data['sentences'])[0:len1]

    # Split data into training and testing sets
    train_sen, test_sen, train_lab, test_lab = train_test_split(sentences, labels, train_size=0.8)

    # Create MyDataset instances for training and testing
    train_set = MyDataset(train_sen, train_lab)
    test_set = MyDataset(test_sen, test_lab)

    # Create DataLoader instances for training and testing
    collate_fn = partial(my_collate, tokenizer=tokenizer)
    train_loader = DataLoader(train_set, batch_size=train_batch_size, shuffle=True, num_workers=0,
                              collate_fn=collate_fn, pin_memory=True)
    test_loader = DataLoader(test_set, batch_size=test_batch_size, shuffle=True, num_workers=0,
                             collate_fn=collate_fn, pin_memory=True)

    # Return the created DataLoader instances for training and testing
    return train_loader, test_loader

In [None]:
# FNN
class Transformer(nn.Module):
    def __init__(self, base_model, num_classes, input_size):
        # Initialize the Transformer class
        super().__init__()

        # Set the base model (BERT or other transformer model)
        self.base_model = base_model

        # Number of output classes
        self.num_classes = num_classes

        # Input size (dimensionality of input features)
        self.input_size = input_size

        # Fully connected layer for classification
        self.linear = nn.Linear(base_model.config.hidden_size, num_classes)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(0.5)

        # Softmax activation function for probability distribution
        self.softmax = nn.Softmax()

        # Set requires_grad to True for fine-tuning the base model
        for param in base_model.parameters():
            param.requires_grad = True

    def forward(self, inputs):
        # Forward pass through the transformer base model
        raw_outputs = self.base_model(**inputs)

        # Extract the classification features from the last hidden state
        cls_feats = raw_outputs.last_hidden_state[:, 0, :]

        # Apply dropout for regularization
        cls_feats_dropout = self.dropout(cls_feats)

        # Apply linear layer for classification
        predicts = self.softmax(self.linear(cls_feats_dropout))

        # Return the predicted probabilities
        return predicts


In [None]:
# Bidirectional LSTM Model
class BiLstm_Model(nn.Module):
    def __init__(self, base_model, num_classes, input_size):
        # Initialize the BiLstm_Model class
        super().__init__()

        # Set the base model (e.g., BERT) as the embedding layer
        self.base_model = base_model

        # Number of output classes for classification
        self.num_classes = num_classes

        # Input size (dimensionality of input features)
        self.input_size = input_size

        # Bidirectional LSTM layer
        self.BiLstm = nn.LSTM(input_size=self.input_size,
                              hidden_size=320,
                              num_layers=1,
                              batch_first=True,
                              bidirectional=True)

        # Fully connected layers for classification
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(320 * 2, 80),
            nn.Linear(80, 20),
            nn.Linear(20, self.num_classes),
            nn.Softmax(dim=1)
        )

        # Set requires_grad to True for fine-tuning the base model
        for param in base_model.parameters():
            param.requires_grad = True

    def forward(self, inputs):
        # Forward pass through the base model (e.g., BERT)
        raw_outputs = self.base_model(**inputs)

        # Extract the hidden states from the last layer of the base model
        cls_feats = raw_outputs.last_hidden_state

        # Apply bidirectional LSTM to the hidden states
        outputs, _ = self.BiLstm(cls_feats)

        # Select the last time step's output from the LSTM sequence
        outputs = outputs[:, -1, :]

        # Forward pass through the fully connected layers for classification
        outputs = self.fc(outputs)

        # Return the final outputs
        return outputs

In [None]:
# TextCNN Model
class TextCNN_Model(nn.Module):
    def __init__(self, base_model, num_classes):
        # Initialize the TextCNN_Model class
        super().__init__()

        # Set the base model (e.g., BERT) as the embedding layer
        self.base_model = base_model

        # Number of output classes for classification
        self.num_classes = num_classes

        # Set requires_grad to True for fine-tuning the base model
        for param in base_model.parameters():
            param.requires_grad = True

        # Hyperparameters for TextCNN
        self.filter_sizes = [2, 3, 4]
        self.num_filters = 2
        self.encode_layer = 12

        # Define Convolutional Layers
        self.convs = nn.ModuleList(
            [nn.Conv2d(in_channels=1, out_channels=self.num_filters,
                       kernel_size=(K, self.base_model.config.hidden_size)) for K in self.filter_sizes]
        )

        # Fully connected layers for classification
        self.block = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(self.num_filters * len(self.filter_sizes), self.num_classes),
            nn.Softmax(dim=1)
        )

    def conv_pool(self, tokens, conv):
        # Apply convolution, activation, squeeze, and max pooling
        tokens = conv(tokens)
        tokens = F.relu(tokens)
        tokens = tokens.squeeze(3)
        tokens = F.max_pool1d(tokens, tokens.size(2))
        out = tokens.squeeze(2)
        return out

    def forward(self, inputs):
        # Forward pass through the base model (e.g., BERT)
        raw_outputs = self.base_model(**inputs)

        # Extract the hidden states from the last layer of the base model
        tokens = raw_outputs.last_hidden_state.unsqueeze(1)

        # Apply convolutional layers and pooling
        out = torch.cat([self.conv_pool(tokens, conv) for conv in self.convs], 1)

        # Forward pass through the fully connected layers for classification
        predicts = self.block(out)

        # Return the final predicted probabilities
        return predicts

In [None]:
# NLP Method Class
class NLPMethod:
    def __init__(self, method_name, train_batch_size, test_batch_size, num_epoch, lr, weight_decay):
        # Initialization of NLPMethod class

        # Model and Training Parameters
        self.model_name = 'bert'
        self.method_name = method_name
        self.train_batch_size = train_batch_size
        self.test_batch_size = test_batch_size
        self.num_epoch = num_epoch
        self.lr = lr
        self.weight_decay = weight_decay
        self.device = 'cuda'

        # Create BERT tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
        self.input_size = 768  # BERT hidden size
        base_model = AutoModel.from_pretrained('bert-base-uncased')

        # Initialize model based on the selected method
        if method_name == 'fnn':
            self.Mymodel = Transformer(base_model, 2, self.input_size)
        elif method_name == 'bilstm':
            self.Mymodel = BiLstm_Model(base_model, 2, self.input_size)
        elif method_name == 'textcnn':
            self.Mymodel = TextCNN_Model(base_model, 2)

        # Move the model to the specified device
        self.Mymodel.to(self.device)

    def _train(self, dataloader, criterion, optimizer):
        # Training function

        train_loss, n_correct, n_train = 0, 0, 0

        # Set the model to train mode
        self.Mymodel.train()

        for inputs, targets in tqdm(dataloader, disable=False, ascii='>='):
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            targets = targets.to(self.device)

            # Forward pass
            predicts = self.Mymodel(inputs)

            # Compute loss
            loss = criterion(predicts, targets)

            # Backward pass and optimization step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Update training statistics
            train_loss += loss.item() * targets.size(0)
            n_correct += (torch.argmax(predicts, dim=1) == targets).sum().item()
            n_train += targets.size(0)

        return train_loss / n_train, n_correct / n_train

    def _test(self, dataloader, criterion):
        # Testing function

        test_loss, n_correct, n_test = 0, 0, 0

        # Set the model to evaluation mode
        self.Mymodel.eval()

        with torch.no_grad():
            for inputs, targets in tqdm(dataloader, disable=True, ascii=' >='):
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
                targets = targets.to(self.device)

                # Forward pass
                predicts = self.Mymodel(inputs)

                # Compute loss
                loss = criterion(predicts, targets)

                # Update testing statistics
                test_loss += loss.item() * targets.size(0)
                n_correct += (torch.argmax(predicts, dim=1) == targets).sum().item()
                n_test += targets.size(0)

        return test_loss / n_test, n_correct / n_test

    def run(self):
        # Main training and testing function

        # Load train and test dataloaders
        train_dataloader, test_dataloader = load_dataset(
            tokenizer=self.tokenizer,
            train_batch_size=self.train_batch_size,
            test_batch_size=self.test_batch_size
        )

        # Get parameters for optimization (excluding frozen layers)
        _params = filter(lambda x: x.requires_grad, self.Mymodel.parameters())

        # Define loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(_params, lr=self.lr, weight_decay=self.weight_decay)

        # Lists for storing metrics across epochs
        l_acc, l_trloss, l_teloss, l_epo = [], [], [], []

        # Initialize best loss and best accuracy
        best_loss, best_acc = 0, 0

        # Training loop
        for epoch in range(self.num_epoch):
            train_loss, train_acc = self._train(train_dataloader, criterion, optimizer)
            test_loss, test_acc = self._test(test_dataloader, criterion)

            # Append metrics to lists
            l_epo.append(epoch)
            l_acc.append(test_acc)
            l_trloss.append(train_loss)
            l_teloss.append(test_loss)

            # Update best metrics
            if test_acc > best_acc or (test_acc == best_acc and test_loss < best_loss):
                best_acc, best_loss = test_acc, test_loss

            # Print epoch statistics
            print('{}/{} - {:.2f}%'.format(epoch + 1, self.num_epoch, 100 * (epoch + 1) / self.num_epoch))
            print('[train] loss: {:.4f}, acc: {:.2f}'.format(train_loss, train_acc * 100))
            print('[test] loss: {:.4f}, acc: {:.2f}'.format(test_loss, test_acc * 100))

        # Print best metrics achieved during training
        print('Best loss: {:.4f}, Best accuracy: {:.2f}'.format(best_loss, best_acc * 100))

# **Bert with Bi-LSTM**

In [None]:
# Creating an instance of NLPMethod
method = NLPMethod(method_name='bilstm', train_batch_size=4, test_batch_size=16, num_epoch=10, lr=1e-5, weight_decay=0.01)

# Running the training and testing process
method.run()



1/10 - 10.00%
[train] loss: 0.5118, acc: 79.97
[test] loss: 0.4356, acc: 87.70




2/10 - 20.00%
[train] loss: 0.4077, acc: 90.48
[test] loss: 0.4392, acc: 86.60




3/10 - 30.00%
[train] loss: 0.3778, acc: 93.40
[test] loss: 0.4198, acc: 89.10




4/10 - 40.00%
[train] loss: 0.3639, acc: 94.92
[test] loss: 0.4413, acc: 86.80




5/10 - 50.00%
[train] loss: 0.3621, acc: 94.85
[test] loss: 0.4215, acc: 88.50




6/10 - 60.00%
[train] loss: 0.3495, acc: 96.33
[test] loss: 0.4205, acc: 89.10




7/10 - 70.00%
[train] loss: 0.3462, acc: 96.65
[test] loss: 0.4262, acc: 88.40




8/10 - 80.00%
[train] loss: 0.3500, acc: 96.20
[test] loss: 0.4495, acc: 85.80




9/10 - 90.00%
[train] loss: 0.3412, acc: 97.08
[test] loss: 0.4208, acc: 88.90




10/10 - 100.00%
[train] loss: 0.3356, acc: 97.70
[test] loss: 0.4435, acc: 86.90
best loss: 0.4198, best acc: 89.10





In [None]:
# Creating an instance of NLPMethod
TC_method = NLPMethod(method_name='textcnn', train_batch_size=4, test_batch_size=16, num_epoch=10, lr=1e-5, weight_decay=0.01)

# Running the training and testing process
TC_method.run()



1/10 - 10.00%
[train] loss: 0.6054, acc: 71.43
[test] loss: 0.5000, acc: 88.10




2/10 - 20.00%
[train] loss: 0.5253, acc: 85.95
[test] loss: 0.5022, acc: 88.30




3/10 - 30.00%
[train] loss: 0.5098, acc: 87.92
[test] loss: 0.4891, acc: 89.90




4/10 - 40.00%
[train] loss: 0.4945, acc: 90.25
[test] loss: 0.5143, acc: 84.70




5/10 - 50.00%
[train] loss: 0.4834, acc: 91.70
[test] loss: 0.4903, acc: 89.50




6/10 - 60.00%
[train] loss: 0.4782, acc: 92.22
[test] loss: 0.4994, acc: 87.10




7/10 - 70.00%
[train] loss: 0.4806, acc: 91.30
[test] loss: 0.4982, acc: 88.50




8/10 - 80.00%
[train] loss: 0.4777, acc: 91.45
[test] loss: 0.4971, acc: 87.60




9/10 - 90.00%
[train] loss: 0.4688, acc: 92.75
[test] loss: 0.4874, acc: 88.60




10/10 - 100.00%
[train] loss: 0.4571, acc: 94.33
[test] loss: 0.5224, acc: 81.90
best loss: 0.4891, best acc: 89.90





In [None]:
# Creating an instance of NLPMethod
FN_method = NLPMethod(method_name='fnn', train_batch_size=4, test_batch_size=16, num_epoch=10, lr=1e-5, weight_decay=0.01)

# Running the training and testing process
FN_method.run()



1/10 - 10.00%
[train] loss: 0.4947, acc: 80.38
[test] loss: 0.4350, acc: 86.90




2/10 - 20.00%
[train] loss: 0.3937, acc: 91.83
[test] loss: 0.4154, acc: 89.50




3/10 - 30.00%
[train] loss: 0.3770, acc: 93.45
[test] loss: 0.4388, acc: 87.10




4/10 - 40.00%
[train] loss: 0.3610, acc: 95.15
[test] loss: 0.4546, acc: 85.50




5/10 - 50.00%
[train] loss: 0.3534, acc: 96.03
[test] loss: 0.4487, acc: 86.20




6/10 - 60.00%
[train] loss: 0.3568, acc: 95.58
[test] loss: 0.4262, acc: 88.60


 27%|==>>>>>>>>| 268/1000 [01:08<02:44,  4.46it/s]