In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re

In [3]:
# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [4]:
class TextDataset(Dataset):
    """Character-level dataset of overlapping (input, target) windows over a text file.

    The file is read as UTF-8, every run of whitespace is collapsed to a single
    space, and each character is mapped to an integer index. Sample ``i`` is the
    window ``text[i:i + seq_length]`` paired with the same window shifted one
    character to the right (the next-character targets).

    Args:
        file_path: Path of the UTF-8 text file to load.
        seq_length: Number of characters in every input/target window.
        vocab: Optional sequence of characters that fixes the index order.
            When omitted (or empty) the sorted set of characters found in the
            text is used.

    Raises:
        KeyError: If ``vocab`` is supplied and the text contains a character
            that is not part of it.
    """

    def __init__(self, file_path, seq_length, vocab=None):
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        text = re.sub(r'\s+', ' ', text)  # Clean up spaces
        self.vocab = vocab or sorted(set(text))
        self.char2idx = {c: i for i, c in enumerate(self.vocab)}
        self.idx2char = {i: c for i, c in enumerate(self.vocab)}
        self.seq_length = seq_length

        # Fail early with a readable message instead of a bare KeyError raised
        # from the middle of the encoding comprehension below.
        unknown = set(text) - set(self.char2idx)
        if unknown:
            raise KeyError(f"characters missing from vocab: {sorted(unknown)!r}")

        # An explicit dtype keeps the array integral even when the text is empty
        # (np.array([]) would otherwise default to float64).
        self.text_as_int = np.array([self.char2idx[c] for c in text], dtype=np.int64)

        # Every sample needs seq_length + 1 characters (inputs plus the shifted
        # targets). Clamp at zero so a too-short text yields an empty dataset
        # rather than a negative length, which makes len() raise ValueError.
        self.num_samples = max(0, len(self.text_as_int) - seq_length)

    def __len__(self):
        """Return the number of full-length windows available."""
        return self.num_samples

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair of ``torch.long`` tensors for window ``idx``.

        Negative indices count from the end. An out-of-range index raises
        ``IndexError`` instead of silently returning truncated windows, which
        also lets plain ``for sample in dataset`` iteration terminate.
        """
        if idx < 0:
            idx += self.num_samples
        if not 0 <= idx < self.num_samples:
            raise IndexError(f"index {idx} is out of range for {self.num_samples} samples")

        input_seq = self.text_as_int[idx:idx + self.seq_length]
        target_seq = self.text_as_int[idx + 1:idx + self.seq_length + 1]
        return torch.tensor(input_seq, dtype=torch.long), torch.tensor(target_seq, dtype=torch.long)

In [5]:
class LSTMModel(nn.Module):
    """Embedding -> multi-layer LSTM -> linear projection onto the vocabulary.

    Args:
        vocab_size: Number of distinct tokens; sizes both the embedding table
            and the output layer.
        embed_size: Dimensionality of the token embeddings.
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Run the network on a batch of token indices.

        Args:
            x: Integer tensor of token indices; the LSTM is built with
                ``batch_first=True``, so the batch dimension comes first.
            hidden: ``(h_0, c_0)`` tuple as produced by ``init_hidden``. When
                ``None`` the LSTM starts from its default zero state.

        Returns:
            ``(logits, hidden)`` where ``logits`` has one ``vocab_size``-wide
            score vector per input position and ``hidden`` is the final
            ``(h_n, c_n)`` state.
        """
        x = self.embedding(x)
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zero ``(h_0, c_0)`` state for ``batch_size`` sequences.

        The shape, dtype and device are taken from the model itself rather than
        from notebook-level globals, so the state always matches this instance
        regardless of how it was constructed or where it was moved.
        """
        reference = self.fc.weight
        shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        return (torch.zeros(shape, dtype=reference.dtype, device=reference.device),
                torch.zeros(shape, dtype=reference.dtype, device=reference.device))

