# Define Model

In [7]:
from TorchCRF import CRF
import torch
import torch.nn as nn
class BiLSTMCRF(nn.Module):
    """Bidirectional LSTM encoder with a linear emission layer and a CRF head.

    ``forward`` only produces per-token emission scores. The CRF stored in
    ``self.crf`` is not called here; the caller uses it on the emissions.

    Args:
        embedding_dim: Size of each input token vector.
        hidden_dim: Width of the concatenated forward/backward LSTM output.
            Each direction gets ``hidden_dim // 2`` units, so the value must
            be a positive even number for the emission layer to line up.
        num_tags: Number of tags scored for every token.

    Raises:
        ValueError: If ``hidden_dim`` is not a positive even number.
    """

    def __init__(self, embedding_dim, hidden_dim, num_tags):
        # An odd hidden_dim would give an LSTM output of 2 * (hidden_dim // 2)
        # features while the linear layer expects hidden_dim, which only
        # surfaces as a shape error at the first forward pass. Fail early.
        if hidden_dim <= 0 or hidden_dim % 2 != 0:
            raise ValueError(
                f"hidden_dim must be a positive even number, got {hidden_dim}"
            )
        super().__init__()
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True, batch_first=True)
        self.hidden2tag = nn.Linear(hidden_dim, num_tags)
        self.crf = CRF(num_tags)

    def forward(self, x, mask=None):
        """Return emission scores for ``x``.

        Args:
            x: Input of shape (batch, seq_len, embedding_dim); the LSTM is
                built with ``batch_first=True``.
            mask: Accepted for interface compatibility; not used here.

        Returns:
            Tensor of shape (batch, seq_len, num_tags).
        """
        lstm_out, _ = self.lstm(x)
        return self.hidden2tag(lstm_out)

## Task 1

# Import Data

In [3]:
import pickle as pkl


def _load_pickle(path):
    """Return the object stored in the pickle file at ``path``."""
    # NOTE(security): pickle.load can execute arbitrary code while
    # deserialising; only open split files that come from a trusted source.
    with open(path, "rb") as file:
        return pkl.load(file)


def _to_samples(split):
    """Convert a split mapping into a list of ``(tokens, labels)`` tuples.

    Every value of ``split`` is read through its "text" and "labels" entries;
    the text is tokenised on whitespace with ``str.split()`` and the labels
    are passed through unchanged. Order follows the mapping's iteration order.
    """
    return [(entry["text"].split(), entry["labels"]) for entry in split.values()]


# Raw splits, exactly as stored on disk.
train_set = _load_pickle("training_set_1.pkl")
val_set = _load_pickle("val_set_1.pkl")
test_set = _load_pickle("test_set_1.pkl")

# Token/label pairs consumed by the embedding and training cells.
training_data = _to_samples(train_set)
val_data = _to_samples(val_set)
test_data = _to_samples(test_set)

# Word Embeddings

## Word2Vec

In [5]:
from gensim.models import Word2Vec

# The corpus is the token list (first element) of every training sample.
training_data_sentences = []
for sample in training_data:
    training_data_sentences.append(sample[0])

# Word2Vec hyperparameters
vector_size = 100  # dimensionality of the word vectors
window_size = 5    # maximum distance between the current and predicted word
min_count = 1      # words with a lower total frequency are ignored
workers = 4        # number of worker threads used for training

# Fit the model on the training sentences and persist it for later cells.
word2vec_model = Word2Vec(
    training_data_sentences,
    vector_size=vector_size,
    window=window_size,
    min_count=min_count,
    workers=workers,
)
word2vec_model.save("word2vec_model.bin")


In [6]:
import torch
import torch.optim as optim
import numpy as np
from gensim.models import Word2Vec

# Reload the Word2Vec model written to disk by the training cell.
word2vec_model = Word2Vec.load("word2vec_model.bin")

# Plain lookup table: vocabulary word -> its embedding vector.
keyed_vectors = word2vec_model.wv
word2vec_dict = {
    token: keyed_vectors.get_vector(token)
    for token in keyed_vectors.index_to_key
}


## GloVe (approximated here with a skip-gram Word2Vec model)

In [None]:
from gensim.models import Word2Vec

