In [1]:
!pip install pyttsx3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from torch.nn import Transformer, TransformerEncoder, TransformerEncoderLayer
import json
import pyttsx3

In [3]:
# Training data from JSON file
def load_training_data(file_path):
    """Load (context, question, answers) triples from a nested QA JSON file.

    The file is expected to hold a top-level ``data`` list. Each item has a
    ``paragraphs`` list; each paragraph has a ``context`` string and a ``qas``
    list whose entries carry a ``question`` string and an ``answers`` list of
    objects with a ``text`` field.

    Args:
        file_path: Path to the JSON file.

    Returns:
        A list of ``(context, question, answers)`` tuples, one per question,
        where ``answers`` is the (possibly empty) list of answer strings.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If an expected key is missing from the structure.
    """
    # JSON is UTF-8 encoded; without an explicit encoding open() falls back to
    # the platform locale, which can fail or garble non-ASCII text.
    with open(file_path, encoding="utf-8") as file:
        data = json.load(file)

    return [
        (
            paragraph['context'],
            qa['question'],
            [answer['text'] for answer in qa['answers']],
        )
        for item in data['data']
        for paragraph in item['paragraphs']
        for qa in paragraph['qas']
    ]

# Path is relative to the notebook's working directory.
train_data_file = 'dev-v1.1.json'
# List of (context, question, answers) tuples, one entry per question in the file.
train_data = load_training_data(train_data_file)

# Data preprocessing
def preprocess_data(data, max_seq_length):
    """Turn (context, question, answers) triples into fixed-length id sequences.

    Each context (source) and first answer (target) is split on whitespace.
    Every token seen is added to the vocabulary, including tokens that are
    later cut off by truncation. The tokens are then mapped to ids and
    truncated or right-padded with ``<PAD>`` to ``max_seq_length``.

    The question is not used, and only the first answer of each example is
    considered. Examples with an empty answer list are skipped, because they
    provide no target sequence.

    Args:
        data: Iterable of ``(context, question, answers)`` tuples, where
            ``answers`` is a list of strings.
        max_seq_length: Exact length of every returned id sequence.

    Returns:
        ``(src_data, tgt_data, vocab_token_to_id)``: two parallel lists of id
        lists (each of length ``max_seq_length``) and the token-to-id mapping,
        in which ``<PAD>`` is 0 and ``<UNK>`` is 1.
    """
    vocab_token_to_id = {'<PAD>': 0, '<UNK>': 1}
    pad_id = vocab_token_to_id['<PAD>']
    unk_id = vocab_token_to_id['<UNK>']

    def _encode(tokens):
        """Map tokens to ids, truncating/right-padding to ``max_seq_length``."""
        ids = [vocab_token_to_id.get(token, unk_id) for token in tokens[:max_seq_length]]
        return ids + [pad_id] * (max_seq_length - len(ids))

    src_data = []
    tgt_data = []
    for context, _question, answers in data:
        if not answers:
            # No reference answer: indexing answers[0] would raise IndexError.
            continue

        src_tokens = context.split()
        tgt_tokens = answers[0].split()  # Considering only the first answer

        # Grow the vocabulary; new tokens get the next free id, so ids follow
        # insertion order.
        for token in src_tokens + tgt_tokens:
            vocab_token_to_id.setdefault(token, len(vocab_token_to_id))

        src_data.append(_encode(src_tokens))
        tgt_data.append(_encode(tgt_tokens))

    return src_data, tgt_data, vocab_token_to_id

# Example usage with JSON data
max_seq_length = 124  # every source/target sequence is truncated or padded to this length
src_data, tgt_data, vocab_token_to_id = preprocess_data(train_data, max_seq_length)
# Convert the nested lists of token ids to tensors; every row has
# max_seq_length entries, so the lists are rectangular.
src_data = torch.tensor(src_data)
tgt_data = torch.tensor(tgt_data)

In [4]:
src_data[:1]

