In [None]:
import copy
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder sequence model assembled from this notebook's layer classes.

    Token ids are embedded, scaled by sqrt(d_model), given positional
    encodings, run through the encoder and decoder stacks, and finally
    projected to target-vocabulary logits.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and the
            width of the output projection.
        d_model: Embedding / hidden width.
        nhead: Number of attention heads per attention module.
        num_layers: Number of layers in both the encoder and the decoder.
        dim_feedforward: Hidden width of the position-wise feed-forward blocks.
        dropout: Dropout probability handed to every sub-module.
        max_len: Number of positions the positional encoding table holds.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, nhead=8,
                 num_layers=6, dim_feedforward=2048, dropout=0.1, max_len=5000):
        super().__init__()
        self.d_model = d_model

        # Token embeddings plus one positional encoder shared by source and target.
        self.src_embed = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout, max_len)

        # Encoder stack, then decoder stack (creation order is kept on purpose:
        # it determines the order in which random initial weights are drawn).
        self.encoder = TransformerEncoder(
            TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout),
            num_layers,
        )
        self.decoder = TransformerDecoder(
            TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout),
            num_layers,
        )

        # Projection from hidden states to target-vocabulary logits.
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-uniform init to every parameter with more than one dimension."""
        matrices = (param for param in self.parameters() if param.dim() > 1)
        for weight in matrices:
            nn.init.xavier_uniform_(weight)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None,
               src_padding_mask=None, tgt_padding_mask=None):
        """Return target-vocabulary logits for every target position.

        Args:
            src: Source token ids (the example in this notebook passes them
                as (seq_len, batch_size)).
            tgt: Target token ids, same layout as ``src``.
            src_mask: Optional attention mask for encoder self-attention.
            tgt_mask: Optional attention mask for decoder self-attention.
            src_padding_mask: Optional key-padding mask for the source; it is
                also used as the memory key-padding mask in the decoder.
            tgt_padding_mask: Optional key-padding mask for the target.
        """
        scale = math.sqrt(self.d_model)
        src_emb = self.pos_encoder(self.src_embed(src) * scale)
        tgt_emb = self.pos_encoder(self.tgt_embed(tgt) * scale)

        memory = self.encoder(src_emb, src_mask, src_padding_mask)

        # No memory (cross-attention) mask is used, hence the explicit None.
        decoded = self.decoder(tgt_emb, memory, tgt_mask, None,
                               tgt_padding_mask, src_padding_mask)

        return self.fc_out(decoded)

class TransformerEncoderLayer(nn.Module):
    """One post-norm encoder layer: self-attention, then a feed-forward block.

    Each sub-block is wrapped as ``norm(x + dropout(sublayer(x)))``.

    Args:
        d_model: Width of the input and output features.
        nhead: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward block.
        dropout: Dropout probability used in attention and around both sub-blocks.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = F.relu

    def _attend(self, x, mask, key_padding_mask):
        """Self-attention over ``x`` followed by the residual-branch dropout."""
        attended, _ = self.self_attn(
            x, x, x,
            attn_mask=mask,
            key_padding_mask=key_padding_mask,
        )
        return self.dropout1(attended)

    def _feed_forward(self, x):
        """linear1 -> activation -> dropout -> linear2 -> residual-branch dropout."""
        hidden = self.dropout(self.activation(self.linear1(x)))
        return self.dropout2(self.linear2(hidden))

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        """Run the layer on ``src`` and return a tensor of the same shape.

        Args:
            src: Input features passed as query, key and value to self-attention.
            src_mask: Optional attention mask forwarded as ``attn_mask``.
            src_key_padding_mask: Optional mask forwarded as ``key_padding_mask``.
        """
        attended = self.norm1(src + self._attend(src, src_mask, src_key_padding_mask))
        return self.norm2(attended + self._feed_forward(attended))

