# LSTM

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
import torch

# Hugging Face URLs of the train / eval JSONL exports of the dataset
train_path, eval_path = (
    f"hf://datasets/timdettmers/openassistant-guanaco/openassistant_best_replies_{s}.jsonl"
    for s in ("train", "eval")
)
splits = [train_path, eval_path]

# Each file is read as line-delimited JSON (one record per line)
df_train = pd.read_json(train_path, lines=True)
df_test = pd.read_json(eval_path, lines=True)

def extract_prompt_and_response(row):
    """Split one raw conversation string into ``(prompt, response)``.

    The text is cut at every ``"### Assistant:"`` marker. The prompt is the
    piece before the first marker with all ``"### Human:"`` tags removed; the
    response is the piece between the first and second marker (or up to the
    end of the text). Both are whitespace-stripped. When no assistant marker
    is present the response is the empty string.
    """
    human_part, *assistant_parts = row.split("### Assistant:")
    prompt = human_part.replace("### Human:", "").strip()
    if not assistant_parts:
        # No assistant marker at all: the whole text is treated as the prompt.
        return prompt, ""
    return prompt, assistant_parts[0].strip()

# Add "prompt" / "response" columns to both frames from their raw "text" column
for frame in (df_train, df_test):
    frame[["prompt", "response"]] = frame["text"].apply(
        lambda text: pd.Series(extract_prompt_and_response(text))
    )


def _wrap_answer(response):
    """Surround a response with the start / end marker tokens."""
    return "<START> " + response + " <END>"


# Questions are used verbatim; answers get the start / end markers
train_questions = df_train["prompt"].tolist()
train_answers = [_wrap_answer(response) for response in df_train["response"]]

test_questions = df_test["prompt"].tolist()
test_answers = [_wrap_answer(response) for response in df_test["response"]]

# The tokenizer vocabulary is fitted on train and test text together
data_for_tokenizer = [*train_questions, *train_answers, *test_questions, *test_answers]

# Tokenizer setup
from collections import Counter

class Tokenizer:
    """Whitespace tokenizer mapping words to integer ids by corpus frequency.

    Id 0 is reserved for ``<PAD>`` and id 1 for ``<UNK>``; all corpus words
    follow from id 2 upward, most frequent first.
    """

    def __init__(self):
        self.word2idx = {}    # word -> id
        self.idx2word = []    # id -> word
        self.vocab_size = 0   # len(idx2word) once fitted

    def fit_on_texts(self, texts):
        """Build the vocabulary from an iterable of strings.

        Words are whatever ``str.split()`` yields (whitespace separated, no
        normalisation). Calling this again replaces the previous vocabulary.
        """
        counter = Counter(word for text in texts for word in text.split())
        self.idx2word = ["<PAD>", "<UNK>"] + [word for word, _ in counter.most_common()]
        self.word2idx = {word: idx for idx, word in enumerate(self.idx2word)}
        self.vocab_size = len(self.idx2word)

    def texts_to_sequences(self, texts):
        """Return one list of token ids per text; unknown words map to 1 (``<UNK>``)."""
        return [[self.word2idx.get(word, 1) for word in text.split()] for text in texts]

    def sequences_to_texts(self, sequences):
        """Return one space-joined string per id sequence.

        This is the inverse of ``texts_to_sequences``: each result is a single
        string, so callers can ``.split()`` it back into tokens. (Previously
        this returned lists of tokens, which broke callers that treated the
        items as strings.)

        Raises:
            IndexError: if an id is outside the fitted vocabulary.
        """
        return [" ".join(self.idx2word[idx] for idx in seq) for seq in sequences]

# Build the shared vocabulary once from all question / answer text
tokenizer = Tokenizer()
tokenizer.fit_on_texts(data_for_tokenizer)

# Number of distinct ids, including the <PAD> and <UNK> entries
VOCAB_SIZE = len(tokenizer.idx2word)

def tokenize_and_pad(texts, maxlen):
    """Turn ``texts`` into one right-padded LongTensor of token ids.

    Every text is tokenized with the module-level ``tokenizer``. Rows are
    padded with 0 up to the longest sequence in ``texts``, then the result is
    cut to at most ``maxlen`` columns (shorter batches are not widened).
    """
    id_tensors = []
    for ids in tokenizer.texts_to_sequences(texts):
        id_tensors.append(torch.tensor(ids, dtype=torch.long))
    padded = pad_sequence(id_tensors, batch_first=True, padding_value=0)
    return padded[:, :maxlen]

def _longest_sequence(texts):
    """Return the token count of the longest text under the fitted tokenizer."""
    return max(map(len, tokenizer.texts_to_sequences(texts)))


