In [None]:
import re
import string

import numpy as np
import spacy
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import random_split
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
%load_ext tensorboard

In [None]:
# Load spaCy's small English pipeline once; its tokenizer is used by the
# tokenization code in later cells.
# Download with: python -m spacy download en_core_web_sm
spacy_eng = spacy.load("en_core_web_sm")

def preprocess_text(text):
    """
    Normalize raw text: lowercase it, drop ASCII punctuation, and collapse
    whitespace.

    Args:
        text (str): The raw input text.

    Returns:
        str: The lowercased text with every character of
        ``string.punctuation`` removed, runs of whitespace replaced by a
        single space, and leading/trailing whitespace stripped.
    """
    text = text.lower()
    # A translation table deletes punctuation in one pass, without rebuilding
    # and recompiling a regex character class on every call.
    text = text.translate(str.maketrans("", "", string.punctuation))
    text = re.sub(r"\s+", " ", text)  # Collapse runs of whitespace
    return text.strip()

In [1]:
class Vocab:
    """
    Word-level vocabulary mapping tokens to integer ids and back.

    Ids 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``. Other tokens are added by :meth:`build_vocab`
    once they have been seen at least ``min_freq`` times.
    """

    def __init__(self, min_freq):
        """
        Args:
            min_freq (int): Number of occurrences a token needs before it is
                added to the vocabulary.
        """
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {token: index for index, token in self.itos.items()}
        self.min_freq = min_freq

    def __len__(self):
        """Return the number of entries, special tokens included."""
        return len(self.itos)

    @staticmethod
    def tokenize(text):
        """Split ``text`` into lowercased tokens with the spaCy tokenizer."""
        return [tok.text.lower() for tok in spacy_eng.tokenizer(text)]

    def build_vocab(self, sentences):
        """
        Add every token that occurs at least ``min_freq`` times in ``sentences``.

        Args:
            sentences (iterable of str): Raw sentences to count tokens from.
        """
        freqs = {}
        for sentence in sentences:
            for word in self.tokenize(sentence):
                freqs[word] = freqs.get(word, 0) + 1
                # The membership check keeps existing ids stable, so calling
                # this method again cannot overwrite earlier entries.
                if freqs[word] >= self.min_freq and word not in self.stoi:
                    idx = len(self.itos)  # ids are contiguous, so this is the next free id
                    self.stoi[word] = idx
                    self.itos[idx] = word

    # Alias kept because the dataset code in this notebook calls the method
    # under this longer name.
    build_vocabulary = build_vocab

    def numericalize(self, text):
        """
        Convert ``text`` into a list of token ids.

        Tokens that are not in the vocabulary map to the ``<UNK>`` id.

        Args:
            text (str): Raw text to tokenize and encode.

        Returns:
            list of int: One id per token.
        """
        unk_idx = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_idx) for token in self.tokenize(text)]