class TransformerDecoderLayer(nn.Module):
    """One post-norm decoder layer: self-attention, cross-attention, feed-forward.

    Each sub-block is wrapped as ``norm(x + dropout(sublayer(x)))``.

    Args:
        d_model: Width of the input and output features.
        nhead: Number of attention heads in both attention modules.
        dim_feedforward: Hidden width of the feed-forward block.
        dropout: Dropout probability used in attention and around every sub-block.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.activation = F.relu

    def _self_attend(self, x, mask, key_padding_mask):
        """Attention of the target over itself, plus residual-branch dropout."""
        attended, _ = self.self_attn(
            x, x, x,
            attn_mask=mask,
            key_padding_mask=key_padding_mask,
        )
        return self.dropout1(attended)

    def _cross_attend(self, x, memory, mask, key_padding_mask):
        """Attention of the target (query) over the encoder memory (key/value)."""
        attended, _ = self.multihead_attn(
            x, memory, memory,
            attn_mask=mask,
            key_padding_mask=key_padding_mask,
        )
        return self.dropout2(attended)

    def _feed_forward(self, x):
        """linear1 -> activation -> dropout -> linear2 -> residual-branch dropout."""
        hidden = self.dropout(self.activation(self.linear1(x)))
        return self.dropout3(self.linear2(hidden))

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Run the layer on ``tgt`` and return a tensor of the same shape.

        Args:
            tgt: Decoder-side features.
            memory: Encoder output used as key and value for cross-attention.
            tgt_mask: Optional attention mask for self-attention.
            memory_mask: Optional attention mask for cross-attention.
            tgt_key_padding_mask: Optional key-padding mask for self-attention.
            memory_key_padding_mask: Optional key-padding mask for cross-attention.
        """
        x = self.norm1(tgt + self._self_attend(tgt, tgt_mask, tgt_key_padding_mask))
        x = self.norm2(
            x + self._cross_attend(x, memory, memory_mask, memory_key_padding_mask)
        )
        return self.norm3(x + self._feed_forward(x))

class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` independent encoder layers applied in sequence.

    Args:
        encoder_layer: Template layer. It is deep-copied once per stack
            position, so the template itself is not stored in the stack.
        num_layers: Number of layers in the stack.
    """

    def __init__(self, encoder_layer, num_layers):
        super().__init__()
        # Deep-copy the template for every position. Putting the same module
        # object in the list N times would make all "layers" share one set of
        # weights, i.e. a single layer applied N times.
        self.layers = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(num_layers)]
        )

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        """Feed ``src`` through every layer in order and return the final output.

        ``src_mask`` and ``src_key_padding_mask`` are passed unchanged to each layer.
        """
        output = src
        for layer in self.layers:
            output = layer(output, src_mask, src_key_padding_mask)
        return output

class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` independent decoder layers applied in sequence.

    Args:
        decoder_layer: Template layer. It is deep-copied once per stack
            position, so the template itself is not stored in the stack.
        num_layers: Number of layers in the stack.
    """

    def __init__(self, decoder_layer, num_layers):
        super().__init__()
        # Deep-copy the template for every position. Putting the same module
        # object in the list N times would make all "layers" share one set of
        # weights, i.e. a single layer applied N times.
        self.layers = nn.ModuleList(
            [copy.deepcopy(decoder_layer) for _ in range(num_layers)]
        )

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Feed ``tgt`` through every layer in order and return the final output.

        ``memory`` and all four masks are passed unchanged to each layer.
        """
        output = tgt
        for layer in self.layers:
            output = layer(
                output, memory,
                tgt_mask=tgt_mask,
                memory_mask=memory_mask,
                tgt_key_padding_mask=tgt_key_padding_mask,
                memory_key_padding_mask=memory_key_padding_mask
            )
        return output

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor, then dropout.

    The table is stored as a buffer ``pe`` of shape (max_len, 1, d_model):
    even feature columns hold sines, odd columns hold cosines.

    Args:
        d_model: Feature width of the inputs (odd widths are supported).
        dropout: Dropout probability applied after the encoding is added.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the frequencies; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # (max_len, d_model) -> (max_len, 1, d_model) so it broadcasts over dim 1.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        Positions are taken along dim 0 of ``x``; the table broadcasts over dim 1.

        Raises:
            ValueError: If ``x.size(0)`` exceeds the number of table positions.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding "
                f"max_len {self.pe.size(0)}"
            )
        return self.dropout(x + self.pe[:seq_len])

