In [6]:
import math
import re

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


In [61]:
# Example dataset: a small in-memory corpus of weather-themed English sentences.
# Punctuation is left attached to the neighbouring word (e.g. "clear,", "brightly."),
# so each punctuated form becomes its own vocabulary entry when the text is
# split on whitespace.
sentences = [
    "The sky is clear, and the sun is shining brightly.",
    "Tomorrow's forecast predicts a chance of thunderstorms.",
    "The temperature is expected to drop below freezing tonight.",
    "The weather is perfect for a day at the beach.",
    "Strong winds are causing power outages across the region.",
    "A hurricane is approaching the coastline, and residents are advised to evacuate.",
    "There is a severe weather warning in effect until midnight.",
    "The sunset painted the sky with hues of orange and pink.",
    "The heatwave has broken temperature records this year.",
    "It's a cloudy day with a chance of light showers in the afternoon.",
    "The weather has been unpredictable lately, changing from sunny to rainy within hours.",
    "The spring blossoms are early this year due to mild weather.",
    "People are enjoying outdoor concerts as the nights get warmer.",
    "A warm breeze carried the scent of blooming flowers through the air.",
    "A heat advisory has been issued for the upcoming days.",
    "The local weather station reported record high temperatures today.",
    "A cool breeze is a welcome relief from the afternoon sun.",
    "Unexpected weather changes have become a common theme this year.",
    "The windchill factor makes it feel much colder outside.",
]

# Build vocabulary mapping words to IDs
def build_vocab(sentences):
    """Map every distinct lowercased, whitespace-separated word to an integer ID.

    Args:
        sentences: iterable of strings.

    Returns:
        dict with "<pad>" -> 0 and "<unk>" -> 1 reserved, followed by the
        remaining words numbered from 2 in order of first appearance.
    """
    word_to_id = {"<pad>": 0, "<unk>": 1}
    tokens = (token for text in sentences for token in text.lower().split())
    for token in tokens:
        # IDs are handed out contiguously, so the current size of the mapping
        # is always the next free ID; setdefault leaves known words untouched.
        word_to_id.setdefault(token, len(word_to_id))
    return word_to_id

# Vocabulary for the example corpus, plus the two values derived from it that
# the model and the loss need: the padding ID and the number of entries.
vocab = build_vocab(sentences)
padding_idx = vocab["<pad>"]
vocab_size = len(vocab)

In [None]:
# Tokenization function
def tokenize_sentence(sentence, vocab):
    """Convert a sentence into a list of vocabulary IDs.

    The sentence is split on whitespace and each word is lowercased before
    lookup; words missing from ``vocab`` map to the ID of "<unk>".
    """
    token_ids = []
    for raw_word in sentence.split():
        # The "<unk>" lookup stays inside the loop so that an empty sentence
        # never touches the vocabulary at all.
        token_ids.append(vocab.get(raw_word.lower(), vocab["<unk>"]))
    return token_ids

# Padding function
def pad_sequence(seq, max_len, pad_value=0):
    """Return a new list based on ``seq`` that is right-padded or truncated.

    Sequences shorter than ``max_len`` get ``pad_value`` appended until they
    reach that length; all others are cut with ``seq[:max_len]``.
    """
    shortfall = max_len - len(seq)
    if shortfall > 0:
        return seq + [pad_value] * shortfall
    return seq[:max_len]

# Dataset class
class TextDataset(Dataset):
    """Next-token prediction pairs built from tokenized sentences.

    Every sentence is tokenized once at construction time. Item ``idx`` is a
    pair ``(x, y)`` of ``torch.long`` tensors of length ``max_len``: ``x`` is
    the sentence without its last token, ``y`` is the sentence without its
    first token, each padded or truncated by ``pad_sequence``.
    """

    def __init__(self, sentences, vocab, max_len):
        self.max_len = max_len
        self.vocab = vocab
        self.data = []
        for sentence in sentences:
            self.data.append(tokenize_sentence(sentence, vocab))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        token_ids = self.data[idx]
        # Target is the input shifted left by one position.
        inputs, targets = token_ids[:-1], token_ids[1:]
        padded_pair = [pad_sequence(part, self.max_len) for part in (inputs, targets)]
        x_tensor, y_tensor = (torch.tensor(ids, dtype=torch.long) for ids in padded_pair)
        return x_tensor, y_tensor