In [None]:
class QADataset(Dataset):
    """
    Dataset of question/answer pairs read from a tab-separated text file.

    Each non-blank line must contain a question and an answer separated by a
    tab. A vocabulary is built over all questions and answers, and every item
    is returned as a pair of id tensors wrapped in ``<SOS>`` ... ``<EOS>``.
    """

    def __init__(self, qa_file, transform=None, freq_threshold=5):
        """
        Args:
            qa_file (str): Path to the tab-separated question/answer file.
            transform: Stored on the instance; not used by this class.
            freq_threshold (int): Minimum token frequency passed to ``Vocab``.

        Raises:
            ValueError: If a non-blank line contains no tab separator.
        """
        self.transform = transform
        self.questions = []
        self.answers = []

        # Explicit encoding so the result does not depend on the platform default.
        with open(qa_file, 'r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, start=1):
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines, e.g. a trailing newline
                # Split on the first tab only, so a stray tab inside the
                # answer no longer breaks the two-way unpacking.
                question, separator, answer = line.partition('\t')
                if not separator:
                    raise ValueError(
                        f"{qa_file}:{line_number}: expected 'question<TAB>answer', got {line!r}"
                    )
                self.questions.append(question)
                self.answers.append(answer)

        self.vocab = Vocab(freq_threshold)
        self.vocab.build_vocab(self.questions + self.answers)

    def __len__(self):
        """Return the number of question/answer pairs."""
        return len(self.questions)

    def _encode(self, text):
        """Return ``text`` as a tensor of ids wrapped in <SOS> ... <EOS>."""
        ids = [self.vocab.stoi["<SOS>"]]
        ids += self.vocab.numericalize(text)
        ids.append(self.vocab.stoi["<EOS>"])
        return torch.tensor(ids)

    def __getitem__(self, index):
        """Return ``(question_ids, answer_ids)`` tensors for the pair at ``index``."""
        return self._encode(self.questions[index]), self._encode(self.answers[index])
    
class BatchCollator:
    """Collate function that pads the questions and answers of a batch."""

    def __init__(self, pad_idx):
        # Id used to fill the shorter sequences of a batch.
        self.pad_idx = pad_idx

    def __call__(self, batch):
        """
        Pad a list of ``(question, answer)`` tensor pairs.

        Returns a tuple ``(questions_padded, answers_padded)``; both tensors
        are laid out sequence-first (``batch_first=False``).
        """
        def pad_field(position):
            # Gather the tensor at `position` from every sample and pad them
            # to the length of the longest one.
            return pad_sequence(
                [sample[position] for sample in batch],
                batch_first=False,
                padding_value=self.pad_idx,
            )

        return pad_field(0), pad_field(1)

In [None]:
def create_data_loader(qa_file, batch_size=32, num_workers=2, shuffle=True, pin_memory=True,
                       freq_threshold=5, val_fraction=0.1):
    """
    Build the dataset and its training/validation data loaders.

    Args:
        qa_file (str): Path to the question/answer file given to ``QADataset``.
        batch_size (int): Batch size used by both loaders.
        num_workers (int): Worker processes used by both loaders.
        shuffle (bool): Whether the training loader shuffles; the validation
            loader never does.
        pin_memory (bool): Forwarded to both loaders.
        freq_threshold (int): Minimum token frequency for the vocabulary.
        val_fraction (float): Share of the dataset held out for validation,
            in ``[0, 1)``.

    Returns:
        tuple: ``(train_loader, val_loader, dataset)``.

    Raises:
        ValueError: If ``val_fraction`` is outside ``[0, 1)``.
    """
    if not 0.0 <= val_fraction < 1.0:
        raise ValueError(f"val_fraction must be in [0, 1), got {val_fraction}")

    dataset = QADataset(qa_file, freq_threshold=freq_threshold)
    pad_idx = dataset.vocab.stoi["<PAD>"]

    size = len(dataset)
    val_size = int(val_fraction * size)
    train_size = size - val_size
    # Two lengths in, two subsets out.
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=shuffle,
        pin_memory=pin_memory,
        collate_fn=BatchCollator(pad_idx=pad_idx),
    )
    val_loader = DataLoader(
        dataset=val_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=False,
        pin_memory=pin_memory,
        collate_fn=BatchCollator(pad_idx=pad_idx),
    )

    return train_loader, val_loader, dataset

In [None]:
# Path to the tab-separated question/answer file. Creating the loaders also
# builds the dataset and its vocabulary.
QA_file = "Dataset.txt"
train_loader, val_loader, dataset = create_data_loader(qa_file=QA_file)

