Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement random (from versions: none)
ERROR: No matching distribution found for random


In [1]:
pip install torch

Collecting torch
  Downloading torch-2.7.1-cp313-cp313-win_amd64.whl.metadata (28 kB)
Collecting filelock (from torch)
  Downloading filelock-3.18.0-py3-none-any.whl.metadata (2.9 kB)
Collecting sympy>=1.13.3 (from torch)
  Downloading sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
Collecting networkx (from torch)
  Downloading networkx-3.5-py3-none-any.whl.metadata (6.3 kB)
Collecting fsspec (from torch)
  Downloading fsspec-2025.7.0-py3-none-any.whl.metadata (12 kB)
Collecting mpmath<1.4,>=1.1.0 (from sympy>=1.13.3->torch)
  Downloading mpmath-1.3.0-py3-none-any.whl.metadata (8.6 kB)
Downloading torch-2.7.1-cp313-cp313-win_amd64.whl (216.1 MB)
   ---------------------------------------- 0.0/216.1 MB ? eta -:--:--
   ---------------------------------------- 2.1/216.1 MB 14.4 MB/s eta 0:00:15
    --------------------------------------- 5.0/216.1 MB 14.2 MB/s eta 0:00:15
   - -------------------------------------- 7.6/216.1 MB 13.7 MB/s eta 0:00:16
   -- --------------------------------

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random
import numpy as np

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Dialog pairs: (input, response)
data_pairs = [
    ("Hi", "Hello! What is your name?"),
    ("My name is John", "Nice to meet you John. How old are you?"),
    ("I am 30 years old", "Got it. What is your gender?"),
    ("Male", "Please describe your symptoms."),
    ("I have a headache and fever", "Based on your symptoms, we recommend you to contact Dr. Smith, General Physician. Phone: 555-1234"),
    ("Thank you", "You're welcome! Take care!"),
    ("Bye", "Goodbye! Stay healthy!"),
]

# ---------- Vocabulary ----------
def tokenize(text):
    return text.lower().replace('.', '').replace(',', '').split()

class Vocab:
    def __init__(self):
        self.word2idx = {"<PAD>":0, "<SOS>":1, "<EOS>":2, "<UNK>":3}
        self.idx2word = {0:"<PAD>", 1:"<SOS>", 2:"<EOS>", 3:"<UNK>"}
        self.num_words = 4
    
    def add_sentence(self, sentence):
        for word in tokenize(sentence):
            self.add_word(word)
    
    def add_word(self, word):
        if word not in self.word2idx:
            self.word2idx[word] = self.num_words
            self.idx2word[self.num_words] = word
            self.num_words += 1
    
    def sentence_to_indices(self, sentence):
        return [self.word2idx.get(word, self.word2idx["<UNK>"]) for word in tokenize(sentence)]
    
    def indices_to_sentence(self, indices):
        return ' '.join([self.idx2word.get(idx, "<UNK>") for idx in indices])

vocab = Vocab()
for input_text, output_text in data_pairs:
    vocab.add_sentence(input_text)
    vocab.add_sentence(output_text)

# ---------- Dataset ----------
class ChatDataset(Dataset):
    def __init__(self, pairs, vocab):
        self.pairs = pairs
        self.vocab = vocab

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        input_seq = self.vocab.sentence_to_indices(self.pairs[idx][0]) + [self.vocab.word2idx["<EOS>"]]
        output_seq = [self.vocab.word2idx["<SOS>"]] + self.vocab.sentence_to_indices(self.pairs[idx][1]) + [self.vocab.word2idx["<EOS>"]]
        return torch.tensor(input_seq), torch.tensor(output_seq)

dataset = ChatDataset(data_pairs, vocab)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=lambda x: x)

def pad_sequences(batch):
    inputs, targets = zip(*batch)
    input_lens = [len(seq) for seq in inputs]
    target_lens = [len(seq) for seq in targets]

    inputs_pad = nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=vocab.word2idx["<PAD>"])
    targets_pad = nn.utils.rnn.pad_sequence(targets, batch_first=True, padding_value=vocab.word2idx["<PAD>"])
    return inputs_pad, torch.tensor(input_lens), targets_pad, torch.tensor(target_lens)