# NOTE: despite the "glove" naming used below, this cell trains a skip-gram
# Word2Vec model (sg=1) on the training sentences. No GloVe algorithm or
# pretrained GloVe vectors are involved.
training_data_sentences = [tokens for tokens, *_ in training_data]

model = Word2Vec(
    training_data_sentences,
    vector_size=100,
    window=10,
    sg=1,
    epochs=30,
    min_count=5,  # words seen fewer than 5 times get no vector
    workers=4,
)

# Persist the trained model.
model.save('glove_model')

# Lookup table: vocabulary word -> embedding vector.
glove_dict = dict(
    (token, model.wv.get_vector(token)) for token in model.wv.index_to_key
)

## fastText

In [None]:
import fasttext
from huggingface_hub import hf_hub_download

# Download model.bin from the facebook/fasttext-en-vectors hub repository
# and load it with fastText.
model_path = hf_hub_download(
    repo_id="facebook/fasttext-en-vectors",
    filename="model.bin",
)
model = fasttext.load_model(model_path)

# Inspect the vocabulary ...
model.words


# ... and its size.
len(model.words)

## Task 1

# Training the Model

In [None]:
import torch.optim as optim
import numpy as np

# Assuming you have defined the BiLSTMCRF model and loaded the training data and word embeddings as mentioned earlier
# # Define START_TAG and STOP_TAG
# START_TAG = "START"
# STOP_TAG = "STOP"

# Model dimensions
EMBEDDING_DIM = 100  # same width as the Word2Vec vectors (vector_size=100)
HIDDEN_DIM = 40

# Vocabulary index: every distinct training word gets the next free integer,
# in order of first appearance.
word_to_ix = {}
for sentence, tags in training_data:
    for word in sentence:
        word_to_ix.setdefault(word, len(word_to_ix))

# Reserve an entry for out-of-vocabulary words.
word_to_ix["Unknown"] = len(word_to_ix)

# Label id (as a string) -> BIO tag name. Despite its name, this dict maps
# index -> tag; `tag_to_ix` below is the tag -> index direction.
tag_to_ix_inverted = {
    "0": "B_ORG",
    "1": "I_ORG",
    "2": "B_OTHER_PERSON",
    "3": "I_OTHER_PERSON",
    "4": "B_WITNESS",
    "5": "I_WITNESS",
    "6": "B_GPE",
    "7": "I_GPE",
    "8": "B_STATUTE",
    "9": "I_STATUTE",
    "10": "B_DATE",
    "11": "I_DATE",
    "12": "B_PROVISION",
    "13": "I_PROVISION",
    "14": "B_COURT",
    "15": "I_COURT",
    "16": "B_PRECEDENT",
    "17": "I_PRECEDENT",
    "18": "B_CASE_NUMBER",
    "19": "I_CASE_NUMBER",
    "20": "B_PETITIONER",
    "21": "I_PETITIONER",
    "22": "B_JUDGE",
    "23": "I_JUDGE",
    "24": "B_RESPONDENT",
    "25": "I_RESPONDENT",
    "26": "O"
}

