In [1]:
import torch
import numpy as np
from gensim.models import KeyedVectors
from collections import Counter
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
from torch.nn import LayerNorm,attention,Embedding
import math

In [3]:
def load_pairs_from_file(file_path):
    """Read (input, response) text pairs from a conversation file.

    The file is read as consecutive 3-line records: an ``Input: ...`` line,
    a ``Response: ...`` line, and a separator line (normally blank).

    Args:
        file_path: Path to a UTF-8 encoded text file.

    Returns:
        A list of ``(input_text, response_text)`` tuples with the leading
        ``"Input: "`` / ``"Response: "`` labels removed. A trailing record
        that has no response line is dropped.
    """
    def _strip_label(line, label):
        # Remove the label only when it is the line's prefix; a blanket
        # str.replace() would also delete the same text from inside the
        # utterance itself.
        text = line.strip()
        return text[len(label):] if text.startswith(label) else text

    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    pairs = []
    # A record starts every 3 lines; stop before a record without a response.
    for i in range(0, len(lines) - 1, 3):
        pairs.append((
            _strip_label(lines[i], "Input: "),
            _strip_label(lines[i + 1], "Response: "),
        ))
    return pairs

# Load the three dataset splits, in order: train, val, test
train_pairs, val_pairs, test_pairs = map(
    load_pairs_from_file,
    (
        "dataset/conver_train.txt",
        "dataset/conver_val.txt",
        "dataset/conver_test.txt",
    ),
)

print(f"Train: {len(train_pairs)} | Val: {len(val_pairs)} | Test: {len(test_pairs)}")


Train: 76052 | Val: 7069 | Test: 6740


## Build Vocab

In [4]:
# Reserved tokens. build_vocab() places them first, so they always receive
# the ids 0-3 in exactly this order ('<pad>' == 0).
SPECIAL_TOKENS = ['<pad>', '<sos>', '<eos>', '<unk>']


def tokenize(text):
    """Split ``text`` into whitespace-separated tokens."""
    return text.strip().split()


def build_vocab(pairs, min_frequency=2):
    """Build token <-> id lookup tables from (input, response) text pairs.

    Args:
        pairs: Iterable of ``(input_text, response_text)`` string tuples.
        min_frequency: Minimum number of occurrences (counted over inputs and
            responses together) a token needs to be kept.

    Returns:
        ``(word2idx, idx2word)``: two dicts that are exact inverses of each
        other. Ids 0-3 are the entries of ``SPECIAL_TOKENS``; the remaining
        tokens follow in sorted order.
    """
    counter = Counter()
    for source_text, reply_text in pairs:
        counter.update(tokenize(source_text))
        counter.update(tokenize(reply_text))

    # A corpus token that is spelled like a special token (e.g. a literal
    # "<pad>") must not be added a second time: the duplicate would overwrite
    # the reserved id in word2idx and leave holes in idx2word.
    reserved = set(SPECIAL_TOKENS)
    vocab_words = [
        word
        for word, freq in counter.items()
        if freq >= min_frequency and word not in reserved
    ]
    vocab = SPECIAL_TOKENS + sorted(vocab_words)

    word2idx = {word: idx for idx, word in enumerate(vocab)}
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word

In [5]:
# The vocabulary is built from the training split only. min_frequency=1 keeps
# every token that occurs in train_pairs, so '<unk>' is only produced later
# for val/test/user tokens that never appear in the training data.
word2idx, idx2word = build_vocab(train_pairs, min_frequency=1)

print(f"Vocab size: {len(word2idx)}")

Vocab size: 25191


## Load fastText embeddings

In [7]:
# Load pre-trained fastText word vectors from a text-format (.vec) file.
fasttext_model = KeyedVectors.load_word2vec_format('fasttext file/cc.en.300.vec', binary=False)
# Source: https://fasttext.cc/docs/en/crawl-vectors.html -> English -> the "text" (.vec) download.

In [17]:
embedding_dim = 300
embedding_matrix = np.zeros((len(word2idx), embedding_dim))

# One row per vocabulary id: copy the fastText vector when the token is known,
# otherwise draw a random Gaussian vector for it.
for token, row in word2idx.items():
    embedding_matrix[row] = (
        fasttext_model[token]
        if token in fasttext_model
        else np.random.normal(scale=0.6, size=embedding_dim)
    )