# ---------- Models ----------
class EncoderRNN(nn.Module):
    def __init__(self, input_size, emb_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.embedding = nn.Embedding(input_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)

    def forward(self, input_seqs, input_lengths):
        embedded = self.embedding(input_seqs)
        packed = nn.utils.rnn.pack_padded_sequence(embedded, input_lengths.cpu(), batch_first=True, enforce_sorted=False)
        outputs, hidden = self.gru(packed)
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
        return outputs, hidden

class DecoderRNN(nn.Module):
    def __init__(self, output_size, emb_size, hidden_size):
        super(DecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.log_softmax = nn.LogSoftmax(dim=2)

    def forward(self, input_step, hidden):
        # input_step: (batch_size, 1)
        embedded = self.embedding(input_step)  # (batch_size, 1, emb_size)
        output, hidden = self.gru(embedded, hidden)  # (batch_size, 1, hidden_size)
        output = self.log_softmax(self.out(output))  # (batch_size, 1, vocab_size)
        return output, hidden

# ---------- Training ----------
def train(input_tensor, input_lengths, target_tensor, target_lengths, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_target_len):
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    batch_size = input_tensor.size(0)
    encoder_outputs, encoder_hidden = encoder(input_tensor, input_lengths)

    decoder_input = torch.full((batch_size, 1), vocab.word2idx["<SOS>"], dtype=torch.long, device=input_tensor.device)
    decoder_hidden = encoder_hidden
    loss = 0

    for t in range(max_target_len):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)  # (batch, 1, vocab_size)
        decoder_output = decoder_output.squeeze(1)  # (batch, vocab_size)

        if t < target_tensor.size(1):
            target = target_tensor[:, t]
        else:
            target = torch.full((batch_size,), vocab.word2idx["<PAD>"], dtype=torch.long, device=input_tensor.device)

        loss += criterion(decoder_output, target)

        teacher_force = random.random() < 0.5
        topv, topi = decoder_output.topk(1)
        if teacher_force and t < target_tensor.size(1):
            decoder_input = target.unsqueeze(1)
        else:
            decoder_input = topi.detach()

    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()
    return loss.item() / max_target_len

# ---------- Hyperparameters ----------
EMB_SIZE = 64
HIDDEN_SIZE = 128
NUM_EPOCHS = 400
LEARNING_RATE = 0.01

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = EncoderRNN(vocab.num_words, EMB_SIZE, HIDDEN_SIZE).to(device)
decoder = DecoderRNN(vocab.num_words, EMB_SIZE, HIDDEN_SIZE).to(device)
encoder_optimizer = optim.Adam(encoder.parameters(), lr=LEARNING_RATE)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=LEARNING_RATE)
criterion = nn.NLLLoss(ignore_index=vocab.word2idx["<PAD>"])

# ---------- Training Loop ----------
print("Starting training...")
for epoch in range(1, NUM_EPOCHS+1):
    total_loss = 0
    for batch in dataloader:
        inputs_pad, input_lens, targets_pad, target_lens = pad_sequences(batch)
        inputs_pad = inputs_pad.to(device)
        targets_pad = targets_pad.to(device)
        input_lens = input_lens.to(device)
        target_lens = target_lens.to(device)
        max_target_len = max(target_lens)

        loss = train(inputs_pad, input_lens, targets_pad, target_lens, encoder, decoder,
                     encoder_optimizer, decoder_optimizer, criterion, max_target_len)
        total_loss += loss
    if epoch % 50 == 0:
        print(f"Epoch {epoch}, Avg Loss: {total_loss / len(dataloader):.4f}")

print("Training complete!")

# ---------- Evaluation ----------
def evaluate(sentence, encoder, decoder, max_length=50):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        input_indices = vocab.sentence_to_indices(sentence) + [vocab.word2idx["<EOS>"]]
        input_tensor = torch.tensor(input_indices, dtype=torch.long, device=device).unsqueeze(0)
        input_length = torch.tensor([len(input_indices)], device=device)

        encoder_outputs, encoder_hidden = encoder(input_tensor, input_length)
        decoder_input = torch.tensor([[vocab.word2idx["<SOS>"]]], device=device)
        decoder_hidden = encoder

Starting training...
Epoch 50, Avg Loss: 0.0021
Epoch 100, Avg Loss: 0.0008
Epoch 150, Avg Loss: 0.0004
Epoch 200, Avg Loss: 0.0003
Epoch 250, Avg Loss: 0.0002
Epoch 300, Avg Loss: 0.0001
Epoch 350, Avg Loss: 0.0001
Epoch 400, Avg Loss: 0.0001
Training complete!
