<a href="https://colab.research.google.com/github/MehrdadDastouri/chatbot_seq2seq_attention/blob/main/chatbot_seq2seq_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.functional import log_softmax
import random
import numpy as np

# Fix random seeds for reproducibility.
# Python's built-in `random` has to be seeded as well: the teacher-forcing
# decision in Seq2Seq.forward is drawn from random.random(), so seeding only
# torch and numpy would still give a different training trajectory per run.
random.seed(42)
torch.manual_seed(42)
np.random.seed(42)

# Sample data (pairs of question and answer)
data = [
    ("hello", "hi"),
    ("how are you", "i am fine"),
    ("what is your name", "my name is chatbot"),
    ("how old are you", "i am a bot, i do not age"),
    ("what can you do", "i can chat with you"),
    ("goodbye", "see you later"),
]

# Tokenizer
def tokenize(sentence):
    """Return the lowercase, whitespace-separated tokens of ``sentence``.

    Punctuation is not stripped, so ``"bot,"`` stays a single token.
    """
    lowered = sentence.lower()
    return lowered.split()

# Vocabulary creation
def build_vocab(sentences):
    """Build a token -> index mapping from an iterable of sentences.

    Indices 0, 1 and 2 are reserved for ``<pad>``, ``<sos>`` and ``<eos>``.
    Every other token receives the next free index the first time it is seen,
    so indices follow first-occurrence order.
    """
    vocab = {"<pad>": 0, "<sos>": 1, "<eos>": 2}
    for sentence in sentences:
        for word in tokenize(sentence):
            # len(vocab) is evaluated before any insertion, so a new word gets
            # the next unused index and an existing word is left untouched.
            vocab.setdefault(word, len(vocab))
    return vocab

# Build vocabularies for input and output
# Split the (question, answer) pairs into two parallel lists.
questions = [question for question, _ in data]
answers = [answer for _, answer in data]

# Source and target sides get separate vocabularies.
input_vocab = build_vocab(questions)
output_vocab = build_vocab(answers)

# Reverse lookup (index -> word) used to turn decoder predictions into text.
idx_to_word_output = dict(zip(output_vocab.values(), output_vocab.keys()))

# Encode sentences to indices
def encode_sentence(sentence, vocab, max_len):
    """Convert a sentence into a padded list of vocabulary indices.

    The result is ``<sos>``, the index of each token, ``<eos>``, then
    ``<pad>`` repeated until the list has ``max_len`` entries.

    Tokens that are not in ``vocab`` no longer raise ``KeyError``: they are
    mapped to ``vocab["<unk>"]`` when the vocabulary defines that entry and
    are skipped otherwise. This keeps free-form user input from crashing the
    caller.

    Args:
        sentence: Raw text to encode.
        vocab: Mapping from token to index; must contain ``<sos>``, ``<eos>``
            and ``<pad>``.
        max_len: Target length including the ``<sos>``/``<eos>`` markers.

    Returns:
        A list of integer indices. If the encoded sentence is already longer
        than ``max_len`` it is returned as-is (no padding, no truncation).
    """
    unk_index = vocab.get("<unk>")  # None when the vocab has no <unk> entry

    indices = [vocab["<sos>"]]
    for token in tokenize(sentence):
        index = vocab.get(token, unk_index)
        if index is not None:
            indices.append(index)
    indices.append(vocab["<eos>"])

    # A negative repeat count yields an empty list, so over-long input is
    # simply left unpadded.
    padding = [vocab["<pad>"]] * (max_len - len(indices))
    return indices + padding

# Longest tokenized sentence on each side, plus 2 slots for <sos> and <eos>.
max_len_input = 2 + max(len(tokenize(q)) for q in questions)
max_len_output = 2 + max(len(tokenize(a)) for a in answers)

# Encode and pad every sentence, then stack each side into a LongTensor with
# one row per sentence.
encoded_questions = torch.tensor(
    [encode_sentence(q, input_vocab, max_len_input) for q in questions],
    dtype=torch.long,
)
encoded_answers = torch.tensor(
    [encode_sentence(a, output_vocab, max_len_output) for a in answers],
    dtype=torch.long,
)