fasttext_embeddings = torch.tensor(embedding_matrix, dtype=torch.float32)

## Encoding

In [9]:
def encoded_sentences(sentence, word2idx):
    """Convert a sentence into a list of vocabulary ids.

    The sentence is split on whitespace; tokens missing from ``word2idx``
    are replaced by the id of ``'<unk>'``.
    """
    ids = []
    for tok in sentence.split():
        ids.append(word2idx.get(tok, word2idx['<unk>']))
    return ids

def encoded_pair(pair, word2idx):
    """Encode every (input, response) text pair as a pair of id lists.

    The input side is encoded as-is; the response side is wrapped as
    ``<sos> ... <eos>``. Returns a list of ``(input_ids, response_ids)``.
    """
    def _encode_one(source_text, reply_text):
        source_ids = encoded_sentences(source_text, word2idx)
        reply_ids = [
            word2idx['<sos>'],
            *encoded_sentences(reply_text, word2idx),
            word2idx['<eos>'],
        ]
        return source_ids, reply_ids

    return [_encode_one(source_text, reply_text) for source_text, reply_text in pair]
        

In [10]:
# Encode every split with the same (training) vocabulary
encoded_train, encoded_val, encoded_test = (
    encoded_pair(split_pairs, word2idx)
    for split_pairs in (train_pairs, val_pairs, test_pairs)
)

# Show the first encoded training example
print("Input:", encoded_train[0][0])
print("Response:", encoded_train[0][1])


Input: [5909, 17, 3905, 17, 14859, 7487, 13949, 13461, 7469, 13114, 8715, 7736, 11654, 1067]
Response: [1, 7403, 15813, 23000, 15482, 22911, 9432, 15482, 19716, 17607, 13959, 13461, 18023, 13252, 20, 2]


## Custom Dataset

In [11]:
class ChatDataset (Dataset):
    """Map-style dataset that serves pre-built items by index.

    In this notebook it is constructed from the lists returned by
    ``encoded_pair``.
    """
    def __init__(self,pair):
        # Stored as-is; no copy or conversion happens here.
        self.pair = pair
    
    def __len__(self):
        """Return the number of stored items."""
        return len(self.pair)
    
    def __getitem__(self, index):
        """Return the stored item at ``index`` unchanged."""
        return self.pair[index]
    

def collate_fn(batch):
    """Turn a list of (input_ids, target_ids) items into padded batch tensors.

    Each target sequence is split for teacher forcing: the decoder input is
    the sequence without its last token, the decoder target is the sequence
    without its first token. All three outputs are right-padded, batch-first
    ``long`` tensors filled with the id of ``'<pad>'`` from the module-level
    ``word2idx``.

    Returns:
        ``(input_padded, tgt_input_padded, tgt_output_padded)``
    """
    sources, targets = zip(*batch)

    source_tensors = [torch.tensor(ids, dtype=torch.long) for ids in sources]
    decoder_inputs = [torch.tensor(ids[:-1], dtype=torch.long) for ids in targets]   # drop last token
    decoder_targets = [torch.tensor(ids[1:], dtype=torch.long) for ids in targets]   # drop first token

    pad_id = word2idx['<pad>']

    def _pad(tensors):
        return pad_sequence(tensors, batch_first=True, padding_value=pad_id)

    return _pad(source_tensors), _pad(decoder_inputs), _pad(decoder_targets)




In [12]:
# Number of (input, response) items per batch, shared by all three loaders.
batch_size = 32

# Wrap each encoded split so DataLoader can index into it.
train_dataset = ChatDataset(encoded_train)
val_dataset   = ChatDataset(encoded_val)
test_dataset  = ChatDataset(encoded_test)

# Only the training loader shuffles; val/test keep the stored order.
# collate_fn pads every batch to the length of its longest sequence.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

## Model Building