# Collapse each B_/I_ pair into one class id: ids 2k and 2k+1 both map to k,
# giving 0..12 for the 13 entity types and 13 for "O" (14 classes in total).
# The previous formula, (id + 1) // 2 - 1, assumed 1-based ids: with the
# 0-based keys above it sent B_ORG to -1, paired every I_X with the B_ tag of
# the *next* entity type, and merged I_RESPONDENT with "O".
# NOTE(review): checkpoints trained with the previous mapping encode different
# class ids; retrain before evaluating with this mapping.
tag_to_ix = {label: int(tag) // 2 for tag, label in tag_to_ix_inverted.items()}


# Convert sentences to padded embedding / label tensors plus a padding mask.
import torch
from torch.nn.utils.rnn import pad_sequence

device = "cpu"

sentence_embeddings = []  # one (seq_len, EMBEDDING_DIM) float32 tensor per sentence
training_tags = []        # raw tag sequences, parallel to sentence_embeddings
sequence_lengths = []     # number of tokens in each sentence

for sentence, labels in training_data:
    sequence_lengths.append(len(sentence))
    training_tags.append(labels)

    vectors = []
    for word in sentence:
        if word in word2vec_dict:
            vectors.append(word2vec_dict[word])
        else:
            # Out-of-vocabulary word: random vector, drawn anew on every
            # occurrence (np.random is not seeded in this cell).
            vectors.append(np.random.uniform(-0.25, 0.25, EMBEDDING_DIM))

    # Stack once in NumPy instead of calling torch.tensor() on a list of
    # arrays; the reshape keeps an empty sentence 2-D, shape (0, EMBEDDING_DIM),
    # so that padding below still works.
    stacked = np.asarray(vectors, dtype=np.float32).reshape(-1, EMBEDDING_DIM)
    sentence_embeddings.append(torch.from_numpy(stacked))

max_seq_len = max(sequence_lengths)

# mask[i, t] is True for real tokens (t < length of sentence i), False for padding.
lengths_tensor = torch.tensor(sequence_lengths, device=device)
mask = torch.arange(max_seq_len, device=device).unsqueeze(0) < lengths_tensor.unsqueeze(1)

# Tag names -> class ids, one 1-D long tensor per sentence.
train_labels = [
    torch.tensor([tag_to_ix[t] for t in tags], dtype=torch.long)
    for tags in training_tags
]

# Pad to the longest sentence. The label padding value 0 is also a valid
# class id, so padded positions must always be excluded through `mask`.
training_data_word_embeddings = pad_sequence(sentence_embeddings, batch_first=True, padding_value=0)
train_labels = pad_sequence(train_labels, batch_first=True, padding_value=0)

# Final tensors. `.to()` is used instead of torch.tensor(existing_tensor),
# which triggers PyTorch's copy-construct warning.
train_data_tensor = training_data_word_embeddings.to(device=device, dtype=torch.float32)
train_labels_tensor = train_labels.to(device=device, dtype=torch.long)



import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# Number of classes the model scores per token.
NUM_TAGS = 14

# Each dataset item is (embeddings, labels, mask row, true sequence length).
# (This section used to appear twice verbatim; one copy is enough.)
train_dataset = TensorDataset(train_data_tensor, train_labels_tensor, mask, torch.tensor(sequence_lengths).to(device))
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Initialize model and optimizer.
model = BiLSTMCRF(EMBEDDING_DIM, HIDDEN_DIM, NUM_TAGS)
# Kept so the name stays defined; the training loop below optimises the CRF
# negative log-likelihood and does not use this criterion.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Move the model to the configured device. The data tensors were already
# placed on `device` when they were created, so they need no further move.
model.to(device)

# Label-transition counts, indexed [from_tag, to_tag]; filled during training.
# No extra START/END rows or columns are added.
transition_matrix = torch.zeros(NUM_TAGS, NUM_TAGS)

# Training loop
NUM_EPOCHS = 10

for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0.0
    total_batches = 0  # number of batches seen this epoch

    progress = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{NUM_EPOCHS}", leave=False)
    # `batch_lengths` (not `sequence_lengths`): reusing the global name as the
    # loop variable overwrote the list of all lengths with the last batch's
    # tensor, which breaks any re-run of the cells that read that list.
    for batch_data, batch_labels, batch_mask, batch_lengths in progress:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device)
        batch_mask = batch_mask.to(device)

        optimizer.zero_grad()

        # Forward pass: per-token emission scores, zeroed at padded positions.
        emissions = model(batch_data, mask=batch_mask)
        masked_emissions = emissions.masked_fill(~batch_mask.unsqueeze(-1), 0)

        # CRF negative log-likelihood, averaged over the batch. The CRF already
        # receives the mask; the previous extra multiplication by
        # batch_mask.unsqueeze(-1) broadcast the per-sequence loss against a
        # (batch, seq, 1) tensor and merely rescaled the mean by the share of
        # unpadded positions.
        # NOTE(review): assumes the TorchCRF forward pass returns one
        # log-likelihood per sequence — confirm against the installed version.
        log_likelihood = model.crf(masked_emissions, batch_labels, mask=batch_mask)
        loss = -log_likelihood.mean()
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # Count label transitions among the real (unpadded) tokens. Working on
        # Python lists avoids indexing tensors one element at a time.
        # NOTE: counts are accumulated on every epoch, as before.
        for labels, seq_len in zip(batch_labels.tolist(), batch_lengths.tolist()):
            valid = labels[:seq_len]
            for from_label, to_label in zip(valid, valid[1:]):
                transition_matrix[from_label, to_label] += 1

        total_loss += loss.item()
        total_batches += 1

    # Report against the real epoch count (the message used to say "/100").
    print(f"Epoch {epoch + 1}/{NUM_EPOCHS}, Loss: {total_loss / max(total_batches, 1)}")