def generate_square_subsequent_mask(sz):
    """Build an additive causal mask of shape (sz, sz).

    Entries on and below the diagonal are 0.0; entries above the diagonal are
    -inf, so position i cannot attend to any position j > i.
    """
    blocked = torch.full((sz, sz), float('-inf'))
    # triu with diagonal=1 keeps only the strictly-upper triangle and zeros the rest.
    return torch.triu(blocked, diagonal=1)

# Smoke test: build the hand-written Transformer and push one random batch through it
if __name__ == "__main__":
    # Model configuration
    src_vocab_size = tgt_vocab_size = 100
    d_model, nhead, num_layers = 512, 8, 6
    dim_feedforward, dropout = 2048, 0.1

    # Instantiate the model (arguments follow the constructor's positional order)
    model = Transformer(
        src_vocab_size,
        tgt_vocab_size,
        d_model,
        nhead,
        num_layers,
        dim_feedforward,
        dropout,
    )

    # Random token ids laid out as (seq_len, batch_size):
    # source length 1, target length 2, batch of 10
    src = torch.randint(low=0, high=src_vocab_size, size=(1, 10))
    tgt = torch.randint(low=0, high=tgt_vocab_size, size=(2, 10))

    # Causal mask sized to the target sequence length
    tgt_mask = generate_square_subsequent_mask(tgt.shape[0])

    output = model(src, tgt, tgt_mask=tgt_mask)
    # Expected layout: (tgt_seq_len, batch_size, tgt_vocab_size)
    print(f"Output shape: {output.shape}")

Output shape: torch.Size([2, 10, 100])


In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor, then dropout.

    This cell redefines ``PositionalEncoding`` with a smaller default
    ``max_len`` (20); code executed after it uses this definition.

    The table is stored as a buffer ``pe`` of shape (max_len, 1, d_model):
    even feature columns hold sines, odd columns hold cosines.

    Args:
        d_model: Feature width of the inputs (odd widths are supported).
        dropout: Dropout probability applied after the encoding is added.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=20):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the frequencies; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # (max_len, d_model) -> (max_len, 1, d_model) so it broadcasts over dim 1.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        Positions are taken along dim 0 of ``x``; the table broadcasts over dim 1.

        Raises:
            ValueError: If ``x.size(0)`` exceeds the number of table positions.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding "
                f"max_len {self.pe.size(0)}"
            )
        return self.dropout(x + self.pe[:seq_len])