In [13]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first embeddings.

    A table of shape ``[1, max_len, d_model]`` is precomputed once: even
    feature columns hold ``sin(pos * w_k)``, odd columns hold ``cos(pos * w_k)``
    with ``w_k = 10000 ** (-2k / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension; may be even or odd.
        max_len: Longest sequence length the table covers. Inputs longer than
            this are not supported.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Register as a buffer so the table follows the module across
        # .to()/.cuda() instead of being copied to the device on every forward.
        # persistent=False keeps it out of state_dict(), so saved checkpoints
        # have the same keys as before.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x + pe[:, :seq_len]`` for ``x`` indexed as [batch, seq_len, d_model]."""
        # .to() is a no-op when the buffer already lives on x's device; it is
        # kept so a module that was never moved still works with device inputs.
        return x + self.pe[:, :x.size(1)].to(x.device)


In [15]:
class TransformerChatbot(nn.Module):
    """Encoder-decoder Transformer over a shared, pre-trained token embedding.

    Args:
        fasttext_embeddings: 2-D float tensor ``[num_tokens, emb_dim]`` used to
            initialise the (trainable) embedding shared by encoder and decoder.
        vocab_size: Size of the output projection; should equal ``num_tokens``.
        pad_idx: Token id treated as padding in both ``src`` and ``tgt``.
        emb_dim: Model width; must equal the embedding width.
        nhead: Number of attention heads.
        num_layers: Number of encoder layers and of decoder layers.
        dim_ff: Hidden size of the feed-forward sub-layers.
        dropout: Dropout probability inside the Transformer.

    Raises:
        ValueError: If ``emb_dim`` differs from the width of
            ``fasttext_embeddings``.
    """

    def __init__(self, fasttext_embeddings ,vocab_size,pad_idx ,emb_dim = 300, nhead = 6, num_layers = 3, dim_ff = 512, dropout =0.1):
        super().__init__()
        self.embedding_layer = nn.Embedding.from_pretrained(fasttext_embeddings, freeze=False)
        if self.embedding_layer.embedding_dim != emb_dim:
            # Fail early with a clear message instead of a shape error deep
            # inside the first attention layer.
            raise ValueError(
                f"emb_dim={emb_dim} does not match the embedding width "
                f"{self.embedding_layer.embedding_dim}"
            )
        self.pos_encoder = PositionalEncoding(emb_dim)

        self.transformer = nn.Transformer(
            d_model=emb_dim,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=dim_ff,
            dropout=dropout,
            batch_first=True
        )
        self.fc_out = nn.Linear(emb_dim, vocab_size)
        self.pad_idx = pad_idx

    def forward(self, src, tgt):
        """Compute next-token logits.

        Args:
            src: Long tensor ``[batch, src_len]`` of source token ids.
            tgt: Long tensor ``[batch, tgt_len]`` of decoder input token ids.

        Returns:
            Float tensor ``[batch, tgt_len, vocab_size]`` of logits.
        """
        # Padding masks (True = ignore this position)
        src_key_padding_mask = (src == self.pad_idx)
        tgt_key_padding_mask = (tgt == self.pad_idx)

        # Causal mask for the decoder: position i may not attend to j > i.
        # Built as a bool mask so it has the same type as the padding masks.
        tgt_seq_len = tgt.size(1)
        steps = torch.arange(tgt_seq_len, device=tgt.device)
        tgt_mask = steps.unsqueeze(0) > steps.unsqueeze(1)

        # Embedding + Positional Encoding
        src_emb = self.pos_encoder(self.embedding_layer(src))
        tgt_emb = self.pos_encoder(self.embedding_layer(tgt))

        # Transformer. memory_key_padding_mask stops the decoder's
        # cross-attention from attending to encoder outputs at pad positions;
        # src_key_padding_mask alone only covers the encoder's self-attention.
        output = self.transformer(
            src=src_emb,
            tgt=tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=src_key_padding_mask
        )

        return self.fc_out(output)  # shape [batch, tgt_len, vocab_size]


## Training Loop

In [18]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# emb_dim matches the 300-column embedding matrix built earlier; the output
# layer is sized to the vocabulary so every token id has a logit.
model = TransformerChatbot(
    vocab_size=len(word2idx),
    emb_dim=300,
    nhead=6,
    num_layers=3,
    dim_ff=512,
    dropout=0.1,
    pad_idx=word2idx['<pad>'],
    fasttext_embeddings=fasttext_embeddings
).to(device)


In [19]:
pad_idx = word2idx['<pad>']
# Target positions equal to pad_idx are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
# Adam over all model parameters (the embedding is created with freeze=False).
optimizer = torch.optim.Adam(model.parameters(),lr = 3e-4)

In [21]:
def train(model, loader, optimizer, criterion):
    """Run one training epoch and return the mean of the per-batch losses.

    Each batch from ``loader`` is a ``(src, tgt_inp, tgt_out)`` triple that is
    moved to the module-level ``device`` before the forward pass.
    """
    model.train()
    running_loss = 0

    for batch in loader:
        src, tgt_inp, tgt_out = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        logits = model(src, tgt_inp)  # [B, T, V]
        vocab_dim = logits.size(-1)
        # Flatten batch and time so every target position is one sample.
        batch_loss = criterion(logits.view(-1, vocab_dim), tgt_out.view(-1))
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(loader)

@torch.no_grad()
def evaluate(model, loader, criterion):
    """Return the mean per-batch loss over ``loader`` without updating the model.

    Runs in eval mode with gradients disabled; batches are moved to the
    module-level ``device``.
    """
    model.eval()
    batch_losses = []
    for batch in loader:
        src, tgt_inp, tgt_out = (tensor.to(device) for tensor in batch)
        logits = model(src, tgt_inp)
        batch_loss = criterion(logits.view(-1, logits.size(-1)), tgt_out.view(-1))
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(loader)

@torch.no_grad()
def test(model, loader, criterion):
    """Return the mean per-batch loss of ``model`` over the test ``loader``.

    The computation is exactly the one ``evaluate`` performs (eval mode, no
    gradients, mean of batch losses), so this delegates to it instead of
    keeping a second copy of the loop that could drift out of sync.
    """
    return evaluate(model, loader, criterion)



In [22]:
# Number of full passes over the training data.
epochs = 2

for epoch in range(epochs):
    # One optimisation pass over train_loader, then a gradient-free pass over val_loader.
    train_loss = train(model, train_loader, optimizer, criterion)
    val_loss = evaluate(model, val_loader, criterion)

    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")




: 

In [None]:
# Final check on the held-out test split, using the same loss as training.
test_loss = test(model, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}")


In [None]:
def generate_response(model, input_sentence, word2idx, idx2word, max_len=50):
    """Greedily decode a response for ``input_sentence``.

    Args:
        model: Module called as ``model(src, tgt)`` that returns logits
            indexed as ``[1, tgt_len, vocab]``.
        input_sentence: Raw user text; split on whitespace.
        word2idx: Token -> id mapping containing ``'<sos>'``, ``'<eos>'`` and
            ``'<unk>'``.
        idx2word: Id -> token mapping used to decode the generated ids.
        max_len: Maximum number of tokens to generate.

    Returns:
        The generated tokens joined by single spaces (without ``<sos>`` /
        ``<eos>``), or ``''`` when the input contains no tokens.
    """
    model.eval()

    # 1. Tokenize and numericalize input
    tokens = input_sentence.strip().split()
    if not tokens:
        # A zero-length source sequence cannot be encoded; nothing to answer.
        return ''

    # Run on whatever device the model lives on rather than depending on a
    # module-level variable.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    unk_id = word2idx['<unk>']
    eos_id = word2idx['<eos>']
    input_ids = [word2idx.get(tok, unk_id) for tok in tokens]
    src = torch.tensor([input_ids], dtype=torch.long, device=run_device)

    # 2. Start with <sos> and repeatedly append the most likely next token
    generated_ids = [word2idx['<sos>']]
    with torch.no_grad():
        for _ in range(max_len):
            tgt = torch.tensor([generated_ids], dtype=torch.long, device=run_device)
            output = model(src, tgt)  # [1, T, V]
            next_token_id = torch.argmax(output[0, -1, :]).item()  # last position's logits

            # Stop if <eos>
            if next_token_id == eos_id:
                break

            generated_ids.append(next_token_id)

    # 3. Decode token ids to words (skip the leading <sos>)
    return ' '.join(idx2word[idx] for idx in generated_ids[1:])


In [None]:
# Interactive chat loop: type "exit" or "quit" (or send EOF / interrupt the
# kernel while it waits for input) to stop.
while True:
    try:
        user_input = input("Input: ")
    except (EOFError, KeyboardInterrupt):
        # Leave the loop cleanly instead of ending the cell with a traceback.
        break

    # Strip first so that e.g. "exit " or " quit" are still recognised.
    user_input = user_input.strip()
    if user_input.lower() in ['exit', 'quit']:
        break
    if not user_input:
        continue  # nothing to respond to; ask again

    response = generate_response(model, user_input, word2idx, idx2word)
    print("Response:", response)