# Longest question / answer across train and test, measured in tokens
maxlen_questions = _longest_sequence(train_questions + test_questions)
maxlen_answers = _longest_sequence(train_answers + test_answers)

# Padded id tensors fed to the encoder and (teacher-forced) decoder
encoder_input_data = tokenize_and_pad(train_questions, maxlen_questions)
decoder_input_data = tokenize_and_pad(train_answers, maxlen_answers)

def create_decoder_output(data, maxlen):
    """Build decoder targets from tokenized answers.

    Each sequence in ``data`` loses its first token (so position ``t`` holds
    the token that follows input position ``t``). The shifted sequences are
    right-padded with 0 to a common length and cut to at most ``maxlen``
    columns.
    """
    targets = []
    for seq in data:
        targets.append(torch.tensor(seq[1:], dtype=torch.long))
    padded = pad_sequence(targets, batch_first=True, padding_value=0)
    return padded[:, :maxlen]

# Targets are the training answers' token ids shifted left by one position
train_answer_ids = tokenizer.texts_to_sequences(train_answers)
decoder_output_data = create_decoder_output(train_answer_ids, maxlen_answers)

# Report tensor shapes as a sanity check on the prepared data
print(f"Encoder input shape: {encoder_input_data.shape}")
print(f"Decoder input shape: {decoder_input_data.shape}")
print(f"Decoder output shape: {decoder_output_data.shape}")


Encoder input shape: torch.Size([9846, 1640])
Decoder input shape: torch.Size([9846, 1743])
Decoder output shape: torch.Size([9846, 1742])


In [9]:
import torch.nn as nn

