In [14]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
import torch
# Device-independent code: use the first CUDA GPU when PyTorch can see one,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")
DEVICE

device(type='cpu')

In [12]:
# Common Variables
# Sentence-length bounds, measured in tokens. The filter cell below compares
# with strict `<` / `>`, so both bounds are exclusive.
MAX_LENGTH = 30
MIN_LENGTH = 1

# Load Dataset

In [13]:
import pickle
# Read the pickled sentence pairs from disk.
# NOTE: pickle.load can execute arbitrary code -- only open files you trust.
with open('sentencespairs.pkl', mode='rb') as pairs_file:
    pairs = pickle.load(pairs_file)

In [14]:
# Number of sentence pairs loaded from the pickle file.
len(pairs)

1718

# Preprocess

In [15]:
# Token count (whitespace split) of the first sentence of the first pair.
len(pairs[0][0].split())

13

In [16]:
# Keep only the pairs in which both sentences have a token count strictly
# between MIN_LENGTH and MAX_LENGTH.
# The first sentence is split on any whitespace; the second is split on single
# spaces only, so consecutive spaces in it count as extra (empty) tokens.
pairs = [
    pair
    for pair in pairs
    if MIN_LENGTH < len(pair[0].split()) < MAX_LENGTH
    and MIN_LENGTH < len(pair[1].split(' ')) < MAX_LENGTH
]
print(len(pairs))

1200


In [17]:
# Preview only the first few pairs; displaying the whole list floods the
# notebook output and bloats the saved file.
pairs[:10]

