In [None]:
# Import necessary libraries

import torch
import torch.nn as nn
import math
import re
import pickle

# Pick the compute device: the first CUDA GPU when one is visible, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    # Make GPU 0 the current CUDA device and the default device for new tensors.
    torch.cuda.set_device(0)
    torch.set_default_device(torch.device("cuda:0"))
    print("Using GPU")
else:
    device = torch.device("cpu")

# Ask the CUDA caching allocator to release any unoccupied cached memory.
torch.cuda.empty_cache()

In [None]:
# Re-define the helper classes and functions needed below to load and run the trained model

# Function to normalize text by converting to lowercase, removing digits and punctuation
def normalize_text(text):
    """Return a cleaned-up, lowercase copy of ``text``.

    Digits are removed, then every character that is neither a word
    character nor whitespace is removed, then each run of whitespace is
    collapsed to a single space. Leading and trailing whitespace is stripped
    from the result.
    """
    # (pattern, replacement) pairs, applied in this order.
    substitutions = (
        (r'\d+', ''),       # digits
        (r'[^\w\s]', ''),   # punctuation
        (r'\s+', ' '),      # whitespace runs
    )
    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

# Vocabulary class to handle word indexing and counting
PAD_token = 0
SOS_token = 1
EOS_token = 2
class Vocabulary:
    """Bidirectional word <-> index mapping with per-word occurrence counts.

    Indices 0, 1 and 2 are reserved for the special tokens "PAD", "SOS" and
    "EOS"; ordinary words are numbered from 3 in the order they are first
    added.

    Attributes:
        word2index: dict mapping each word to its integer index.
        word2count: dict mapping each word to the number of times it was added.
        index2word: dict mapping each index (special tokens included) to its word.
        num_words: total number of entries, special tokens included.
        trimmed: True once ``trim`` has been applied.
    """

    def __init__(self):
        self.trimmed = False
        self.reset_vocab()

    def reset_vocab(self):
        """Drop every word and restore the three special tokens."""
        self.word2index = {}
        self.word2count = {}
        self.index2word = {PAD_token: "PAD", SOS_token: "SOS", EOS_token: "EOS"}
        self.num_words = 3

    def add_word(self, word):
        """Register ``word``, or bump its count if it is already known."""
        if word not in self.word2index:
            self.word2index[word] = self.num_words
            self.word2count[word] = 1
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def add_sentence(self, sentence):
        """Add every space-separated word of ``sentence``.

        Empty tokens (produced by an empty sentence or by consecutive
        spaces) are skipped so that the empty string never becomes a
        vocabulary entry.
        """
        for word in sentence.split(' '):
            if word:
                self.add_word(word)

    def trim(self, min_count=1):
        """Remove words seen fewer than ``min_count`` times and re-index the rest.

        Only the first call has an effect; later calls return immediately.
        Surviving words keep their relative order and their original counts.
        """
        if self.trimmed:
            return
        self.trimmed = True

        kept_counts = {
            word: count
            for word, count in self.word2count.items()
            if count >= min_count
        }

        self.reset_vocab()
        for word, count in kept_counts.items():
            self.add_word(word)
            # add_word starts the count at 1; restore the real frequency.
            self.word2count[word] = count

    def __len__(self):
        """Number of entries, special tokens included."""
        return self.num_words

    def __getitem__(self, item):
        """Look up an index by word (``str``) or a word by index (``int``).

        Returns ``None`` when the key is unknown.

        Raises:
            TypeError: if ``item`` is neither a ``str`` nor an ``int``.
        """
        if isinstance(item, str):
            return self.word2index.get(item, None)
        elif isinstance(item, int):
            return self.index2word.get(item, None)
        else:
            raise TypeError("Item must be either a string or an integer.")

    def __contains__(self, item):
        """Membership test for a word (``str``) or an index (``int``).

        Raises:
            TypeError: if ``item`` is neither a ``str`` nor an ``int``.
        """
        if isinstance(item, str):
            return item in self.word2index
        elif isinstance(item, int):
            return item in self.index2word
        else:
            raise TypeError("Item must be either a string or an integer.")
        
# Layers and models for the Transformer architecture
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected with a ``d_model -> d_model``
    linear layer, split into ``num_heads`` heads of width
    ``d_k = d_model // num_heads``, attended per head, merged back together
    and passed through a final output projection.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # width of a single head

        # Query, key and value projections, then the output projection.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend over ``V`` with weights ``softmax(Q K^T / sqrt(d_k))``.

        Where ``mask`` equals 0 the score is replaced by -1e9 before the
        softmax, so those positions receive (near-)zero attention weight.
        """
        scale = math.sqrt(self.d_k)
        scores = (Q @ K.transpose(-2, -1)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = scores.softmax(dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        batch_size, seq_length, _ = x.shape
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: (batch, num_heads, seq, d_k) -> (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.shape
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, and project the merged result."""
        q_heads = self.split_heads(self.W_q(Q))
        k_heads = self.split_heads(self.W_k(K))
        v_heads = self.split_heads(self.W_v(V))

        context = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        return self.W_o(self.combine_heads(context))