In [None]:
## Implementation of Encoder and decoder architecture using the context vectors for attention mechanism
class Encoder(nn.Module):
    """
    Bidirectional LSTM encoder.

    The per-step outputs keep both directions (``2 * hidden_size`` features)
    for the attention mechanism. The final hidden and cell states of the two
    directions are concatenated and projected back to ``hidden_size`` so they
    can initialize a unidirectional decoder.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout):
        """
        Args:
            vocab_size (int): Number of rows in the embedding table.
            embed_size (int): Embedding dimension.
            hidden_size (int): LSTM hidden size per direction.
            num_layers (int): Number of stacked LSTM layers.
            dropout (float): Dropout between LSTM layers.
        """
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, bidirectional=True, dropout=dropout)
        self.fc_hidden = nn.Linear(hidden_size * 2, hidden_size)
        self.fc_cell = nn.Linear(hidden_size * 2, hidden_size)

    def _merge_directions(self, state, projection):
        """Concatenate forward/backward states per layer and project to hidden_size."""
        num_layers = self.lstm.num_layers
        _, batch_size, hidden_size = state.shape
        # The LSTM returns states as (num_layers * 2, batch, hidden), ordered
        # layer by layer with forward before backward.
        state = state.reshape(num_layers, 2, batch_size, hidden_size)
        # Concatenate along the FEATURE axis (dim=2); concatenating along
        # dim=1 would stack the batch instead and break the linear layers.
        merged = torch.cat((state[:, 0], state[:, 1]), dim=2)
        return projection(merged)

    def forward(self, x):
        """
        Args:
            x (Tensor): Token ids of shape ``(seq_len, batch)``.

        Returns:
            tuple: ``(encoder_outputs, hidden, cell)`` with shapes
            ``(seq_len, batch, 2 * hidden_size)``,
            ``(num_layers, batch, hidden_size)`` and
            ``(num_layers, batch, hidden_size)``.
        """
        x = self.embedding(x)
        encoder_outputs, (hidden, cell) = self.lstm(x)
        hidden = self._merge_directions(hidden, self.fc_hidden)
        cell = self._merge_directions(cell, self.fc_cell)
        return encoder_outputs, hidden, cell
    
"""
#Implementation using BERT for better semantic understanding
class ContextualEncoder(nn.Module):
    def __init__(self, hidden_size, num_layers, p):
        super(ContextualEncoder, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.lstm = nn.LSTM(self.bert.config.hidden_size, hidden_size, num_layers, bidirectional=True, dropout=dropout)
        #self.fc_hidden = nn.Linear(hidden_size * 2, hidden_size)
        #self.fc_cell = nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, x):
        with torch.no_grad():
            outputs = self.bert(x)
        encoder_outputs = outputs.last_hidden_state
        hidden, cell = self.lstm(encoder_states)
        hidden = self.fc_hidden(torch.cat((hidden[0:1], hidden[1:2]), dim=2))
        cell = self.fc_cell(torch.cat((cell[0:1], cell[1:2]), dim=2))
        return encoder_outputs, hidden, cell
"""

class Decoder(nn.Module):
    """
    Single-step LSTM decoder with additive attention over the encoder outputs.

    At every step the top-layer hidden state is scored against each encoder
    output. The resulting weights form a context vector that is concatenated
    with the embedded input token and fed to the LSTM.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, output_size, num_layers, dropout):
        """
        Args:
            vocab_size (int): Number of rows in the embedding table.
            embed_size (int): Embedding dimension.
            hidden_size (int): LSTM hidden size. The encoder outputs are
                expected to carry ``2 * hidden_size`` features.
            output_size (int): Number of output classes.
            num_layers (int): Number of stacked LSTM layers.
            dropout (float): Dropout on the embeddings and between LSTM layers.
        """
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(hidden_size*2 + embed_size, hidden_size, num_layers, dropout=dropout)
        self.attention = nn.Linear(hidden_size*3, 1)
        self.fc_out = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_outputs, hidden, cell):
        """
        Decode one time step.

        Args:
            x (Tensor): Current input token ids, shape ``(batch,)``.
            encoder_outputs (Tensor): ``(src_len, batch, 2 * hidden_size)``.
            hidden (Tensor): ``(num_layers, batch, hidden_size)``.
            cell (Tensor): ``(num_layers, batch, hidden_size)``.

        Returns:
            tuple: ``(predictions, hidden, cell)`` where ``predictions`` has
            shape ``(batch, output_size)``.
        """
        x = x.unsqueeze(0)  # (1, batch): a sequence of length one
        embed = self.dropout(self.embedding(x))
        sequence_length = encoder_outputs.shape[0]
        # Use only the top layer as the attention query. With one layer this
        # is the whole hidden state; with several layers, repeating the full
        # state would no longer line up with the encoder outputs.
        h_reshaped = hidden[-1:].repeat(sequence_length, 1, 1)
        energy = torch.tanh(self.attention(torch.cat((h_reshaped, encoder_outputs), dim=2)))
        # Normalize over the source positions (dim 0).
        attention_weights = torch.softmax(energy, dim=0)
        # Weighted sum over source positions:
        # (src_len, batch, 1) x (src_len, batch, 2H) -> (1, batch, 2H)
        context_vector = torch.einsum("snk,snl->knl", attention_weights, encoder_outputs)
        lstm_input = torch.cat((context_vector, embed), dim=2)
        outputs, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
        predictions = self.fc_out(outputs).squeeze(0)
        return predictions, hidden, cell