tensor([[ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 12, 15, 16, 17, 18,
         19, 12, 20, 21, 22,  7, 16, 23, 24, 13, 25, 26, 27, 12, 15, 16, 23, 28,
         13, 29, 30, 31, 10, 32, 33, 34,  2,  3, 35, 22,  9,  5, 36, 37, 38, 39,
         40, 41, 42, 43, 44, 12, 45, 46, 47, 48, 41, 49, 50, 51, 52, 53,  5, 12,
         54,  2, 55, 12, 56, 57, 12, 58, 59, 60, 61, 62, 63, 64, 65, 64, 66, 67,
         12, 68, 14, 69, 70,  2,  3,  9, 60, 71, 72, 73, 74, 12,  9, 75, 76, 77,
         78, 64, 79,  3, 80, 81, 82, 12, 83, 84, 85, 86, 12, 87, 72, 88]])

In [5]:
# chatbot model
class Chatbot(nn.Module):
    """Per-position token classifier: embedding -> Transformer encoder -> linear.

    Args:
        input_dim: Size of the input vocabulary (number of embedding rows).
        hidden_dim: Embedding width, also used as the encoder's ``d_model``.
            Must be divisible by ``nhead``.
        output_dim: Number of output classes predicted for every position.
        nhead: Number of attention heads in each encoder layer.
        num_layers: Number of stacked encoder layers.

    Raises:
        ValueError: If ``hidden_dim`` is not divisible by ``nhead``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, nhead=2, num_layers=2) -> None:
        super().__init__()
        if hidden_dim % nhead != 0:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must be divisible by nhead ({nhead})"
            )
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(input_dim, hidden_dim)
        # The encoder consumes the embedding output, so its d_model must be the
        # embedding width (hidden_dim), not the vocabulary size.
        self.transformer_encoder_layer = TransformerEncoderLayer(hidden_dim, nhead=nhead)
        self.transformer_encoder = TransformerEncoder(
            self.transformer_encoder_layer, num_layers=num_layers
        )
        self.decoder = nn.Linear(hidden_dim, output_dim)

    def forward(self, src):
        """Compute per-position logits.

        Args:
            src: Integer tensor of token ids with shape ``(batch, seq_len)``.

        Returns:
            Tensor of logits with shape ``(batch, seq_len, output_dim)``.
        """
        embedded = self.embedding(src)  # (batch, seq_len, hidden_dim)
        # The encoder layer is built with the default batch_first=False, so it
        # expects (seq_len, batch, hidden_dim).
        encoded = self.transformer_encoder(embedded.permute(1, 0, 2))
        # Back to (batch, seq_len, hidden_dim) before projecting to classes.
        return self.decoder(encoded.permute(1, 0, 2))

In [6]:
len(vocab_token_to_id)

42586

In [None]:
# create the chatbot model
torch.manual_seed(42)  # reproducible initialisation and batch shuffling
input_dim = len(vocab_token_to_id)
hidden_dim = 128  # must be divisible by the number of attention heads (2)
output_dim = len(vocab_token_to_id)
chatbot = Chatbot(input_dim, hidden_dim, output_dim)

# training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
chatbot.to(device)
chatbot.train()

# PAD targets are not masked out, so padded positions contribute to the loss.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(chatbot.parameters(), lr=0.001)
num_epochs = 10
batch_size = 16  # logits are (batch, seq_len, vocab); keep the batch small to bound memory
# Mixed precision is only enabled when running on CUDA.
use_amp = device.type == 'cuda'
scaler = GradScaler(enabled=use_amp)

# Iterate over mini-batches of the preprocessed (source ids, target ids) tensors.
train_loader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(src_data, tgt_data),
    batch_size=batch_size,
    shuffle=True,
)

for epoch in range(num_epochs):
    epoch_loss = 0.0
    for inputs, labels in train_loader:
        # Token ids must stay integer (long) for nn.Embedding; autocast takes
        # care of reduced precision inside the forward pass.
        inputs = inputs.to(device)
        labels = labels.to(device)
        # Zero the gradients
        optimizer.zero_grad()
        with autocast(enabled=use_amp):
            # Forward pass
            outputs = chatbot(inputs)  # (batch, seq_len, output_dim)
            # CrossEntropyLoss expects (N, C) logits and (N,) class indices,
            # so flatten the batch and sequence dimensions together.
            loss = criterion(outputs.reshape(-1, output_dim), labels.reshape(-1))
        # Backward pass and gradient scaling
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        epoch_loss += loss.item()
    # Report the mean batch loss of the epoch.
    print(f'Epoch: {epoch+1}, loss: {epoch_loss / len(train_loader)}')

In [1]:
# save the model
# Only the parameter dictionary (state_dict) is written, not the Chatbot object,
# so loading requires constructing a Chatbot with the same dimensions first.
torch.save(chatbot.state_dict(), 'chatbot_model.pt')

NameError: name 'torch' is not defined

In [None]:
# Load the trained model
chatbot_model = Chatbot(input_dim, hidden_dim, output_dim)
# map_location='cpu' lets a checkpoint saved from a GPU run load on a machine
# without CUDA and matches the CPU tensors built at inference time.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
chatbot_model.load_state_dict(torch.load('chatbot_model.pt', map_location='cpu'))
# Switch to inference mode (disables dropout in the encoder layers).
chatbot_model.eval()

In [None]:
# Generating responses
def generate_response(input_text):
    """Generate a reply by taking the most likely token at every input position.

    Args:
        input_text: User text; it is split on whitespace and tokens missing
            from ``vocab_token_to_id`` are mapped to ``<UNK>``.

    Returns:
        The predicted tokens (one per input token) joined by single spaces, or
        an empty string when ``input_text`` contains no tokens.
    """
    input_tokens = input_text.split()
    if not input_tokens:
        # torch.tensor([]) is a float tensor, which nn.Embedding rejects.
        return ""

    unk_id = vocab_token_to_id["<UNK>"]
    input_ids = [vocab_token_to_id.get(token, unk_id) for token in input_tokens]
    input_tensor = torch.tensor(input_ids).unsqueeze(0)  # add the batch dimension

    with torch.no_grad():
        output = chatbot_model(input_tensor)
        output_ids = torch.argmax(output, dim=2).squeeze(0).tolist()

    # Build the id -> token lookup once instead of once per output token. This
    # relies on ids matching the dict's insertion order.
    id_to_token = list(vocab_token_to_id)
    return " ".join(id_to_token[token_id] for token_id in output_ids)

# Advanced response generation strategy (sample from the distribution instead of choosing the max)
def generate_response_advanced(input_text, temperature=1.0):
    """Generate a reply by sampling one token per input position.

    The logits are divided by ``temperature`` before the softmax, so values
    below 1 sharpen the distribution and values above 1 flatten it.

    Args:
        input_text: User text; it is split on whitespace and tokens missing
            from ``vocab_token_to_id`` are mapped to ``<UNK>``.
        temperature: Positive scaling factor applied to the logits.

    Returns:
        The sampled tokens (one per input token) joined by single spaces, or an
        empty string when ``input_text`` contains no tokens.

    Raises:
        ValueError: If ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        # Zero would divide by zero; negative values would invert the ranking.
        raise ValueError(f"temperature must be positive, got {temperature}")

    input_tokens = input_text.split()
    if not input_tokens:
        # torch.tensor([]) is a float tensor, which nn.Embedding rejects.
        return ""

    unk_id = vocab_token_to_id["<UNK>"]
    input_ids = [vocab_token_to_id.get(token, unk_id) for token in input_tokens]
    input_tensor = torch.tensor(input_ids).unsqueeze(0)  # add the batch dimension

    with torch.no_grad():
        output = chatbot_model(input_tensor)
        logits = output.squeeze(0) / temperature
        probabilities = nn.functional.softmax(logits, dim=-1)
        # One draw per row (position); squeeze the sample dimension away.
        sampled_ids = torch.multinomial(probabilities, num_samples=1).squeeze(1).tolist()

    # Build the id -> token lookup once instead of once per output token. This
    # relies on ids matching the dict's insertion order.
    id_to_token = list(vocab_token_to_id)
    return " ".join(id_to_token[token_id] for token_id in sampled_ids)

# Example usage
input_text = "Hello, how are you?"
response = generate_response(input_text)  # greedy: argmax token at each position
print("Chatbot Response:", response)

# Example usage with advanced response generation strategy
input_text = "Hello, how are you?"
# Logits are divided by the temperature before sampling; tokens are drawn at
# random, so the output can differ between runs.
response = generate_response_advanced(input_text, temperature=0.8)
print("Advanced Chatbot Response:", response)

# Text-to-speech conversion
def speak(text):
    """Read ``text`` aloud with a freshly initialised pyttsx3 engine.

    Args:
        text: The string to be spoken.
    """
    tts = pyttsx3.init()
    tts.say(text)
    # Process the queued utterance before returning.
    tts.runAndWait()