[["Why don't you just cook the breakfast, and try not to burn anything.",
  'Yes, Aunt Petunia.'],
 ['Hurry up! Bring my coffee, boy!', 'Yes, Uncle Vernon.'],
 ['What happened?!',
  "I swear I don't know!  One minute, the glass was there and then it was gone! It was like magic!"],
 ['Caveat Smeltona. Proudest moment of my life.',
  'Will I have to wear that, too?'],
 ["Oh, don't be so stupid, you're going to the state school where you belong.  And this is what you're gonna be wearing when I've finished dyeing it.",
  "But that's Dudley's old uniform. It'll fit me like bits of old Elephant skin."],
 ["Dad! Look! Harry's got a letter!", "Hey, give it back! It's mine!"],
 ['Shoo! Go on! Fine day Sunday. In my opinion, best day of the week. Why is that, Dudley?',
  "Because there's no post on Sunday?"],
 ['Give me that! Give me that letter!', 'Get off! Ahh!'],
 ["Daddy's gone mad, hasn't he?!", 'Make a wish, Harry.'],
 ["I-I-I'm not Harry.", 'I-I am.'],
 ["It's not every day that your youn

## Check for multiple sentences

In [18]:
import re

def preprocess(text):
    """Normalise a sentence for vocabulary building.

    The text is lower-cased and stripped of surrounding whitespace, then every
    occurrence of . ? ! , : ; ' ( ) - newline or a digit is deleted.
    Because stripping happens before the deletion, whitespace that ends up at
    the edges after characters are removed is left in place.

    Args:
        text: the raw sentence (str).

    Returns:
        The normalised sentence (str).
    """
    normalized = text.lower().strip()
    return re.sub(r'[.?!,:;\'()\-\n\d]', '', normalized)

In [19]:
# Normalise both sentences of every pair in place (positions 0 and 1).
for pair in pairs:
    for side in (0, 1):
        pair[side] = preprocess(pair[side])



## Create Vocab

- Count how often each unique word occurs and store the counts in the vocab dictionary

In [20]:
from nltk import word_tokenize

def create_vocab(doc):
    """Count how often each whitespace-separated token occurs in `doc`.

    Args:
        doc: iterable of pairs; elements 0 and 1 of each pair are strings.

    Returns:
        dict mapping each token to its number of occurrences over both
        sentences of every pair. Keys appear in first-seen order.
    """
    counts = {}
    for pair in doc:
        for sentence in (pair[0], pair[1]):
            for token in sentence.split():
                counts[token] = counts.get(token, 0) + 1
    return counts

In [21]:
vocab = create_vocab(pairs)

In [22]:
PAD_TOKEN = "<PAD>"
END_TOKEN = "<EOS>"
START_TOKEN = "<SOS>"

# Make sure the special tokens are part of the vocabulary. A token that is
# missing is inserted with a count of 1; an existing entry is left untouched.
# Insertion order is PAD, START, END.
for special_token in (PAD_TOKEN, START_TOKEN, END_TOKEN):
    vocab.setdefault(special_token, 1)

In [23]:
# Give every vocabulary entry a consecutive integer id (following the dict's
# iteration order) and build the lookup table in both directions.
idx2word = dict(enumerate(vocab))
word2idx = {word: idx for idx, word in idx2word.items()}

In [44]:
# Persist both lookup tables (word -> id and id -> word) as pickle files.
for filename, mapping in (('word2idx.pkl', word2idx), ('idx2word.pkl', idx2word)):
    with open(filename, 'wb') as out_file:
        pickle.dump(mapping, out_file)

In [45]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch

# Try Alternate Pairings

In [46]:
# Split the pairs by position: even-indexed entries and odd-indexed entries.
pairs1 = pairs[0::2]
pairs2 = pairs[1::2]

In [43]:
# Encoding: convert words -> numbers using word2idx
converted_pairs = []

for pair in pairs:
    sent1 = pair[0]
    sent2 = pair[1]
    sent1_converted = []
    sent2_converted = []

    for word in sent1.split(' '):
        if len(word) > 0:
            sent1_converted.append(word2idx[word])
    for word in sent2.split(' '):
        if len(word) > 0:
            sent2_converted.append(word2idx[word])

    converted_pairs.append([sent1_converted, sent2_converted])

print(converted_pairs[0:5])

[[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], [13, 14, 15]], [[16, 17, 18, 19, 20, 21], [13, 22, 23]], [[24, 25], [26, 27, 26, 1, 28, 29, 30, 5, 31, 32, 33, 7, 34, 35, 32, 36, 35, 32, 37, 38]], [[39, 40, 41, 42, 43, 19, 44], [45, 26, 46, 10, 47, 48, 49]], [[50, 1, 51, 52, 53, 54, 55, 10, 5, 56, 57, 58, 2, 59, 7, 60, 61, 24, 54, 62, 51, 63, 64, 65, 66, 67, 35], [68, 69, 70, 71, 72, 73, 74, 75, 37, 76, 43, 71, 77, 78]]]


In [47]:
class TranslationDataset(Dataset):
    """Dataset over (phrase, response) sentence pairs that yields id tensors.

    Args:
        pairs: indexable collection whose items unpack into two strings.
        word2idx: mapping from token to integer id; it must contain every
            token that gets encoded as well as END_TOKEN.
    """

    def __init__(self, pairs, word2idx):
        self.pairs = pairs
        self.word2idx = word2idx

    def __len__(self):
        return len(self.pairs)

    def _encode(self, sentence):
        """Return `sentence` as a 1-D long tensor of ids ending with END_TOKEN."""
        # str.split() never yields empty strings, so this filter only removes
        # one-character tokens (for example "i" or "a").
        ids = [self.word2idx[token] for token in sentence.split() if len(token) > 1]
        ids.append(self.word2idx[END_TOKEN])
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        source_sentence, target_sentence = self.pairs[idx]
        return self._encode(source_sentence), self._encode(target_sentence)

# Custom collate function to handle padding
def collate_fn(batch):
    """Pad a batch of (phrase, response) tensor pairs to a common length.

    Args:
        batch: sequence of 2-tuples of 1-D tensors, as produced by the dataset.

    Returns:
        Tuple (phrases, responses). Each is a batch-first tensor in which the
        shorter sequences are filled up with the id of PAD_TOKEN.
    """
    phrases, responses = zip(*batch)
    pad_id = word2idx[PAD_TOKEN]
    return (
        pad_sequence(phrases, batch_first=True, padding_value=pad_id),
        pad_sequence(responses, batch_first=True, padding_value=pad_id),
    )

In [61]:
# Create the dataset and DataLoader
sequence_pair_dataset = TranslationDataset(pairs, word2idx)
batch_size = 16
# shuffle=True reorders the samples on every pass; drop_last=True discards a
# trailing incomplete batch, so every batch has exactly `batch_size` rows.
# collate_fn pads the variable-length sequences inside each batch.
sequence_pair_dataloader = DataLoader(sequence_pair_dataset, batch_size=batch_size,
                                    shuffle=True,  drop_last=True, collate_fn=collate_fn)

# Sanity check: dataset size and number of batches per epoch.
print("samples: ", len(sequence_pair_dataset))
print("batches: ", len(sequence_pair_dataloader))

samples:  1200
batches:  75


In [62]:
# Example: iterating over the DataLoader
# Each iteration yields one padded (encoder input, decoder target) batch pair;
# only the first batch is printed here as a sanity check.
for encoder_input, decoder_output in sequence_pair_dataloader:
    print("Encoder batch:", encoder_input)
    print("Decoder batch:", decoder_output)
    break # remove this to iterate over the whole dataset

Encoder batch: tensor([[   7,   24, 1659,    2,  374,   48, 2863, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861],
        [  48,   32, 1070,   35,   32, 1885,   60,   32,  298, 2251,  806,    2,
          141,  187,   35,   96,  319,  291,  827,    2,  163, 2252,  745,    3,
         2253, 2863, 2861, 2861],
        [   5,  469,  470,  471, 2863, 2861, 2861, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861],
        [ 112, 1121,   10,  676,  613,  219,  319, 2863, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861],
        [ 563,  279,  114,   54,   62,  520,  119,  564,  565, 2863, 2861, 2861,
         2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861, 2861,
         2861, 2861, 2861, 2861],
     

# Seq2Seq Model using LSTM

In [7]:
import torch
import torch.nn as nn

In [8]:
class Encoder(nn.Module):
    """Embedding + LSTM encoder that reads its input sequence back to front.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_size: dimensionality of each embedding vector.
        hidden_size: size of the LSTM hidden and cell states.
        num_layers: number of stacked LSTM layers (default 1).
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Encode a batch of token ids.

        Args:
            x: integer tensor of token ids; dimension 1 is the one reversed
               (assumed to be (batch, seq_len) because the LSTM is batch_first).

        Returns:
            Tuple (outputs, hidden, cell) exactly as produced by the LSTM.
        """
        # Flip along dimension 1 so the LSTM consumes the sequence in reverse.
        # For a right-padded batch this moves the padding ids to the front.
        reversed_ids = torch.flip(x, dims=(1,))
        outputs, (hidden, cell) = self.lstm(self.embedding(reversed_ids))
        return outputs, hidden, cell

In [9]:
class Decoder(nn.Module):
    """Embedding + LSTM decoder with a linear projection onto the vocabulary.

    Args:
        vocab_size: number of embedding rows and of output logits.
        embed_size: dimensionality of each embedding vector.
        hidden_size: size of the LSTM hidden and cell states.
        num_layers: number of stacked LSTM layers (default 1).
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x, hidden, cell):
        """Run the decoder on `x`, starting from the given LSTM state.

        Args:
            x: integer tensor of token ids (the callers in this notebook pass
               one token per row, i.e. shape (batch, 1)).
            hidden: initial LSTM hidden state.
            cell: initial LSTM cell state.

        Returns:
            Tuple (logits, hidden, cell). The projected output is flattened to
            two dimensions, keeping the first dimension, so a one-step input
            gives logits of shape (batch, vocab_size).
        """
        embedded = self.embedding(x)
        lstm_out, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        logits = self.fc(lstm_out)
        return logits.reshape(lstm_out.size(0), -1), hidden, cell

In [26]:
# Hyperparameters
# vocab_size is the number of entries in `vocab`, which by this point also
# holds the PAD/SOS/EOS tokens; it sizes the embedding tables and the
# decoder's output layer.
vocab_size = len(vocab)
embed_size = 256
hidden_size = 512
num_layers = 1

# Initialize the models
# Encoder and decoder share hidden_size and num_layers because the training
# loop passes the encoder's final (hidden, cell) state straight to the decoder.
encoder = Encoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)
decoder = Decoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)

# Train Model

In [67]:
# Number of batches the DataLoader yields per epoch.
len(sequence_pair_dataloader)

75

In [68]:
import torch.optim as optim
import torch.nn as nn
import random

# Loss Function (exclude padding): positions whose target is PAD_TOKEN do not
# contribute to the loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=word2idx[PAD_TOKEN])

# Optimizers: one per network, both with AdamW's default settings.
encoder_optimizer = optim.AdamW(encoder.parameters())
decoder_optimizer = optim.AdamW(decoder.parameters())

# Number of epochs
num_epochs = 40

# Training Loop
encoder.train()
decoder.train()

for epoch in range(num_epochs):
    print(f'epoch: {epoch}')
    for input_tensor, target_tensor in sequence_pair_dataloader:
        input_tensor, target_tensor = input_tensor.to(DEVICE), target_tensor.to(DEVICE)
        # Zero gradients of both optimizers
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        # Read the sizes from the batch itself instead of the global
        # `batch_size`, so the loop also works for a smaller final batch
        # (e.g. if drop_last is ever switched off).
        current_batch_size = target_tensor.size(0)
        target_length = target_tensor.size(1)

        # Encoder: only its final hidden/cell state is handed to the decoder.
        _, encoder_hidden, encoder_cell = encoder(input_tensor)

        # Decoder: the first input is a column of START_TOKEN ids.
        decoder_input = torch.full(
            (current_batch_size, 1),
            word2idx[START_TOKEN],
            dtype=torch.long,
            device=DEVICE,
        )
        decoder_hidden = encoder_hidden
        decoder_cell = encoder_cell

        loss = 0

        # Step through the target one position at a time and sum the losses.
        for di in range(target_length):
            logits, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
            loss += loss_fn(logits, target_tensor[:, di])
            # Teacher forcing: feed the ground-truth token as the next input.
            decoder_input = target_tensor[:, di].reshape(current_batch_size, 1)

        # Backpropagation
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()


epoch: 0
epoch: 1
epoch: 2
epoch: 3
epoch: 4
epoch: 5
epoch: 6
epoch: 7
epoch: 8
epoch: 9
epoch: 10
epoch: 11
epoch: 12
epoch: 13
epoch: 14
epoch: 15
epoch: 16
epoch: 17
epoch: 18
epoch: 19
epoch: 20
epoch: 21
epoch: 22
epoch: 23
epoch: 24
epoch: 25
epoch: 26
epoch: 27
epoch: 28
epoch: 29
epoch: 30
epoch: 31
epoch: 32
epoch: 33
epoch: 34
epoch: 35
epoch: 36
epoch: 37
epoch: 38
epoch: 39


In [37]:
# Test model
def respond(encoder, decoder, sentence, word2idx, max_length=15):
    """Generate a reply to `sentence` with greedy decoding.

    The sentence is normalised with `preprocess` (the same function applied to
    the training pairs) and tokens missing from `word2idx` are skipped, so
    capitalised, punctuated, or unseen input no longer raises KeyError.

    Args:
        encoder: trained Encoder module.
        decoder: trained Decoder module.
        sentence: the user's message (str).
        word2idx: mapping from token to id; must contain START_TOKEN and
            END_TOKEN.
        max_length: maximum number of decoding steps (default 15).

    Returns:
        The decoded words joined by single spaces. Decoding stops early when
        END_TOKEN is predicted; the result may be an empty string.
    """
    encoder.eval()
    decoder.eval()

    # One-character tokens are dropped, mirroring the filter used in
    # TranslationDataset.__getitem__ when the model was trained.
    tokens = [
        word
        for word in preprocess(sentence).split()
        if len(word) > 1 and word in word2idx
    ]
    # Invert the mapping that was passed in rather than relying on a global
    # idx2word that might come from a different vocabulary.
    id_to_word = {index: word for word, index in word2idx.items()}
    end_id = word2idx[END_TOKEN]

    with torch.inference_mode():
        # Encode the sentence as a single-row batch (batch_first=True).
        input_ids = [word2idx[word] for word in tokens] + [end_id]
        input_tensor = torch.tensor(input_ids, dtype=torch.long, device=DEVICE).view(1, -1)

        # The decoder starts from the encoder's final hidden/cell state.
        _, decoder_hidden, decoder_cell = encoder(input_tensor)

        decoded_words = []
        last_word = torch.tensor([[word2idx[START_TOKEN]]], dtype=torch.long, device=DEVICE)
        for _ in range(max_length):
            logits, decoder_hidden, decoder_cell = decoder(last_word, decoder_hidden, decoder_cell)
            next_id = logits.argmax(dim=1).item()  # greedy choice
            if next_id == end_id:
                break
            word = id_to_word.get(next_id)
            if word is not None:
                decoded_words.append(word)
            last_word = torch.tensor([[next_id]], dtype=torch.long, device=DEVICE)
    return ' '.join(decoded_words)

In [101]:
# Smoke test: ask the trained model for a reply to a short query and print it.
test_query = "hi potter"
response = respond(encoder, decoder, test_query, word2idx)
print(response)

oh hi sir


In [86]:
# save encoder and decoder (the weights only, via state_dict)
for module, checkpoint_path in ((encoder, 'encoder.pth'), (decoder, 'decoder.pth')):
    torch.save(module.state_dict(), checkpoint_path)

In [103]:
# test loading models
# map_location remaps the stored tensors onto the current DEVICE, so weights
# saved on a GPU can still be loaded on a CPU-only machine.
encoder = Encoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)
encoder.load_state_dict(torch.load('encoder.pth', map_location=DEVICE))
decoder = Decoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)
decoder.load_state_dict(torch.load('decoder.pth', map_location=DEVICE))

<All keys matched successfully>

# Chatbot Implementation

In [2]:
class User:
    """A chatbot user together with the preferences collected in conversation.

    Attributes are `name` and `age` (stored as given) plus the `likes` and
    `dislikes` lists, which both start empty.
    """

    def __init__(self, name, age) -> None:
        self.name, self.age = name, age
        self.likes, self.dislikes = [], []

    def add_like(self, like):
        """Append `like` to this user's likes."""
        self.likes.append(like)

    def add_dislike(self, dislike):
        """Append `dislike` to this user's dislikes."""
        self.dislikes.append(dislike)

In [3]:
# Start from an empty user registry and write it to disk.
# NOTE(review): running this cell replaces any existing user_base.pkl, so
# users saved by earlier conversations are lost.
user_base = dict()
with open('user_base.pkl', mode='wb') as registry_file:
    pickle.dump(user_base, registry_file)

In [29]:
import secrets
from nltk import word_tokenize
from nltk import pos_tag
import pickle
bot_name = "muggle-bot"

def get_user():
    """Greet the user, ask for their name, and look them up in the registry.

    The registry is read from 'user_base.pkl'. A missing or empty file is
    treated as an empty registry instead of crashing. A known name gets a
    "welcome back" greeting; an unknown name is asked for an age and a new
    User is added to the registry in memory (it is not saved here).

    Side effect: sets the module-level global `name` to the entered name.

    Returns:
        Tuple (user_base, user_obj): the registry dict and the User for the
        entered name.
    """
    print("Enter 'wizard' to talk to Harry Potter.")
    print("Enter 'quit' to exit from conversation.")
    print('Before that, let me get some info from you.\n')

    global name
    print(f'{bot_name}: Hello! What is your name?')
    name = input()
    # Echo the answer so it appears in the captured notebook output.
    print(name)

    # Greetings carry no trailing space; the separator is added when printing,
    # which avoids the double spaces the mixed formats used to produce.
    old_user_greetings = ['Welcome back', 'Good to see you again,', 'Long time no see,']
    new_user_greetings = ['Hello there,', 'Hi,', 'Nice to meet you,']

    # Load data (deserialize).
    # NOTE: pickle.load can execute arbitrary code -- only open trusted files.
    try:
        with open('user_base.pkl', 'rb') as handle:
            user_base = pickle.load(handle)
    except (FileNotFoundError, EOFError):
        # First run, or an empty file: start with no known users.
        user_base = {}

    if name in user_base:
        user_obj = user_base[name]
        greeting = secrets.choice(old_user_greetings)
        if user_obj.likes:
            print(f'{bot_name}: {greeting} {name}! Hope to talk more about {secrets.choice(user_obj.likes)} today!')
        else:
            print(f'{bot_name}: {greeting} {name}! How can I help you today?')
    else:
        # construct new User object and store information
        print(f'{bot_name}: {secrets.choice(new_user_greetings)} {name}! How old are you? (Enter a number)')
        age = input(f'{name}: ')
        user_obj = User(name, age)
        user_base[name] = user_obj
    return (user_base, user_obj)


In [40]:
def _last_noun(text):
    """Return the last token of `text` tagged NN or NNS, or None if there is none."""
    for token, tag in reversed(pos_tag(word_tokenize(text))):
        if tag in ('NN', 'NNS'):
            return token
    return None


def chatbot_loop(user_base, user_obj):
    """Run the interactive conversation until the user enters 'quit'.

    Outside wizard mode the bot looks for like/dislike phrases (ignoring
    case) and records the last noun of the message on `user_obj`. Entering
    'wizard' switches to wizard mode, where every message is answered by the
    seq2seq model. Entering 'quit' pickles `user_base` to 'user_base.pkl' and
    ends the loop.

    Args:
        user_base: dict of all known users; saved on quit.
        user_obj: the User taking part in this conversation.
    """
    like_phrases = ['i like', 'i love', 'i adore', 'my favorite']
    dislike_phrases = ['i hate', 'i do not like', 'i dislike', 'not particularly']

    # retrieve the vocabulary saved at training time
    with open('word2idx.pkl', 'rb') as handle:
        word2idx = pickle.load(handle)

    # Rebuild the networks and load the trained weights. map_location lets
    # weights saved on a GPU load on a CPU-only machine.
    encoder = Encoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)
    encoder.load_state_dict(torch.load('encoder.pth', map_location=DEVICE))
    decoder = Decoder(vocab_size, embed_size, hidden_size, num_layers).to(DEVICE)
    decoder.load_state_dict(torch.load('decoder.pth', map_location=DEVICE))

    # Take the prompt label from the user object rather than a global `name`.
    speaker = user_obj.name
    wizard_mode = False

    while True:
        # user input, echoed so it appears in the captured notebook output
        user_input = input(f'{speaker}: ')
        print(f'{speaker}: {user_input}')

        # Reset on every turn so a value from an earlier turn cannot leak in.
        preference_present = False

        if not wizard_mode:
            lowered = user_input.lower()

            if any(phrase in lowered for phrase in like_phrases):
                preference_present = True
                liked = _last_noun(user_input)
                if liked is not None:
                    user_obj.add_like(liked)
                print(f"{bot_name}: Haha who doesn't")

            # Match case-insensitively, exactly like the like-phrases above.
            if any(phrase in lowered for phrase in dislike_phrases):
                preference_present = True
                disliked = _last_noun(user_input)
                if disliked is not None:
                    if disliked in user_obj.dislikes:
                        print(f"{bot_name}: I am aware! We don't have to talk about it further.")
                    else:
                        user_obj.add_dislike(disliked)
                        print(f'{bot_name}: I will keep that in mind!')

        command = user_input.strip().lower()
        if command == 'quit':
            # save info before quitting: pickle the updated dictionary
            with open('user_base.pkl', 'wb') as handle:
                pickle.dump(user_base, handle, protocol=pickle.HIGHEST_PROTOCOL)
            break
        if command == 'wizard':
            print('You are connected with a wizard...')
            wizard_mode = True
            continue

        if not preference_present:
            if wizard_mode:
                try:
                    # Normalise the message the same way as the training data.
                    bot_response = respond(encoder, decoder, preprocess(user_input), word2idx)
                except KeyError:
                    # A token was not in the vocabulary; keep the chat alive.
                    print(f'{bot_name}: Sorry, I could not understand some of those words.')
                else:
                    print(f'Harry: {bot_response}')
            else:
                print(f'{bot_name}: what else?')
    print(f'{bot_name}: Have a nice day!')

# Entry point: identify the user, then start the conversation loop.
# The guard is true when this code runs as the main module (the captured
# output below shows it executing in the notebook).
if __name__ == "__main__":
    user_base, user_obj = get_user()
    chatbot_loop(user_base, user_obj)

Enter 'wizard' to talk to a wizard from the Harry Potter world.
Enter 'quit' to exit from conversation.
Before that, let me get some info from you.

muggle-bot: Hello! What is your name?
chris
muggle-bot: Good to see you again, chris! How can I help you today?
chris: 22
muggle-bot: what else?
chris: i love gryffindor
muggle-bot: Haha who does't
chris: quit
muggle-bot: Have a nice day!


In [None]:
# Reload the saved user registry and print it to check what was stored.
# NOTE: pickle.load can execute arbitrary code -- only open trusted files.
with open('user_base.pkl', 'rb') as handle:
    user_base = pickle.load(handle)
print(user_base)