In [None]:
## Implementation of Sequence to Sequence model using LSTM cells
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

    def __init__(self, encoder, decoder):
        """
        Args:
            encoder (nn.Module): Returns ``(encoder_outputs, hidden, cell)``.
            decoder (nn.Module): Single-step decoder exposing an ``fc_out``
                linear layer whose ``out_features`` is the target vocabulary
                size.
        """
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """
        Args:
            src (Tensor): Source token ids, shape ``(src_len, batch)``.
            trg (Tensor): Target token ids, shape ``(trg_len, batch)``.
            teacher_forcing_ratio (float): Probability of feeding the ground
                truth token, rather than the model's own prediction, as the
                next decoder input.

        Returns:
            Tensor: Scores of shape ``(trg_len, batch, vocab_size)``. Row 0 is
            left at zero because decoding starts from ``trg[0]``.
        """
        batch_size = src.shape[1]
        trg_len = trg.shape[0]
        # Read the output size from the decoder itself instead of a notebook
        # global, so the module does not depend on cell execution order.
        trg_vocab_size = self.decoder.fc_out.out_features

        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=src.device)
        encoder_outputs, hidden, cell = self.encoder(src)

        x = trg[0]  # first decoder input: the first target token of every sequence
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(x, encoder_outputs, hidden, cell)
            outputs[t] = output
            best_guess = output.argmax(1)
            x = trg[t] if torch.rand(1).item() < teacher_forcing_ratio else best_guess

        return outputs

In [None]:
# Training parameters
num_epochs = 100
learning_rate = 3e-4
# NOTE: the data loaders were already created by an earlier cell, so changing
# this value here does not change their batch size.
batch_size = 32

# Model hyperparameters
input_size_encoder = len(dataset.vocab)
input_size_decoder = len(dataset.vocab)
output_size = len(dataset.vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
num_layers = 1
enc_dropout = 0.0
dec_dropout = 0.0

# Tensorboard for tracking loss
writer = SummaryWriter("runs/loss_plot")
step = 0

# Define models
encoder_net = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers, enc_dropout).to(device)
# Decoder takes both the embedding vocabulary size and the output size.
decoder_net = Decoder(
    input_size_decoder,
    decoder_embedding_size,
    hidden_size,
    output_size,
    num_layers,
    dec_dropout,
).to(device)
model = Seq2Seq(encoder_net, decoder_net).to(device)

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
pad_idx = dataset.vocab.stoi["<PAD>"]
# Padding positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# Load model
load_model = False
save_model = False

if load_model:
    # map_location lets a checkpoint saved on GPU be restored on a CPU-only machine.
    checkpoint = torch.load('seq2seq_model.pth', map_location=device)
    encoder_net.load_state_dict(checkpoint['encoder_state_dict'])
    decoder_net.load_state_dict(checkpoint['decoder_state_dict'])

    encoder_net.eval()
    decoder_net.eval()

# Training loop
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    print(f"[Epoch {epoch + 1} / {num_epochs}]")

    model.train()
    train_loss = 0.0
    for questions, answers in train_loader:
        questions = questions.to(device)
        answers = answers.to(device)

        # Forward pass
        output = model(questions, answers)

        # Drop the first time step (the model never predicts it) and flatten
        # to (tokens, vocab) / (tokens,) for the cross-entropy loss.
        output = output[1:].reshape(-1, output.shape[2])
        answers = answers[1:].reshape(-1)

        optimizer.zero_grad()
        loss = criterion(output, answers)
        train_loss += loss.item()

        # Backward pass, with gradient clipping to limit exploding gradients
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()

        # Track loss
        writer.add_scalar("Training loss", loss.item(), global_step=step)
        step += 1

    # max(..., 1) guards against an empty loader (division by zero).
    epoch_train_loss = train_loss / max(len(train_loader), 1)
    train_losses.append(epoch_train_loss)
    print(f"Training loss: {epoch_train_loss:.4f}")

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        # NOTE(review): the model is called with its default teacher-forcing
        # ratio here as well, so the validation loss is not fully
        # deterministic; confirm this is intended.
        for questions, answers in val_loader:
            questions = questions.to(device)
            answers = answers.to(device)

            output = model(questions, answers)

            output = output[1:].reshape(-1, output.shape[2])
            answers = answers[1:].reshape(-1)

            loss = criterion(output, answers)
            val_loss += loss.item()

    epoch_val_loss = val_loss / max(len(val_loader), 1)
    val_losses.append(epoch_val_loss)
    print(f"Validation loss: {epoch_val_loss:.4f}")

    # Save the models state_dicts
    if save_model:
        checkpoint = {
            'encoder_state_dict': encoder_net.state_dict(),
            'decoder_state_dict': decoder_net.state_dict(),
        }
        torch.save(checkpoint, 'seq2seq_model.pth')