# Define the Encoder (its per-step outputs are what the decoder attends over)
class Encoder(nn.Module):
    """Embed source token ids and run them through a batch-first LSTM.

    Args:
        input_size: Number of entries in the embedding table.
        embed_size: Width of each token embedding.
        hidden_size: Width of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(input_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Encode a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, batch dimension first.

        Returns:
            A tuple ``(outputs, hidden, cell)``: the LSTM output at every
            time step, followed by the final hidden and cell states.
        """
        embedded = self.embedding(x)
        outputs, final_state = self.lstm(embedded)
        hidden, cell = final_state
        return outputs, hidden, cell

class Attention(nn.Module):
    """Additive attention over the encoder outputs.

    Each encoder time step is scored against the last layer of the decoder
    hidden state; the scores are normalised with a softmax over time steps.

    Args:
        hidden_size: Width of both the decoder hidden state and the encoder
            outputs (their concatenation feeds a ``2 * hidden_size`` linear).
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights with one row per batch element.

        Args:
            hidden: Stacked LSTM hidden state; only the last layer
                (``hidden[-1]``) is used as the query.
            encoder_outputs: Batch-first encoder outputs.

        Returns:
            Tensor of weights over encoder time steps; each row sums to 1.
        """
        steps = encoder_outputs.size(1)
        # Repeat the query along the time axis so it lines up with every
        # encoder step.
        query = hidden[-1].unsqueeze(1).expand(-1, steps, -1)
        combined = torch.cat((query, encoder_outputs), dim=2)
        energy = torch.tanh(self.attention(combined))
        scores = self.v(energy).squeeze(2)
        return torch.softmax(scores, dim=1)