In [None]:
# Transformer model components
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to a batch of embeddings.

    A table of shape ``(1, max_len, d_model)`` is precomputed and stored as the
    non-trainable buffer ``pe``: even feature columns hold
    ``sin(pos * div_term)`` and odd columns hold ``cos(pos * div_term)``.

    Args:
        d_model: size of the feature dimension (even or odd).
        max_len: number of positions to precompute.
    """

    def __init__(self, d_model, max_len):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)  # Even indices
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angles to fit; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])  # Odd indices
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as ``(batch, seq_len, d_model)``; the table is
        broadcast over the batch dimension.
        """
        return x + self.pe[:, :x.size(1)].to(x.device)

In [None]:
class TransformerModel(nn.Module):
    """Causal language model built from a stack of Transformer encoder layers.

    Token IDs are embedded, scaled by ``sqrt(d_model)``, combined with
    positional encodings, passed through the encoder under a causal mask and a
    padding mask, and projected to vocabulary logits.

    Args:
        vocab_size: number of entries in the vocabulary (size of the output layer).
        d_model: embedding / hidden size.
        nhead: number of attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        dim_feedforward: hidden size of each layer's feed-forward sub-block.
        max_len: number of positions the positional encoding precomputes.
        padding_idx: token ID treated as padding, both by the embedding and by
            the key-padding mask built in ``forward``.
    """

    def __init__(self, vocab_size, d_model, nhead, num_layers, dim_feedforward, max_len, padding_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=padding_idx)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.d_model = d_model
        # Kept on the instance so forward() does not depend on a module-level
        # global that happens to share the name.
        self.padding_idx = padding_idx

    def forward(self, src):
        """Compute next-token logits.

        Args:
            src: integer tensor of token IDs indexed as ``(batch, seq_len)``.

        Returns:
            Tensor of shape ``(batch, seq_len, vocab_size)``.
        """
        src_mask = self.generate_square_subsequent_mask(src.size(1)).to(src.device)
        src_pad_mask = src == self.padding_idx  # True where the key is padding
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        # The encoder layers are built with the default sequence-first layout,
        # so swap to (seq_len, batch, d_model) going in and back coming out.
        output = self.transformer_encoder(src.transpose(0, 1), mask=src_mask, src_key_padding_mask=src_pad_mask)
        output = self.fc_out(output)
        return output.transpose(0, 1)

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask: ``-inf`` above the diagonal, 0 elsewhere."""
        mask = torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)
        return mask

In [85]:
# Hyperparameters
max_len = 15
batch_size = 2
d_model = 64
nhead = 4
num_layers = 2
dim_feedforward = 128
num_epochs = 200
seed = 42

# Seed PyTorch's global RNG before the DataLoader and model are created, so
# that weight initialisation, dropout and batch shuffling repeat across runs.
torch.manual_seed(seed)

# Dataset and DataLoader
dataset = TextDataset(sentences, vocab, max_len)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, criterion, and optimizer
model = TransformerModel(vocab_size, d_model, nhead, num_layers, dim_feedforward, max_len, padding_idx)
# Padded target positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=padding_idx)
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# Training loop
for epoch in range(1, num_epochs + 1):
    model.train()
    total_loss = 0.0
    for x_batch, y_batch in dataloader:
        optimizer.zero_grad()
        output = model(x_batch)
        # Flatten (batch, seq_len, vocab) logits and (batch, seq_len) targets
        # so each token position is scored as one classification example.
        output = output.reshape(-1, vocab_size)
        y_batch = y_batch.view(-1)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean of the per-batch mean losses for this epoch.
    avg_loss = total_loss / len(dataloader)
    if (epoch%5==0):
        print(f"Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}")

Epoch [5/200], Loss: 4.2328
Epoch [10/200], Loss: 3.4469
Epoch [15/200], Loss: 2.7120
Epoch [20/200], Loss: 2.0709
Epoch [25/200], Loss: 1.5228
Epoch [30/200], Loss: 1.1387
Epoch [35/200], Loss: 0.9051
Epoch [40/200], Loss: 0.6911
Epoch [45/200], Loss: 0.5742
Epoch [50/200], Loss: 0.4863
Epoch [55/200], Loss: 0.4248
Epoch [60/200], Loss: 0.3807
Epoch [65/200], Loss: 0.3357
Epoch [70/200], Loss: 0.2951
Epoch [75/200], Loss: 0.2873
Epoch [80/200], Loss: 0.2627
Epoch [85/200], Loss: 0.2357
Epoch [90/200], Loss: 0.2160
Epoch [95/200], Loss: 0.2161
Epoch [100/200], Loss: 0.1970
Epoch [105/200], Loss: 0.2107
Epoch [110/200], Loss: 0.1925
Epoch [115/200], Loss: 0.1997
Epoch [120/200], Loss: 0.1923
Epoch [125/200], Loss: 0.1806
Epoch [130/200], Loss: 0.1879
Epoch [135/200], Loss: 0.2028
Epoch [140/200], Loss: 0.1772
Epoch [145/200], Loss: 0.1941
Epoch [150/200], Loss: 0.1680
Epoch [155/200], Loss: 0.1955
Epoch [160/200], Loss: 0.1798
Epoch [165/200], Loss: 0.1839
Epoch [170/200], Loss: 0.1731


In [90]:
# Text generation function
def generate_text(model, vocab, start_text, max_len):
    """Greedily extend ``start_text`` one word at a time.

    The prompt is lowercased, split on whitespace and mapped to IDs (unknown
    words become "<unk>"). At each step the model is run on the padded
    sequence and the arg-max token at the last filled position is chosen.
    Generation stops when ``max_len`` tokens are reached, when the model
    predicts "<pad>" or "<unk>" (that token is not added to the text), or
    right after a word containing '.', '!' or '?'.

    Args:
        model: callable module mapping a ``(1, max_len)`` long tensor to
            logits indexed as ``(1, max_len, vocab_size)``; it is switched to
            eval mode.
        vocab: dict mapping words to IDs, containing "<pad>" and "<unk>".
        start_text: prompt string.
        max_len: total sequence length (prompt plus generated tokens).

    Returns:
        The space-joined text with its first word capitalized, or "" when the
        prompt contains no words.
    """
    model.eval()
    words = start_text.lower().split()
    if not words:
        # Nothing to condition on (and no first word to capitalize).
        return ""

    pad_id, unk_id = vocab["<pad>"], vocab["<unk>"]
    # Invert the vocabulary once instead of scanning it on every step.
    id_to_word = {idx: word for word, idx in vocab.items()}
    input_ids = [vocab.get(word, unk_id) for word in words]

    generated = words.copy()
    generated[0] = generated[0].capitalize()

    # Build the input on the same device as the model's weights (CPU if the
    # model exposes no parameters).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    input_seq = torch.tensor([pad_sequence(input_ids, max_len, pad_id)], dtype=torch.long, device=device)

    with torch.no_grad():
        # ``position`` is the slot the next token will be written to; its
        # prediction is read from the logits of the slot just before it.
        for position in range(len(input_ids), max_len):
            next_token_logits = model(input_seq)[0, position - 1, :]
            next_token_id = torch.argmax(next_token_logits).item()
            if next_token_id in (pad_id, unk_id):
                break
            next_word = id_to_word[next_token_id]
            generated.append(next_word)
            input_seq[0, position] = next_token_id
            if any(mark in next_word for mark in ('.', '!', '?')):
                break
    return ' '.join(generated)

# Generate text
# Run greedy generation from a short prompt and print the result under a heading.
start_text = "The weather"
words = start_text.lower().split()
generated_text = generate_text(model, vocab, start_text, max_len)
print("\nGenerated Text:", f"{generated_text}\n", sep="\n")


Generated Text:
The weather is perfect for a day at the beach.