# Plot training and validation loss
import matplotlib.pyplot as plt  # not provided by the notebook's import cell

# Size the x-axes from the recorded losses, so the plot still works when
# training stopped before `num_epochs` epochs completed.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(range(1, len(train_losses) + 1), train_losses, label="Training Loss")
ax.plot(range(1, len(val_losses) + 1), val_losses, label="Validation Loss")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.legend()
ax.set_title("Training and Validation Loss")
plt.show()

In [None]:
%tensorboard --logdir runs

In [None]:
# Function to generate output greedily
def decode_response(encoder, decoder, test_input, vocab, max_length=50):
    """
    Greedily decode a response for one encoded input sequence.

    Args:
        encoder (nn.Module): Returns ``(encoder_states, hidden, cell)``.
        decoder (nn.Module): Single-step decoder taking token ids of shape
            ``(batch,)``.
        test_input (Tensor): Source token ids of shape ``(src_len, 1)``.
        vocab: Vocabulary exposing ``stoi`` and ``itos`` mappings.
        max_length (int): Maximum number of tokens to generate.

    Returns:
        str: The generated tokens joined by single spaces, without the
        ``<EOS>`` token.
    """
    # Switch the networks that are actually used to inference mode, rather
    # than relying on a notebook-level `model` variable.
    encoder.eval()
    decoder.eval()

    run_device = test_input.device
    eos_idx = vocab.stoi["<EOS>"]
    tokens = []

    with torch.no_grad():
        encoder_states, hidden, cell = encoder(test_input)
        # The decoder adds the time dimension itself, so it is fed a 1-D
        # tensor of shape (batch,) = (1,).
        target_seq = torch.tensor([vocab.stoi["<SOS>"]], device=run_device)

        while len(tokens) < max_length:
            output, hidden, cell = decoder(target_seq, encoder_states, hidden, cell)
            topi = output.argmax(1).item()
            if topi == eos_idx:
                break
            tokens.append(vocab.itos[topi])
            target_seq = torch.tensor([topi], device=run_device)

    return " ".join(tokens).strip()

In [None]:
# Function to generate output using beam search
def decode_response_beam_search(encoder, decoder, input_seq, max_length, vocab, beam_width=3):
    """
    Decode a response with beam search.

    Hypotheses are scored by the sum of the log-probabilities of their tokens
    (no length normalization). A hypothesis is finished once it emits
    ``<EOS>``. The search stops after ``max_length`` steps, when no open
    hypothesis is left, or when ``beam_width`` hypotheses have finished.

    Args:
        encoder (nn.Module): Returns ``(encoder_states, hidden, cell)``.
        decoder (nn.Module): Single-step decoder taking token ids of shape
            ``(batch,)`` and returning unnormalized scores ``(batch, vocab)``.
        input_seq (Tensor): Source token ids of shape ``(src_len, 1)``.
        max_length (int): Maximum number of decoding steps.
        vocab: Vocabulary exposing ``stoi`` and ``itos`` mappings.
        beam_width (int): Number of hypotheses kept after every step.

    Returns:
        list of str: Tokens of the best hypothesis, without ``<SOS>`` and
        ``<EOS>``. If nothing finished, the best open hypothesis is used.
    """
    sos_idx = vocab.stoi["<SOS>"]
    eos_idx = vocab.stoi["<EOS>"]
    run_device = input_seq.device

    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        encoder_states, hidden, cell = encoder(input_seq)

        # Each beam is (token ids so far, hidden state, cell state, log-prob score).
        beams = [([sos_idx], hidden, cell, 0.0)]
        completed_sentences = []  # (token ids, score) pairs

        for _ in range(max_length):
            candidates = []
            for tokens, beam_hidden, beam_cell, score in beams:
                # The decoder expects a 1-D tensor of shape (batch,) = (1,).
                decoder_input = torch.tensor([tokens[-1]], device=run_device)
                output, next_hidden, next_cell = decoder(
                    decoder_input, encoder_states, beam_hidden, beam_cell
                )
                # The decoder returns raw scores; convert them to
                # log-probabilities before accumulating.
                log_probs = torch.log_softmax(output, dim=1)
                k = min(beam_width, log_probs.shape[1])
                topk_log_probs, topk_indices = torch.topk(log_probs, k, dim=1)

                for log_prob, token_idx in zip(topk_log_probs[0].tolist(), topk_indices[0].tolist()):
                    candidates.append((tokens + [token_idx], next_hidden, next_cell, score + log_prob))

            candidates.sort(key=lambda beam: beam[3], reverse=True)

            beams = []
            for candidate in candidates[:beam_width]:
                if candidate[0][-1] == eos_idx:
                    completed_sentences.append((candidate[0], candidate[3]))
                else:
                    beams.append(candidate)

            if not beams or len(completed_sentences) >= beam_width:
                break

    if not completed_sentences:
        # Nothing reached <EOS>: fall back to the open hypotheses.
        completed_sentences = [(tokens, score) for tokens, _, _, score in beams]

    best_seq = max(completed_sentences, key=lambda item: item[1])[0]
    return [vocab.itos[idx] for idx in best_seq if idx not in (sos_idx, eos_idx)]