In [None]:
class TransformerModel(nn.Module):
    """Small batch-first seq2seq model wrapping ``nn.Transformer``.

    Token ids of shape (batch, seq_len) are embedded, scaled by
    sqrt(d_model), given positional encodings, passed through
    ``nn.Transformer`` (``batch_first=True``) with a causal target mask, and
    projected to target-vocabulary logits of shape
    (batch, tgt_seq_len, target_vocab_size).

    Args:
        input_vocab_size: Number of rows in the source embedding table.
        target_vocab_size: Number of rows in the target embedding table and
            the width of the output projection.
        d_model: Embedding / hidden width.
        nhead: Number of attention heads.
        num_layers: Number of encoder layers and of decoder layers.
        dim_feedforward: Hidden width of the feed-forward blocks.
    """

    def __init__(self, input_vocab_size, target_vocab_size, d_model=8, nhead=1, num_layers=1, dim_feedforward=16):
        super(TransformerModel, self).__init__()
        self.d_model = d_model
        # Embedding layers
        self.input_embedding = nn.Embedding(input_vocab_size, d_model)
        self.target_embedding = nn.Embedding(target_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model, nhead=nhead, num_encoder_layers=num_layers,
            num_decoder_layers=num_layers, dim_feedforward=dim_feedforward, batch_first=True
        )
        # Output layer
        self.fc_out = nn.Linear(d_model, target_vocab_size)

    def generate_square_subsequent_mask(self, sz):
        """Return an (sz, sz) additive causal mask: -inf above the diagonal, 0 elsewhere."""
        mask = torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)
        return mask

    def _embed(self, tokens, embedding):
        """Embed (batch, seq) token ids and add positional encodings along the sequence axis."""
        scaled = embedding(tokens) * math.sqrt(self.d_model)
        # PositionalEncoding indexes positions along dim 0, while this model is
        # batch-first. Without the transposes, positions would be looked up by
        # batch index, so with a batch of one every token would receive the
        # position-0 encoding.
        return self.positional_encoding(scaled.transpose(0, 1)).transpose(0, 1)

    def forward(self, src, tgt):
        """Return logits of shape (batch, tgt_seq_len, target_vocab_size).

        Args:
            src: Source token ids, shape (batch, src_seq_len).
            tgt: Target-side input token ids, shape (batch, tgt_seq_len).
        """
        src_mask = None
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1)).to(src.device)
        src_emb = self._embed(src, self.input_embedding)
        tgt_emb = self._embed(tgt, self.target_embedding)
        output = self.transformer(src_emb, tgt_emb, src_mask=src_mask, tgt_mask=tgt_mask)
        return self.fc_out(output)

## A ChatBot

In [None]:
# Read tab-separated prompt/reply pairs: first field is the prompt, second the reply.
input_texts = []
target_texts = []
with open("/content/dialogs.txt", 'r', encoding="utf-8") as f:
    for line in f:
        fields = line.split('\t')
        # Skip blank or malformed lines instead of failing with IndexError.
        if len(fields) < 2:
            continue
        input_texts.append(fields[0])
        # The reply keeps its trailing newline here; clean_text removes it later.
        target_texts.append(fields[1])
# Sanity check: prompts and replies stay paired one-to-one.
print(len(input_texts) == len(target_texts))

True


In [None]:
# Preview the first five replies; the raw lines still carry their trailing newline.
target_texts[:5]

["i'm fine. how about yourself?\n",
 "i'm pretty good. thanks for asking.\n",
 'no problem. so how have you been?\n',
 "i've been great. what about you?\n",
 "i've been good. i'm in school right now.\n"]

In [None]:
# Keep only the first 10 prompt/reply pairs (the slices below are [:10])
input_texts = input_texts[:10]
target_texts = target_texts[:10]

In [None]:
import re
def clean_text(text):
    """Return ``text`` with every character removed except ASCII letters, digits and spaces.

    Punctuation, newlines and other whitespace are dropped; case is left unchanged.
    """
    disallowed = r"[^a-zA-Z0-9 ]"
    return re.sub(disallowed, "", text)

In [None]:
# Normalize both sides of every pair with the same cleaning function.
input_texts = list(map(clean_text, input_texts))
target_texts = list(map(clean_text, target_texts))

In [None]:
from collections import Counter


def build_vocab(texts):
    """Build a whitespace-token vocabulary from ``texts``.

    Returns:
        A ``(vocab, word_to_idx, idx_to_word)`` tuple. ``vocab`` starts with
        ``'<pad>'``, ``'<sos>'``, ``'<eos>'`` (indices 0, 1, 2) followed by each
        distinct word in order of first appearance; the two dicts map between
        words and their list positions.
    """
    seen = Counter()
    for sentence in texts:
        seen.update(sentence.split())

    vocab = ['<pad>', '<sos>', '<eos>']
    # Iterating the counter yields its keys in first-seen order.
    vocab.extend(seen)

    word_to_idx = {}
    for position, token in enumerate(vocab):
        word_to_idx[token] = position
    idx_to_word = {position: token for token, position in word_to_idx.items()}
    return vocab, word_to_idx, idx_to_word