class Seq2SeqModel(nn.Module):
    """LSTM encoder-decoder sharing one token embedding.

    The encoder's final hidden/cell state initialises the decoder, whose
    per-step outputs are projected to vocabulary logits. Token id 0 is the
    padding id (``padding_idx=0`` on the embedding).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, encoder_input, decoder_input):
        """Return logits of shape ``(batch, decoder_steps, vocab_size)``.

        Args:
            encoder_input: LongTensor ``(batch, src_len)`` of token ids,
                right-padded with 0.
            decoder_input: LongTensor ``(batch, decoder_steps)`` of token ids.
        """
        encoder_embedded = self.embedding(encoder_input)

        # Pack the encoder input so the LSTM stops at each row's last real
        # token. Without this the final state is taken after stepping through
        # every trailing pad position, so it depends on the amount of padding.
        # Assumes padding is trailing (right-padded). Lengths must be >= 1 and
        # live on the CPU for pack_padded_sequence.
        lengths = (encoder_input != self.embedding.padding_idx).sum(dim=1)
        lengths = lengths.clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            encoder_embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, cell) = self.encoder(packed)

        decoder_embedded = self.embedding(decoder_input)
        decoder_output, _ = self.decoder(decoder_embedded, (hidden, cell))

        return self.fc(decoder_output)

In [10]:
# Hyperparameters
embedding_dim, hidden_dim = 200, 256
learning_rate = 0.001

# Model, loss, and optimizer
model = Seq2SeqModel(
    vocab_size=VOCAB_SIZE,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
)
# Target id 0 is the padding value, so those positions are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)


In [11]:
from torch.utils.data import DataLoader, TensorDataset

# Prepare dataloaders
# Training triples: (encoder ids, decoder input ids, shifted decoder targets)
train_dataset = TensorDataset(encoder_input_data, decoder_input_data, decoder_output_data)
# Reuse the dataset built above instead of constructing an identical second copy
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)

# Test triples are built the same way from the evaluation split
test_dataset = TensorDataset(
    tokenize_and_pad(test_questions, maxlen_questions),
    tokenize_and_pad(test_answers, maxlen_answers),
    create_decoder_output(tokenizer.texts_to_sequences(test_answers), maxlen_answers),
)
test_loader = DataLoader(test_dataset, batch_size=64)

In [None]:
from tqdm import tqdm

num_epochs = 10

# Pick the fastest available backend: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

model.to(device)

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0

    for encoder_input, decoder_input, target_output in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        optimizer.zero_grad()

        # Batches come off the loader on the CPU; move them next to the model.
        encoder_input = encoder_input.to(device)
        decoder_input = decoder_input.to(device)
        target_output = target_output.to(device)

        output = model(encoder_input, decoder_input)
        # The decoder input is one step longer than the shifted targets, so
        # drop the surplus prediction(s) before comparing.
        output = output[:, :target_output.size(1), :]

        # Flatten (batch, steps) so every position is one classification sample.
        loss = criterion(output.reshape(-1, VOCAB_SIZE), target_output.reshape(-1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Mean per-batch loss for the epoch
    print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(train_loader)}")


Epoch 1:   0%|          | 0/9846 [00:04<?, ?it/s]


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, mps:0 and cpu!

In [None]:
# Persist the trained parameters (state_dict only, not the module object)
weights = model.state_dict()
torch.save(weights, "seq2seq_model.pth")

# Persist the fitted tokenizer alongside the weights
import pickle
with open("tokenizer.pkl", "wb") as tokenizer_file:
    tokenizer_file.write(pickle.dumps(tokenizer))

In [None]:
from nltk.translate.bleu_score import sentence_bleu

def evaluate_bleu(model, test_loader):
    """Return the mean sentence-level BLEU of the model over ``test_loader``.

    Predictions are teacher-forced: the model receives the gold decoder input
    and its per-step argmax is compared with the shifted target. Padding
    positions (target id 0) are excluded from both reference and hypothesis.
    Ids are mapped to words with the module-level ``tokenizer``.

    Args:
        model: seq2seq model called as ``model(encoder_input, decoder_input)``.
        test_loader: iterable of ``(encoder_input, decoder_input,
            target_output)`` batches.

    Returns:
        Average BLEU per example, or 0.0 when the loader yields no examples.
    """
    model.eval()
    # Batches are produced on the CPU; run them wherever the model's weights are.
    model_device = next(model.parameters()).device
    total_bleu_score = 0.0
    num_examples = 0

    with torch.no_grad():
        for encoder_input, decoder_input, target_output in test_loader:
            output = model(encoder_input.to(model_device), decoder_input.to(model_device))
            predicted = torch.argmax(output, dim=-1).cpu()
            target_output = target_output.cpu()

            # Predictions can be longer than the shifted targets (the decoder
            # input keeps its last token); compare only the shared steps.
            steps = min(predicted.size(1), target_output.size(1))
            predicted = predicted[:, :steps]
            target_output = target_output[:, :steps]

            for target_ids, predicted_ids in zip(target_output, predicted):
                real = target_ids != 0  # mask out padding positions
                reference = [tokenizer.idx2word[idx] for idx in target_ids[real].tolist()]
                hypothesis = [tokenizer.idx2word[idx] for idx in predicted_ids[real].tolist()]
                # sentence_bleu takes a list of reference token lists and one
                # hypothesis token list.
                total_bleu_score += sentence_bleu([reference], hypothesis)
                num_examples += 1

    if num_examples == 0:
        return 0.0
    return total_bleu_score / num_examples

bleu_score = evaluate_bleu(model, test_loader)
print(f"BLEU Score: {bleu_score}")

In [None]:
def generate_response(model, tokenizer, input_text, max_len=50):
    """Greedily decode a reply to ``input_text``.

    The prompt is tokenized with ``tokenizer`` (cut to the module-level
    ``maxlen_questions``), embedded and encoded; the decoder then starts from
    ``<START>`` and feeds back its own most likely token until ``<END>`` is
    produced or ``max_len`` tokens have been generated.

    Args:
        model: trained model exposing ``embedding``, ``encoder``, ``decoder``
            and ``fc``.
        tokenizer: fitted tokenizer providing ``texts_to_sequences``,
            ``word2idx`` and ``idx2word``.
        input_text: the user prompt.
        max_len: maximum number of tokens to generate.

    Returns:
        The generated words joined by spaces ("" for an empty prompt).
    """
    model.eval()
    # Build every tensor on the device that holds the model's weights.
    model_device = next(model.parameters()).device

    token_ids = tokenizer.texts_to_sequences([input_text])[0][:maxlen_questions]
    if not token_ids:
        # Nothing to encode; the LSTM cannot run on a zero-length sequence.
        return ""
    input_seq = torch.tensor([token_ids], dtype=torch.long, device=model_device)

    output_sentence = []
    with torch.no_grad():
        # The LSTMs consume embeddings, not raw ids, and return
        # (outputs, (hidden, cell)); only the final state seeds the decoder.
        _, (hidden, cell) = model.encoder(model.embedding(input_seq))

        token = tokenizer.word2idx["<START>"]
        for _ in range(max_len):
            decoder_input = torch.tensor([[token]], dtype=torch.long, device=model_device)
            decoder_output, (hidden, cell) = model.decoder(
                model.embedding(decoder_input), (hidden, cell)
            )
            # Project the hidden state to vocabulary logits before picking a token.
            logits = model.fc(decoder_output[:, -1, :])
            token = torch.argmax(logits, dim=-1).item()
            word = tokenizer.idx2word[token]

            if word == "<END>":
                break

            output_sentence.append(word)

    return " ".join(output_sentence)

# Example usage
input_text = "What is your name?"
response = generate_response(model, tokenizer, input_text)
print(f"Bot: {response}")