In [None]:
class ChatBot:
    """Console chat loop around the trained encoder/decoder pair."""

    # Replies to the opening question that end the session immediately (exact match).
    negative_responses = ("no", "nope", "nah", "naw", "not a chance", "sorry")
    # The chat stops as soon as a reply contains one of these substrings.
    exit_commands = ("quit", "pause", "exit", "goodbye", "bye", "later", "stop")

    def __init__(self, encoder, decoder, vocab):
        """
        Args:
            encoder: Trained encoder network.
            decoder: Trained decoder network.
            vocab: Vocabulary exposing ``stoi`` and ``tokenize``.
        """
        self.encoder = encoder
        self.decoder = decoder
        self.vocab = vocab

    def start_chat(self):
        """Greet the user and start chatting unless they decline."""
        user_response = input("Hi, I'm a chatbot trained on question-answer pairs. Would you like to chat with me?\n")
        if user_response.lower() in self.negative_responses:
            print("Ok, have a great day!")
            return
        self.chat(user_response)

    def chat(self, reply):
        """Answer ``reply`` and keep prompting until an exit command is seen."""
        while not self.make_exit(reply):
            reply = input(self.generate_response(reply) + "\n")

    def preprocess_input(self, user_input):
        """
        Encode ``user_input`` as a ``(seq_len, 1)`` tensor of token ids
        wrapped in ``<SOS>`` ... ``<EOS>``; unknown tokens map to ``<UNK>``.
        """
        # Tokenize through the vocabulary so inference uses the same
        # tokenization the vocabulary was built with.
        tokens = self.vocab.tokenize(user_input)
        unk_idx = self.vocab.stoi["<UNK>"]
        numericalized_text = [self.vocab.stoi["<SOS>"]]
        numericalized_text += [self.vocab.stoi.get(token, unk_idx) for token in tokens]
        numericalized_text.append(self.vocab.stoi["<EOS>"])
        return torch.tensor(numericalized_text).unsqueeze(1).to(device)

    def generate_response(self, user_input, beam_search = False):
        """
        Generate the bot's reply to ``user_input``.

        Args:
            user_input (str): Raw text typed by the user.
            beam_search (bool): Use beam search instead of greedy decoding.

        Returns:
            str: The reply with any ``<SOS>``/``<EOS>`` markers removed.
        """
        input_tensor = self.preprocess_input(user_input)
        if beam_search:
            # Beam search returns a list of tokens; join them into a sentence.
            response_tokens = decode_response_beam_search(self.encoder, self.decoder, input_tensor, 50, self.vocab)
            chatbot_response = " ".join(response_tokens)
        else:
            chatbot_response = decode_response(self.encoder, self.decoder, input_tensor, self.vocab)
        chatbot_response = chatbot_response.replace("<SOS>", '').replace("<EOS>", '')
        return chatbot_response.strip()

    def make_exit(self, reply):
        """Return True, after saying goodbye, when ``reply`` contains an exit command."""
        lowered = reply.lower()
        if any(exit_command in lowered for exit_command in self.exit_commands):
            print("Ok, have a great day!")
            return True
        return False

# The trained networks are bound to `encoder_net` / `decoder_net` by the
# training cell; no variables named `encoder` / `decoder` exist.
chatbot = ChatBot(encoder_net, decoder_net, dataset.vocab)
chatbot.start_chat()