In [None]:
import torch

def viterbi_decode(emissions, transition_matrix, mask):
    """Return the highest-scoring tag sequence for every batch element.

    The score of a path is the sum of its emission scores plus
    ``transition_matrix[from_tag, to_tag]`` for each consecutive tag pair.

    Args:
        emissions: Tensor of shape (batch, seq_len, num_labels).
        transition_matrix: Tensor of shape (num_labels, num_labels), indexed
            ``[from_tag, to_tag]``.
        mask: Tensor of shape (batch, seq_len); truthy for real tokens and
            falsy for padding. Padding is expected to follow the real tokens.

    Returns:
        Long tensor of shape (batch, seq_len) holding the best tag per step.
        Padded positions repeat the last real tag and should be ignored.
    """
    batch_size, sequence_length, num_labels = emissions.shape
    device = emissions.device

    best_paths = torch.zeros((batch_size, sequence_length), dtype=torch.long, device=device)
    if sequence_length == 0:
        # Nothing to decode; avoids indexing step 0 of an empty time axis.
        return best_paths

    mask = mask.to(device=device).bool()
    transitions = transition_matrix.to(device=device, dtype=emissions.dtype)

    backpointers = torch.zeros((batch_size, sequence_length, num_labels), dtype=torch.long, device=device)
    # Backpointer used at padded steps: every tag points to itself, so the
    # backtrace passes through padding without changing the tag.
    identity = torch.arange(num_labels, device=device).unsqueeze(0).expand(batch_size, num_labels)

    # Scores of the best path ending in each tag after the first step.
    path_scores = emissions[:, 0, :]

    for t in range(1, sequence_length):
        # candidate[b, i, j] = best score ending in tag i, then moving i -> j.
        candidate = path_scores.unsqueeze(2) + transitions.unsqueeze(0)
        max_scores, max_indices = candidate.max(dim=1)
        step_is_real = mask[:, t].unsqueeze(1)

        # At padded steps keep the scores untouched. Multiplying them by the
        # mask (the previous behaviour) zeroed them, so the final argmax and
        # the whole backtrace of shorter sequences were decided by padding.
        path_scores = torch.where(step_is_real, max_scores + emissions[:, t, :], path_scores)
        backpointers[:, t, :] = torch.where(step_is_real, max_indices, identity)

    # Follow the backpointers from the best final tag to the first step.
    best_last_tags = path_scores.argmax(dim=1)
    batch_index = torch.arange(batch_size, device=device)
    for t in range(sequence_length - 1, 0, -1):
        best_paths[:, t] = best_last_tags
        best_last_tags = backpointers[batch_index, t, best_last_tags]

    best_paths[:, 0] = best_last_tags

    return best_paths

## Load the model

## Evaluating

In [None]:
EMBEDDING_DIM = 100
HIDDEN_DIM = 40


def _load_bilstmcrf(checkpoint_path, num_tags=14):
    """Build a BiLSTMCRF and load its weights from ``checkpoint_path``.

    Args:
        checkpoint_path: Path of a state-dict file for a BiLSTMCRF built with
            EMBEDDING_DIM, HIDDEN_DIM and ``num_tags``.
        num_tags: Number of tags the checkpointed model was built with.

    Returns:
        The model with the checkpoint weights loaded, on CPU.
    """
    loaded = BiLSTMCRF(EMBEDDING_DIM, HIDDEN_DIM, num_tags)
    # map_location="cpu" lets a checkpoint saved from a GPU load on a machine
    # without one; the freshly built model lives on CPU anyway.
    # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
    state_dict = torch.load(checkpoint_path, map_location="cpu")
    loaded.load_state_dict(state_dict)
    return loaded


Task1_BiLSTMCRF_word2vec = _load_bilstmcrf("bilstmcrf_task1_word2vec.pth")
Task1_BiLSTMCRF_glove = _load_bilstmcrf("bilstmcrf_task1_glove.pth")