In [6]:
def train_and_generate(model, dataloader, criterion, optimizer, num_epochs, generation_length, generate_every_n_epochs):
    """Train ``model`` and periodically print a sample of generated text.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and a forward pass
            ``model(inputs, hidden) -> (outputs, hidden)``.
        dataloader: Iterable of ``(inputs, targets)`` tensor batches.
        criterion: Loss callable applied to flattened outputs and targets.
        optimizer: Optimizer updating the parameters of ``model``.
        num_epochs: Number of passes over ``dataloader``.
        generation_length: Number of characters requested from ``generate_text``.
        generate_every_n_epochs: Sample text after every this many epochs.
            A value of ``0`` (or ``None``) disables sampling.

    The loss printed after each epoch is the mean over that epoch's batches
    (``nan`` if the dataloader produced no batches).
    """
    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    print("Started training")
    for epoch in range(1, num_epochs + 1):
        # Re-enter training mode every epoch: text generation switches the
        # model to eval mode, which would otherwise persist for later epochs.
        model.train()
        total_loss = 0.0
        num_batches = 0

        for inputs, targets in dataloader:
            inputs, targets = inputs.to(target_device), targets.to(target_device)
            optimizer.zero_grad()

            # Batches are independent shuffled windows, so each one starts from
            # a fresh zero state sized for the actual (possibly smaller, last)
            # batch; the returned state is not carried over.
            hidden = model.init_hidden(inputs.size(0))
            outputs, _ = model(inputs, hidden)

            # Flatten to (positions, classes); the class count comes from the
            # output tensor itself rather than a global vocabulary size.
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        mean_loss = total_loss / num_batches if num_batches else float("nan")
        print(f'Epoch {epoch}/{num_epochs}, Loss: {mean_loss}')

        # Guard against a zero interval, which would raise ZeroDivisionError.
        if generate_every_n_epochs and epoch % generate_every_n_epochs == 0:
            generated_text = generate_text(model, generation_length)
            print(f"Generated text after {epoch} epochs:\n{generated_text}\n")

def generate_text(model, generation_length, temperature=0.5, index_to_char=None, num_chars=None):
    """Sample ``generation_length`` characters from ``model``, one at a time.

    Generation starts from a uniformly random character index; each sampled
    character is fed back as the next input together with the recurrent state.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and a forward pass
            ``model(inputs, hidden) -> (logits, hidden)``.
        generation_length: Number of characters to produce.
        temperature: Positive divisor applied to the logits before sampling;
            smaller values concentrate probability on the highest-scoring
            characters.
        index_to_char: Mapping from index to character. Defaults to the
            notebook-level ``idx2char``.
        num_chars: Number of valid indices for the random start character.
            Defaults to the notebook-level ``vocab_size``.

    Returns:
        The generated string (the random start character is not included).

    Raises:
        ValueError: If ``temperature`` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Fall back to the notebook globals so existing two-argument calls still work.
    if index_to_char is None:
        index_to_char = idx2char
    if num_chars is None:
        num_chars = vocab_size

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    was_training = model.training
    model.eval()
    pieces = []
    try:
        with torch.no_grad():
            input_seq = torch.randint(0, num_chars, (1, 1), dtype=torch.long).to(target_device)  # Random start
            hidden = model.init_hidden(1)

            for _ in range(generation_length):
                logits, hidden = model(input_seq, hidden)
                # Softmax yields the same distribution as normalising
                # exp(logits / T) but cannot overflow to inf for large logits,
                # which would make torch.multinomial fail.
                probs = torch.softmax(logits[0, -1] / temperature, dim=-1)
                next_char_idx = torch.multinomial(probs, 1).item()
                pieces.append(index_to_char[next_char_idx])
                input_seq = torch.tensor([[next_char_idx]], dtype=torch.long).to(target_device)
    finally:
        # Restore the caller's mode so sampling mid-training does not leave
        # the model stuck in eval mode.
        model.train(was_training)

    return "".join(pieces)



In [7]:
# Reproducibility: seed PyTorch's RNG before anything random happens
# (weight initialisation, DataLoader shuffling, sampling during generation).
seed = 42
torch.manual_seed(seed)

# Hyperparameters
file_path = 'trump_speeches_combined_processed.txt'
seq_length = 100  # Characters per training window
embed_size = 128
hidden_size = 256
num_layers = 2
batch_size = 64
num_epochs = 50
learning_rate = 0.001
generation_length = 500  # Length of the generated text
generate_every_n_epochs = 5  # Generate text after every 5 epochs

# Initialize Dataset and DataLoader
dataset = TextDataset(file_path, seq_length)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Vocabulary size and lookup tables; kept as module-level names because the
# generation code reads them as globals.
vocab_size = len(dataset.vocab)
char2idx = dataset.char2idx
idx2char = dataset.idx2char

# Initialize the model, loss function, and optimizer
model = LSTMModel(vocab_size, embed_size, hidden_size, num_layers).to(device)  # Move model to device
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model and generate text in a loop
train_and_generate(model, dataloader, criterion, optimizer, num_epochs, generation_length, generate_every_n_epochs)


Started training


KeyboardInterrupt: 