class FeedForward(nn.Module):
    """Two-layer MLP: Linear(d_model -> d_ff), ReLU, Linear(d_ff -> d_model)."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Expand to ``d_ff``, apply the non-linearity, project back to ``d_model``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    A table of shape (1, max_seq_length, d_model) is precomputed and stored
    as the buffer ``pe``: even feature columns hold sines and odd feature
    columns hold cosines of ``position * div_term``.

    Args:
        d_model: size of the feature dimension; may be even or odd.
        max_seq_length: number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # column, so drop the surplus frequency instead of failing on a
        # shape mismatch. For even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1)]
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    After each of the two sub-layers the result goes through dropout, is
    added back to the sub-layer's input, and the sum is layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run ``x`` through the block; ``mask`` is forwarded to self-attention."""
        # Sub-layer 1: self-attention (queries, keys and values are all x).
        attended = self.dropout(self.self_attn(x, x, x, mask))
        x = self.norm1(x + attended)

        # Sub-layer 2: feed-forward network.
        transformed = self.dropout(self.feed_forward(x))
        return self.norm2(x + transformed)
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, feed-forward.

    After each of the three sub-layers the result goes through dropout, is
    added back to the sub-layer's input, and the sum is layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run ``x`` through the block.

        ``tgt_mask`` is applied in self-attention; ``src_mask`` is applied in
        cross-attention, where ``enc_output`` supplies the keys and values.
        """
        # Sub-layer 1: self-attention over the decoder input.
        self_attended = self.dropout(self.self_attn(x, x, x, tgt_mask))
        x = self.norm1(x + self_attended)

        # Sub-layer 2: attention from the decoder states to the encoder output.
        cross_attended = self.dropout(self.cross_attn(x, enc_output, enc_output, src_mask))
        x = self.norm2(x + cross_attended)

        # Sub-layer 3: feed-forward network.
        transformed = self.dropout(self.feed_forward(x))
        return self.norm3(x + transformed)
class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target-vocabulary logits.

    Args:
        src_vocab_size: number of rows in the encoder embedding table.
        tgt_vocab_size: number of rows in the decoder embedding table and
            size of the output projection.
        d_model: embedding / hidden width.
        num_heads: attention heads per attention module.
        num_layers: number of encoder layers and of decoder layers.
        d_ff: hidden width of each feed-forward network.
        max_seq_length: number of positions in the positional-encoding table.
        dropout: dropout probability used throughout.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the attention masks for a (batch, seq) pair of id tensors.

        Token id 0 is treated as padding.

        Returns:
            src_mask: bool tensor (batch, 1, 1, src_len), False at padding.
            tgt_mask: bool tensor (batch, 1, tgt_len, tgt_len), False for
                padded query rows and for any key position after the query
                position.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Build the causal mask on the same device as the input so the `&`
        # below never mixes devices, whatever the global default device is.
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Encode ``src``, decode ``tgt`` against it, and return the output logits.

        The result has one ``tgt_vocab_size``-wide vector per position of ``tgt``.
        """
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

# Model hyperparameters, passed to the Transformer constructor when the checkpoint is loaded
src_vocab_size = tgt_vocab_size = 23570  # source and target use the same vocabulary size
d_model, num_heads, num_layers = 512, 8, 6  # hidden width, attention heads, layers per stack
d_ff = 2048  # feed-forward hidden width
max_seq_length = 464  # positions in the positional-encoding table
dropout = 0.1

In [None]:
# Build the model and load its saved weights using the following function