# Separate vocabularies for the prompt side and the reply side.
input_vocab, input_word_to_idx, input_idx_to_word = build_vocab(input_texts)
target_vocab, target_word_to_idx, target_idx_to_word = build_vocab(target_texts)

# Show the word lists, then both lookup directions, separated by a row of asterisks.
print(f"Input Vocabulary: {input_vocab}")
print(f"Target Vocabulary: {target_vocab}")
print("********")
print(f"Input Word to Index: {input_word_to_idx}")
print(f"Target Word to Index: {target_word_to_idx}")
print("********")
print(f"Input Index to Word: {input_idx_to_word}")
print(f"Target Index to Word: {target_idx_to_word}")


Input Vocabulary: ['<pad>', '<sos>', '<eos>', 'hi', 'how', 'are', 'you', 'doing', 'im', 'fine', 'about', 'yourself', 'pretty', 'good', 'thanks', 'for', 'asking', 'no', 'problem', 'so', 'have', 'been', 'ive', 'great', 'what', 'in', 'school', 'right', 'now', 'do', 'go', 'to', 'i', 'pcc', 'like', 'it', 'there', 'its', 'okay', 'a', 'really', 'big', 'campus']
Target Vocabulary: ['<pad>', '<sos>', '<eos>', 'im', 'fine', 'how', 'about', 'yourself', 'pretty', 'good', 'thanks', 'for', 'asking', 'no', 'problem', 'so', 'have', 'you', 'been', 'ive', 'great', 'what', 'in', 'school', 'right', 'now', 'do', 'go', 'to', 'i', 'pcc', 'like', 'it', 'there', 'its', 'okay', 'a', 'really', 'big', 'campus', 'luck', 'with']
********
Input Word to Index: {'<pad>': 0, '<sos>': 1, '<eos>': 2, 'hi': 3, 'how': 4, 'are': 5, 'you': 6, 'doing': 7, 'im': 8, 'fine': 9, 'about': 10, 'yourself': 11, 'pretty': 12, 'good': 13, 'thanks': 14, 'for': 15, 'asking': 16, 'no': 17, 'problem': 18, 'so': 19, 'have': 20, 'been': 21, 

In [None]:
def encode_text(text, word_to_idx, max_len=10):
    """Encode ``text`` as a fixed-length tensor of token ids.

    The text is split on whitespace, wrapped in ``<sos>`` / ``<eos>`` and
    right-padded with ``<pad>`` up to ``max_len``. Sentences that are too long
    have their trailing words dropped so that ``<eos>`` is still present.

    Args:
        text: Sentence to encode.
        word_to_idx: Mapping from token to id; must contain ``'<pad>'``,
            ``'<sos>'`` and ``'<eos>'``.
        max_len: Length of the returned tensor.

    Returns:
        A 1-D ``torch.long`` tensor of length ``max_len``. Words missing from
        ``word_to_idx`` are encoded as the ``<pad>`` id.
    """
    pad_id = word_to_idx['<pad>']
    # Reserve two slots for <sos>/<eos>: truncate the words, not the finished
    # sequence, otherwise a long sentence would lose its <eos> marker.
    words = text.split()[:max(max_len - 2, 0)]
    tokens = ['<sos>'] + words + ['<eos>']
    token_ids = [word_to_idx.get(word, pad_id) for word in tokens]
    token_ids += [pad_id] * (max_len - len(token_ids))
    # The final slice only matters for max_len < 2; dtype is pinned so an
    # empty result is still an integer tensor.
    return torch.tensor(token_ids[:max_len], dtype=torch.long)

# Encode every sentence with its own side's vocabulary and stack the rows
# into (num_sentences, max_len) tensors.
src_data = torch.stack(
    tuple(encode_text(sentence, input_word_to_idx) for sentence in input_texts), dim=0
)
tgt_data = torch.stack(
    tuple(encode_text(sentence, target_word_to_idx) for sentence in target_texts), dim=0
)

print(f"Source Data: {src_data}")
print(f"Target Data: {tgt_data}")


Source Data: tensor([[ 1,  3,  4,  5,  6,  7,  2,  0,  0,  0],
        [ 1,  8,  9,  4, 10, 11,  2,  0,  0,  0],
        [ 1,  8, 12, 13, 14, 15, 16,  2,  0,  0],
        [ 1, 17, 18, 19,  4, 20,  6, 21,  2,  0],
        [ 1, 22, 21, 23, 24, 10,  6,  2,  0,  0],
        [ 1, 22, 21, 13,  8, 25, 26, 27, 28,  2],
        [ 1, 24, 26, 29,  6, 30, 31,  2,  0,  0],
        [ 1, 32, 30, 31, 33,  2,  0,  0,  0,  0],
        [ 1, 29,  6, 34, 35, 36,  2,  0,  0,  0],
        [ 1, 37, 38, 37, 39, 40, 41, 42,  2,  0]])
Target Data: tensor([[ 1,  3,  4,  5,  6,  7,  2,  0,  0,  0],
        [ 1,  3,  8,  9, 10, 11, 12,  2,  0,  0],
        [ 1, 13, 14, 15,  5, 16, 17, 18,  2,  0],
        [ 1, 19, 18, 20, 21,  6, 17,  2,  0,  0],
        [ 1, 19, 18,  9,  3, 22, 23, 24, 25,  2],
        [ 1, 21, 23, 26, 17, 27, 28,  2,  0,  0],
        [ 1, 29, 27, 28, 30,  2,  0,  0,  0,  0],
        [ 1, 26, 17, 31, 32, 33,  2,  0,  0,  0],
        [ 1, 34, 35, 34, 36, 37, 38, 39,  2,  0],
        [ 1,  9, 40, 41

In [None]:
import torch.optim as optim

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Model sized to the two vocabularies; nn.Module.to moves it in place.
model = TransformerModel(len(input_vocab), len(target_vocab))
model.to(device)

# Padding positions in the target do not contribute to the loss.
pad_index = target_word_to_idx['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)
optimizer = optim.Adam(params=model.parameters(), lr=0.01)


In [None]:
def train_model(model, src_data, tgt_data, epochs=100):
    """Train ``model`` one sentence pair at a time with teacher forcing.

    For each pair the decoder input is the target without its last token and
    the expected output is the target without its first token. The average
    per-pair loss is printed after every epoch.

    Relies on the notebook-level ``optimizer``, ``criterion`` and ``device``.
    ``optimizer`` was created from the notebook's ``model`` parameters, so
    passing a different model here would not update that model's weights.

    Args:
        model: Module called as ``model(src, tgt_input)`` returning logits
            whose last dimension is the target vocabulary.
        src_data: Source token ids, one row per sentence.
        tgt_data: Target token ids, one row per sentence, aligned with ``src_data``.
        epochs: Number of passes over the data.
    """
    model.train()
    num_pairs = len(src_data)
    for epoch in range(epochs):
        total_loss = 0.0
        for src, tgt in zip(src_data, tgt_data):
            # Add a batch dimension of 1 and move to the training device.
            src, tgt = src.unsqueeze(0).to(device), tgt.unsqueeze(0).to(device)

            # Teacher forcing: predict token t+1 from tokens up to t.
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            optimizer.zero_grad()
            logits = model(src, tgt_input)

            # Flatten to (positions, vocab) / (positions,) for the loss. The
            # vocab size is read from the logits themselves rather than from
            # a global, so the reshape always matches the model's output.
            loss = criterion(
                logits.reshape(-1, logits.size(-1)),
                tgt_output.reshape(-1),
            )
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Avoid ZeroDivisionError when called with an empty dataset.
        mean_loss = total_loss / num_pairs if num_pairs else float('nan')
        print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss}")


In [None]:
def evaluate_model(model, input_text, max_new_tokens=10):
    """Greedily decode a reply to ``input_text`` and return it as a string.

    The input is cleaned with ``clean_text`` (the same preprocessing applied
    to the training sentences) before encoding, so punctuation such as a
    trailing ``?`` does not turn a known word into an unknown one.

    Relies on the notebook-level ``encode_text``, ``clean_text``,
    ``input_word_to_idx``, ``target_word_to_idx``, ``target_idx_to_word`` and
    ``device``.

    Args:
        model: Module called as ``model(src, tgt_input)`` returning logits of
            shape (batch, tgt_len, vocab).
        input_text: Prompt to answer.
        max_new_tokens: Maximum number of tokens to generate; decoding stops
            earlier when ``<eos>`` is predicted.

    Returns:
        The generated words joined by single spaces, without ``<sos>``/``<eos>``.
    """
    model.eval()
    sos_id = target_word_to_idx['<sos>']
    eos_id = target_word_to_idx['<eos>']

    src = encode_text(clean_text(input_text), input_word_to_idx).unsqueeze(0).to(device)
    generated = torch.tensor([[sos_id]], dtype=torch.long, device=device)

    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(src, generated)
            # Greedy choice from the distribution at the last target position.
            next_token_id = torch.argmax(logits[:, -1, :], dim=-1).item()

            if next_token_id == eos_id:
                break

            next_token = torch.tensor([[next_token_id]], dtype=torch.long, device=device)
            generated = torch.cat([generated, next_token], dim=1)

    # Drop the leading <sos> before mapping ids back to words.
    words = [target_idx_to_word[token_id] for token_id in generated[0, 1:].tolist()]
    return ' '.join(words)

# Example: when the cells run top to bottom, this call comes before the
# train_model cell, so it shows what the model generates with its initial weights.
print(evaluate_model(model, "hi how are you"))


good for <pad> there have there have there have there


In [None]:
# Train with the default 100 epochs; one "Epoch i/100, Loss: ..." line is printed per epoch.
train_model(model, src_data, tgt_data)

Epoch 1/100, Loss: 3.9007261037826537
Epoch 2/100, Loss: 3.5297161102294923
Epoch 3/100, Loss: 3.3479026794433593
Epoch 4/100, Loss: 3.171660852432251
Epoch 5/100, Loss: 2.9848204135894774
Epoch 6/100, Loss: 2.827900457382202
Epoch 7/100, Loss: 2.6371424198150635
Epoch 8/100, Loss: 2.5025051116943358
Epoch 9/100, Loss: 2.4049530029296875
Epoch 10/100, Loss: 2.2357290744781495
Epoch 11/100, Loss: 2.0342037200927736
Epoch 12/100, Loss: 1.9803359389305115
Epoch 13/100, Loss: 1.712653386592865
Epoch 14/100, Loss: 1.6954543948173524
Epoch 15/100, Loss: 1.5787148833274842
Epoch 16/100, Loss: 1.3269709646701813
Epoch 17/100, Loss: 1.2812429785728454
Epoch 18/100, Loss: 1.153561520576477
Epoch 19/100, Loss: 1.1810798048973083
Epoch 20/100, Loss: 1.1622466027736664
Epoch 21/100, Loss: 0.9953729748725891
Epoch 22/100, Loss: 0.9568857908248901
Epoch 23/100, Loss: 0.9276186406612397
Epoch 24/100, Loss: 0.8350179433822632
Epoch 25/100, Loss: 0.8391339957714081
Epoch 26/100, Loss: 0.8965742945671081

In [None]:
# Query the trained model. Note that 'hello' is not in the input vocabulary
# printed above, so encode_text maps it to <pad>.
print(evaluate_model(model, "hello how are you doing?"))

im fine how about yourself
