# Task 1: Part 2

In [45]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gensim.downloader as api
import numpy as np
from sklearn.metrics import f1_score
import simplejson as json
from torch.nn.utils.rnn import pad_sequence
import re


In [2]:
# Load pre-trained word embeddings
word2vec = api.load("word2vec-google-news-300")
glove = api.load("glove-wiki-gigaword-100")
fasttext = api.load("fasttext-wiki-news-subwords-300")

In [3]:
# Define the dataset class
class SequenceTaggingDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]['text'], self.data[idx]['labels']

In [4]:
# Define the vanilla RNN model
class VanillaRNNModel(nn.Module):
    def __init__(self, embedding_matrix, hidden_size, output_size):
        super(VanillaRNNModel, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix)
        self.rnn = nn.RNN(embedding_matrix.shape[1], hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, x):
        embedded = self.embedding(x)
        output, _ = self.rnn(embedded)
        output = self.fc(output)
        output = self.softmax(output)
        return output

In [15]:
def data_formatter(data):
    formatted_data = []
    for i in data.keys():
        formatted_data.append({'text': data[i]['text'], 'labels': data[i]['labels']})
    return formatted_data

In [16]:
def load_dataset():
    with open('../data/NER_train.json', 'r') as f:
        train_data = json.load(f)
        train_data = data_formatter(train_data)
    with open('../data/NER_test.json', 'r') as f:
        test_data = json.load(f)
        test_data = data_formatter(test_data)
    with open('../data/NER_val.json', 'r') as f:
        val_data = json.load(f)
        val_data = data_formatter(val_data)
        
    return train_data, test_data, val_data
    

In [6]:
# Define the list of entities
entities = ["COURT","PETITIONER","RESPONDENT","JUDGE","DATE","ORG","GPE","STATUTE","PROVISION","PRECEDENT","CASE_NUMBER","WITNESS","OTHER_PERSON"]

# Generate BIO encoding for each entity
bio_encoding = []
for entity in entities:
    bio_encoding.extend(["B_" + entity, "I_" + entity])

bio_encoding.append("O")

def label_encoder(labels):
    encoded_labels = []
    for label in labels:
        if label in bio_encoding:
            encoded_labels.append(bio_encoding.index(label))
        else:
            encoded_labels.append(bio_encoding.index("O"))
    return encoded_labels

In [18]:
# Load and preprocess the dataset (replace this with your dataset loading code)
# Example dataset loading code
train_data, test_data, val_data = load_dataset()

In [8]:
# Define hyperparameters
num_classes = 27
hidden_size = 128
output_size = num_classes  # Replace num_classes with the number of output classes
learning_rate = 0.001
num_epochs = 10
batch_size = 32

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [64]:
def evaluate(model, validation_loader, criterion):
    model.eval()
    total_loss = 0.0
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for inputs, targets in validation_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs.view(-1, num_classes), targets.view(-1))
            total_loss += loss.item()
            predictions = outputs.argmax(dim=1).cpu().numpy()
            all_predictions.extend(predictions)
            all_targets.extend(targets.cpu().numpy())
    
    avg_loss = total_loss / len(validation_loader)
    f1_macro = f1_score(all_targets, all_predictions, average='macro')
    
    return avg_loss, f1_macro

def tokenize_inputs(text):
    # Remove special characters and convert to lowercase
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())
    # Tokenize by splitting on whitespace
    tokens = text.split(' ')
    return tokens


def collate_fn(batch):
    # This function is used to collate samples into a batch
    # Modify it as needed to handle variable-length sequences
    
    # Get maximum sequence length in the batch
    max_length = max(len(sample[0]) for sample in batch)
    
    # Initialize lists to store padded sequences and corresponding labels
    padded_inputs = []
    padded_labels = []
    
    # Pad sequences and adjust labels
    for inputs, labels in batch:
        # Pad inputs
        padded_inputs.append(inputs + " " + "0" * (max_length - len(inputs)))
        
        # Pad labels and adjust BIO encoding
        padded_labels.append(labels + ['O']) 
    
    return inputs, labels


In [65]:
# Iterate over different pre-trained word embeddings
for embedding_name, embedding_model in [("word2vec", word2vec), ("glove", glove), ("fasttext", fasttext)]:
    # Prepare embedding matrix
    embedding_matrix = []
    for word in embedding_model.index_to_key:
        embedding_matrix.append(embedding_model[word])
    embedding_matrix = np.array(embedding_matrix)

    # Create the model
    model = VanillaRNNModel(torch.FloatTensor(embedding_matrix), hidden_size, output_size).to(device)

    # Define loss function and optimizer
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Prepare data loader
    train_dataset = SequenceTaggingDataset(train_data)
    dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    
    val_dataset = SequenceTaggingDataset(val_data)
    validation_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    
    test_dataset = SequenceTaggingDataset(test_data)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for inputs, labels in dataloader:
            inputs = torch.tensor(tokenize_inputs(inputs)).to(device)  # Tokenize inputs (replace with your tokenizer)
            labels = torch.tensor(label_encoder(labels)).to(device)    # Encode labels (replace with your label encoder)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Calculate loss
            loss = criterion(outputs.view(-1, output_size), labels.view(-1))
            total_loss += loss.item()

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            
        # Evaluation phase
        val_loss, val_f1 = evaluate(model, validation_loader, criterion)
        
        # Average Loss
        avg_loss = total_loss / len(dataloader)

        # Print loss
        print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_loss:.4f}, Val Loss: {val_loss:.4f}, Val F1 (macro): {val_f1:.4f}')

    # Save the trained model
    torch.save(model.state_dict(), f"vanilla_rnn_{embedding_name}.pth")


['the', 'high', 'court', 'of', 'calcutta', 'awarded', 'a', 'sum', 'of', 'rs', '48000', 'as', 'compensation']


ValueError: too many dimensions 'str'