def load_transformer(filename, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
    """Build a ``Transformer`` and restore its weights from ``filename``.

    Args:
        filename: path to a state dict saved with ``torch.save``.
        src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers,
        d_ff, max_seq_length, dropout: constructor arguments for
            ``Transformer``; ``load_state_dict`` requires the resulting
            parameter shapes to match those stored in the file.

    Returns:
        The model, moved to the module-level ``device`` and switched to
        evaluation mode.
    """
    model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
    # map_location remaps tensors saved on another device (e.g. a checkpoint
    # written on a GPU and opened on a CPU-only machine); weights_only limits
    # unpickling to tensors and plain containers instead of arbitrary objects.
    state_dict = torch.load(filename, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    print(f"Model loaded from {filename}")
    return model

# Restore the trained model using the hyperparameters defined above.
transformer = load_transformer(
    filename="transformer_model.pth",
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_seq_length,
    dropout=dropout,
)

# Also load the vocabulary
# NOTE(review): pickle.load can execute arbitrary code while unpickling;
# only open a vocab.pkl that comes from a trusted source.
with open("vocab.pkl", "rb") as f:
    vocab = pickle.load(f)

# Report what was loaded once the file has been closed.
print("Vocabulary loaded with {} words.".format(len(vocab)))
print("Vocabulary contains the following special tokens:", vocab[PAD_token], vocab[SOS_token], vocab[EOS_token])

In [None]:
# Define a function to predict the response for a given input sequence
def predict_response(input_seq):
    """Run the global ``transformer`` on a sequence of token ids.

    The sequence is fed to the encoder, and the same sequence without its
    last token is fed to the decoder; the most likely token id at each
    decoder position is returned.

    Args:
        input_seq: list of integer token ids.

    Returns:
        A list of ``len(input_seq) - 1`` predicted token ids; an empty list
        when ``input_seq`` has fewer than two tokens.
    """
    # With fewer than two tokens the decoder input (everything but the last
    # token) is empty, so there is nothing to predict; skip the model call
    # rather than pushing zero-length tensors through it.
    if len(input_seq) < 2:
        return []

    input_tensor = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

    # NOTE(review): the source sequence is reused as the decoder input, so
    # this is a single pass over the prompt rather than step-by-step
    # generation starting from SOS_token. Confirm this is intended.
    with torch.no_grad():  # inference only: do not record an autograd graph
        output = transformer(input_tensor, input_tensor[:, :-1])

    # Drop only the batch dimension so the result is always a list.
    return output.argmax(dim=-1).squeeze(0).tolist()

# Now let's convert input sentence string to output sentence string
def input_to_output(input_sentence):
    """Turn a raw input sentence into the model's output sentence.

    The sentence is normalised, its in-vocabulary words are mapped to ids
    (unknown words are dropped), the ids are passed to ``predict_response``,
    and the predicted ids found in the vocabulary are mapped back to words
    joined by single spaces. If the normalised sentence is empty, the string
    "Input sentence is empty." is returned instead.
    """
    cleaned = normalize_text(input_sentence)
    if not cleaned:
        return "Input sentence is empty."

    # Encode: keep only the words the vocabulary knows.
    token_ids = []
    for word in cleaned.split():
        if word in vocab:
            token_ids.append(vocab[word])

    predicted_ids = predict_response(token_ids)

    # Decode: keep only the ids the vocabulary knows.
    predicted_words = []
    for index in predicted_ids:
        if index in vocab:
            predicted_words.append(vocab[index])
    return ' '.join(predicted_words)

In [None]:
# Example usage of the input_to_output function

# Sample prompts used to smoke-test the loaded model.
sentences = [
    "Hello, how are you?",
    "What is your name?",
    "Tell me a joke.",
    "What is the weather like today?",
    "I love programming.",
    "Can you help me with my homework?",
    "What is your favorite color?",
    "Do you like music?",
    "What is the meaning of life?",
    "How do I train a machine learning model?",
    "What is the capital of France?",
    "Tell me a story.",
    "What is the best way to learn Python?",
    "How can I improve my coding skills?",
    "What is the latest news?",
    "What is your favorite movie?",
    "What is the best way to stay healthy?",
]


def _show_exchange(sent):
    """Print one prompt, the model's output for it, and a separator line."""
    print(f"Input Sentence: {sent}")
    print("Output Sentence:", input_to_output(sent))
    print("-" * 50)


for sent in sentences:
    _show_exchange(sent)

In [None]:
# Now let's chat with the model

def chat_with_model():
    """Interactive loop: read a line, print the model's response, repeat.

    The loop ends when the user types "exit" (case-insensitive, surrounding
    whitespace ignored), when standard input is closed, or when the prompt
    is interrupted.
    """
    print("Chat with the model! Type 'exit' to quit.")
    while True:
        try:
            user_input = input("You: ")
        except (EOFError, KeyboardInterrupt):
            # stdin was closed or the prompt was interrupted (e.g. the
            # notebook kernel was stopped): leave cleanly, no traceback.
            print("\nExiting chat. Goodbye!")
            break
        # Accept "exit" with stray whitespace or different capitalisation.
        if user_input.strip().lower() == 'exit':
            print("Exiting chat. Goodbye!")
            break
        response = input_to_output(user_input)
        print(f"Model: {response}")

In [None]:
# Uncomment the line below to start chatting with the model
#chat_with_model()