# Define the Decoder with Attention
class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Args:
        output_size: Number of entries in the embedding table and number of
            output logits.
        embed_size: Width of each token embedding.
        hidden_size: Width of the LSTM state and of the attention context.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, output_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(output_size, embed_size)
        # The LSTM consumes the token embedding concatenated with the context.
        self.lstm = nn.LSTM(
            embed_size + hidden_size,
            hidden_size,
            num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, output_size)
        self.attention = Attention(hidden_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            x: 1-D tensor holding the previous token id of each batch element.
            hidden: Current LSTM hidden state.
            cell: Current LSTM cell state.
            encoder_outputs: Batch-first encoder outputs to attend over.

        Returns:
            A tuple ``(predictions, hidden, cell, attention_weights)``:
            vocabulary logits for this step, the updated LSTM state, and the
            attention weights that produced the context vector.
        """
        token_embedding = self.embedding(x)

        attention_weights = self.attention(hidden, encoder_outputs)
        # Weighted sum of the encoder outputs: one context vector per batch
        # element.
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)

        # Re-insert a length-1 time axis for the batch-first LSTM.
        step_input = torch.cat((token_embedding, context), dim=1).unsqueeze(1)
        step_output, (hidden, cell) = self.lstm(step_input, (hidden, cell))

        predictions = self.fc(step_output.squeeze(1))
        return predictions, hidden, cell, attention_weights

# Define the Seq2Seq model
class Seq2Seq(nn.Module):
    """Tie an encoder and an attention decoder into one trainable model.

    Args:
        encoder: Module returning ``(encoder_outputs, hidden, cell)``.
        decoder: Module exposing ``fc.out_features`` and returning
            ``(logits, hidden, cell, attention_weights)`` for one step.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Decode ``target.size(1) - 1`` steps and collect the logits.

        Args:
            source: Batch-first tensor of source token ids.
            target: Batch-first tensor of target token ids; column 0 is the
                first decoder input.
            teacher_forcing_ratio: Per-step probability of feeding the
                ground-truth token instead of the model's own prediction.

        Returns:
            Tensor of shape ``(batch, target_len, vocab_size)``. Position 0
            along the time axis is never written and stays all zeros.
        """
        batch_size, target_len = source.size(0), target.size(1)
        vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            batch_size, target_len, vocab_size, device=source.device
        )

        encoder_outputs, hidden, cell = self.encoder(source)
        decoder_input = target[:, 0]

        for step in range(1, target_len):
            logits, hidden, cell, _ = self.decoder(
                decoder_input, hidden, cell, encoder_outputs
            )
            outputs[:, step, :] = logits

            # One random draw per step decides the next input for the whole
            # batch.
            use_teacher = random.random() < teacher_forcing_ratio
            decoder_input = target[:, step] if use_teacher else logits.argmax(1)

        return outputs

# Initialize the model
# Hyperparameters: the vocabulary sizes fix the embedding/output dimensions.
input_size = len(input_vocab)
output_size = len(output_vocab)
embed_size = 256
hidden_size = 512
num_layers = 1

# The encoder is built before the decoder, so parameter initialisation
# consumes the seeded torch RNG in a fixed order.
encoder = Encoder(
    input_size=input_size,
    embed_size=embed_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
decoder = Decoder(
    output_size=output_size,
    embed_size=embed_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
model = Seq2Seq(encoder=encoder, decoder=decoder)
model = model.to(torch.device("cpu"))

# Loss and optimizer
# Padding positions are excluded from the loss via ignore_index.
criterion = nn.CrossEntropyLoss(ignore_index=output_vocab["<pad>"])
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

# Training the model
epochs = 500
for epoch in range(epochs):
    model.train()

    # Clear gradients left over from the previous step before computing new ones.
    optimizer.zero_grad()

    # Full-batch forward pass over the whole toy dataset.
    outputs = model(encoded_questions, encoded_answers)

    # Time step 0 is the <sos> slot and is never predicted, so drop it from
    # both logits and targets, then flatten batch and time for the loss.
    outputs = outputs[:, 1:].reshape(-1, outputs.size(-1))
    targets = encoded_answers[:, 1:].reshape(-1)

    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()

    # Report progress every 50th epoch.
    if epoch % 50 == 49:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# Chat function
def chat(question):
    """Generate a reply to ``question`` with greedy decoding.

    The question is encoded with the input vocabulary and run through the
    encoder. The decoder is then fed ``<sos>`` followed by its own most
    likely token at each step, for at most ``max_len_output`` steps, and
    stops early when it predicts ``<eos>``.

    Returns:
        The predicted words joined by single spaces (possibly empty).
    """
    model.eval()
    with torch.no_grad():
        question_ids = encode_sentence(question, input_vocab, max_len_input)
        # Wrap in a list to add the batch dimension (batch size 1).
        source = torch.tensor([question_ids], dtype=torch.long)
        encoder_outputs, hidden, cell = model.encoder(source)

        eos_id = output_vocab["<eos>"]
        token_id = output_vocab["<sos>"]
        words = []

        for _ in range(max_len_output):
            decoder_input = torch.tensor([token_id], dtype=torch.long)
            logits, hidden, cell, _attn = model.decoder(
                decoder_input, hidden, cell, encoder_outputs
            )
            token_id = logits.argmax(1).item()
            if token_id == eos_id:
                break
            words.append(idx_to_word_output[token_id])

        return " ".join(words)

# Test the chatbot: type a message such as "hello" (type "quit" or "exit" to stop)
# Interactive loop: read a line, answer it with chat(), repeat until the user
# types "quit"/"exit" or the input stream ends.
while True:
    try:
        # Strip surrounding whitespace so "exit " still ends the session.
        user_input = input("You: ").strip()
    except (EOFError, KeyboardInterrupt):
        # input() raises EOFError when stdin is closed and KeyboardInterrupt
        # on Ctrl-C; end the session cleanly instead of printing a traceback.
        print("\nChatbot: Goodbye!")
        break
    if not user_input:
        # Nothing was typed; prompt again rather than querying the model.
        continue
    if user_input.lower() in ("quit", "exit"):
        print("Chatbot: Goodbye!")
        break
    response = chat(user_input)
    print(f"Chatbot: {response}")

Epoch [50/500], Loss: 0.0022
Epoch [100/500], Loss: 0.0010
Epoch [150/500], Loss: 0.0007
Epoch [200/500], Loss: 0.0005
Epoch [250/500], Loss: 0.0004
Epoch [300/500], Loss: 0.0003
Epoch [350/500], Loss: 0.0002
Epoch [400/500], Loss: 0.0002
Epoch [450/500], Loss: 0.0002
Epoch [500/500], Loss: 0.0001
You: hello
Chatbot: hi
You: goodbye
Chatbot: see you later
You: exit
Chatbot: Goodbye!
