# Learning Metal

## Practicing with Matrices:

Here I will be testing and trying out different methods that can be used to train and enhance our model to be able to effectively learn from a large file. First we will start off with small books, and then move on from there. 

In [82]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Check for MPS support (for Metal GPU acceleration on macOS)
device = 'mps' if torch.has_mps else 'cpu'
print(device)

block_size = 16  # increase block size to allow more context
batch_size = 32  # increase batch size for better gradient estimates
max_iters = 5000  # increase max iterations for more training time
learning_rate = 5e-4  # try a slightly higher learning rate
eval_iters = 100  # evaluate more frequently

mps


  device = 'mps' if torch.has_mps else 'cpu'


This just shows that you have successfully installed Metal on your device. As long as it shows mps. 

In [83]:
with open('wonderful_wizard_oz.txt', 'r', encoding='utf-8') as f:
    text = f.read()

chars = sorted(set(text))
print(chars)
vocab_size = len(chars)
print(vocab_size)

['\n', ' ', '!', '&', '(', ')', ',', '-', '.', '0', '1', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '—', '‘', '’', '“', '”', '\ufeff']
73


Great! 73 is the amount of each individual character shown in this whole book! Later on we will convert each one into a token and then further down we will embedden them into matrices!

In [84]:
string_to_int = {ch: i for i, ch in enumerate(chars)}
int_to_string = {i: ch for i, ch in enumerate(chars)}

encode = lambda s: [string_to_int[c] for c in s]
decode = lambda l: ''.join([int_to_string[i] for i in l])

data = torch.tensor(encode(text), dtype=torch.long)
print(data[:100])

tensor([72, 34, 48, 45,  1, 37, 55, 54, 44, 45, 58, 46, 61, 52,  1, 37, 49, 66,
        41, 58, 44,  1, 55, 46,  1, 29, 66,  0,  0, 42, 65,  1, 26,  8,  1, 20,
        58, 41, 54, 51,  1, 16, 41, 61, 53,  0,  0,  0, 34, 48, 49, 59,  1, 42,
        55, 55, 51,  1, 49, 59,  1, 44, 45, 44, 49, 43, 41, 60, 45, 44,  1, 60,
        55,  1, 53, 65,  1, 47, 55, 55, 44,  1, 46, 58, 49, 45, 54, 44,  1,  3,
         1, 43, 55, 53, 58, 41, 44, 45,  0, 27])


See how now each letter, space, symbol, indent, etc. now has its own classification.

In [85]:
# Define parameters
block_size = 30  # Sequence length (number of tokens per sequence)
batch_size = 64  # Number of sequences per batch

# Split the data into train and validation sets
n = int(0.8 * len(data))
train_data = data[:n]
val_data = data[n:]

# Compute min/max from training data only
min_val, max_val = train_data.min(), train_data.max()

# Normalize train and validation data (using train's min/max for consistency)
train_data = (train_data - min_val) / (max_val - min_val)
val_data = (val_data - min_val) / (max_val - min_val)

def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))  # Select starting points of sequences

    # Create sequences and targets
    x = torch.stack([data[i:i+block_size] for i in ix])  # shape: (batch_size, block_size)
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])  # shape: (batch_size, block_size)

    # Moving data to device (GPU if available)
    x, y = x.to(device), y.to(device)
    return x, y

# Get a batch
x, y = get_batch('train')
print('inputs:')
print(x.shape)  # Should be (batch_size, block_size)
print(x)
print(y.shape)
print('targets:')
print(y)

inputs:
torch.Size([64, 30])
tensor([[0.0139, 0.6806, 0.7500,  ..., 0.8056, 0.6806, 0.7500],
        [0.8750, 0.6667, 0.5694,  ..., 0.6667, 0.9028, 0.0139],
        [0.0139, 0.6667, 0.6250,  ..., 0.5694, 0.7500, 0.8333],
        ...,
        [0.6667, 0.5694, 0.6111,  ..., 0.6667, 0.7639, 0.8472],
        [0.8750, 0.0833, 0.0139,  ..., 0.0139, 0.6389, 0.6806],
        [0.6111, 0.6250, 0.7500,  ..., 0.6667, 0.6250, 0.0139]],
       device='mps:0')
torch.Size([64, 30])
targets:
tensor([[0.6806, 0.7500, 0.0139,  ..., 0.6806, 0.7500, 0.6528],
        [0.6667, 0.5694, 0.8333,  ..., 0.9028, 0.0139, 0.8750],
        [0.6667, 0.6250, 0.8056,  ..., 0.7500, 0.8333, 0.0139],
        ...,
        [0.5694, 0.6111, 0.0139,  ..., 0.7639, 0.8472, 0.8056],
        [0.0833, 0.0139, 0.8750,  ..., 0.6389, 0.6806, 0.6528],
        [0.6250, 0.7500, 0.0139,  ..., 0.6250, 0.0139, 0.6111]],
       device='mps:0')


The data is now split. 80 percent train and 20% validation. the get_batch function takes either the **'train'** or **'val'** sets and gets random bunches from both sets.

In [86]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

Don't worry about model not being defined yet, we first need to class the Birgramlanguange model.

In [87]:
'''
class BigramLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
        
    def forward(self, index, targets=None):
        logits = self.token_embedding_table(index)
        
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        
        return logits, loss
    
    def generate(self, index, max_new_tokens):
        # index is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # get the predictions
            logits, loss = self.forward(index)
            # focus only on the last time step
            logits = logits[:, -1, :]  # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1)  # (B, C)
            # sample from the distribution
            index_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            # append sampled index to the running sequence
            index = torch.cat((index, index_next), dim=1)  # (B, T+1)
        return index

model = BigramLanguageModel(vocab_size)
m = model.to(device)

context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_chars = decode(m.generate(context, max_new_tokens=500)[0].tolist())
print(generated_chars)
'''

'\nclass BigramLanguageModel(nn.Module):\n    def __init__(self, vocab_size):\n        super().__init__()\n        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)\n        \n    def forward(self, index, targets=None):\n        logits = self.token_embedding_table(index)\n        \n        if targets is None:\n            loss = None\n        else:\n            B, T, C = logits.shape\n            logits = logits.view(B * T, C)\n            targets = targets.view(B * T)\n            loss = F.cross_entropy(logits, targets)\n        \n        return logits, loss\n    \n    def generate(self, index, max_new_tokens):\n        # index is (B, T) array of indices in the current context\n        for _ in range(max_new_tokens):\n            # get the predictions\n            logits, loss = self.forward(index)\n            # focus only on the last time step\n            logits = logits[:, -1, :]  # becomes (B, C)\n            # apply softmax to get probabilities\n            probs

In [88]:
import torch
import torch.nn as nn
import torch.optim as optim

class SequencePredictionModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(SequencePredictionModel, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layer with correct input_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
        # Fully connected layer to output the prediction
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # Ensure the input is in float32 format
        x = x.float()

        # If the input is 2D (batch_size, sequence_length), assume 1 feature per time step
        if x.dim() == 2:
            x = x.unsqueeze(-1)  # Add a feature dimension, making it (batch_size, sequence_length, 1)

        # Ensure the input has the correct shape for the LSTM: (batch_size, sequence_length, 9)
        if x.size(-1) == 1:  # Check if the input has 1 feature per timestep
            x = x.expand(-1, -1, 9)  # Expand to 9 features per timestep

        # Check input shape for debugging
        print(f"Input shape for LSTM: {x.shape}")

        # Initialize hidden state with zeros (set to float32)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, dtype=torch.float32).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, dtype=torch.float32).to(x.device)
        
        # LSTM forward pass
        lstm_out, _ = self.lstm(x, (h0, c0))
        
        # Only take the output from the last time step
        last_time_step_out = lstm_out[:, -1, :]
        
        # Fully connected layer output
        out = self.fc(last_time_step_out)
        
        return out

# Example of model usage:
input_size = 9  # Number of features per time step (adjust as per data)
hidden_size = 128  # Hidden size of the LSTM layer
output_size = 16  # Output size (adjust based on target)

# Creating the model
model = SequencePredictionModel(input_size, hidden_size, output_size)

# Dummy input with 3D shape (batch_size=64, sequence_length=30, input_size=9)
x = torch.randn(64, 30, 9)  # 64 sequences, 30 timesteps per sequence

# Dummy target (y) with 2D shape (batch_size=64, output_size=16)
y = torch.randn(64, 16)  # Target with 16 values per sequence

# Forward pass
output = model(x)

# Compute the loss
criterion = nn.MSELoss()  # Mean Squared Error Loss (for regression tasks)
loss = criterion(output, y)

# Backward pass and optimization
optimizer = optim.Adam(model.parameters(), lr=0.001)
optimizer.zero_grad()
loss.backward()
optimizer.step()

print(f"Loss: {loss.item():.4f}")

Input shape for LSTM: torch.Size([64, 30, 9])
Loss: 0.9483


In [89]:
import torch
import torch.nn as nn
import torch.optim as optim

# Hyperparameter search loop
hidden_sizes = [64, 128, 256]  # Example hidden sizes to test
num_layers_list = [1, 2, 3]  # Number of LSTM layers to test
learning_rate = 0.001  # Learning rate for the optimizer
num_epochs = 100  # Number of epochs for training (increase this to iterate more)

best_loss = float('inf')
best_model = None
best_hyperparameters = {}

# Dummy input and target data (to be replaced with actual data)
input_data = torch.randn(64, 30, 9)  # Batch size 64, 30 time steps, 9 features
target_data = torch.randn(64, 16)    # Batch size 64, 16 outputs

# Loop through different hyperparameter combinations
for hidden_size in hidden_sizes:
    for num_layers in num_layers_list:
        
        # Create the model with current hyperparameters
        model = SequencePredictionModel(input_size=9, hidden_size=hidden_size, output_size=16, num_layers=num_layers)
        
        # Define the optimizer and loss function
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()  # Mean Squared Error Loss
        
        # Train the model for multiple epochs
        model.train()
        for epoch in range(num_epochs):  # Iterate over multiple epochs
            optimizer.zero_grad()
            
            # Get the model output for the training data
            outputs = model(input_data)
            
            # Compute the loss for training data
            loss = criterion(outputs, target_data)
            
            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Print loss for the current epoch
            print(f"Epoch [{epoch+1}/{num_epochs}], hidden_size={hidden_size}, num_layers={num_layers} - Loss: {loss.item():.4f}")

        # After all epochs, check if this model is the best so far
        if loss.item() < best_loss:
            best_loss = loss.item()
            best_model = model
            best_hyperparameters = {'hidden_size': hidden_size, 'num_layers': num_layers}

# Print the best model and its hyperparameters based on training loss
print("\nBest Model Found!")
print(f"Hidden Size: {best_hyperparameters['hidden_size']}")
print(f"Number of Layers: {best_hyperparameters['num_layers']}")
print(f"Best Training Loss: {best_loss:.4f}")

Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [1/100], hidden_size=64, num_layers=1 - Loss: 1.0677
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [2/100], hidden_size=64, num_layers=1 - Loss: 1.0652
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [3/100], hidden_size=64, num_layers=1 - Loss: 1.0627
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [4/100], hidden_size=64, num_layers=1 - Loss: 1.0603
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [5/100], hidden_size=64, num_layers=1 - Loss: 1.0579
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [6/100], hidden_size=64, num_layers=1 - Loss: 1.0555
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [7/100], hidden_size=64, num_layers=1 - Loss: 1.0532
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [8/100], hidden_size=64, num_layers=1 - Loss: 1.0509
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [9/100], hidden_size=64, num_layers=1 - Loss: 1.0486
Input shape for LSTM: torch.Size([64, 30, 9])
Epoch [10

best loss found is 0.0003 and that is with 3 layers and a hidden size of 256. 

In [90]:
# Ensure val_data is at least 2D
if val_data.dim() == 1:
    val_data = val_data.unsqueeze(-1)  # Convert (num_samples,) → (num_samples, 1)

# Create sequences for validation
x_val = torch.stack([val_data[i:i+block_size] for i in range(len(val_data) - block_size)])  # (batch_size, block_size, num_features)

# Fix target extraction: Extract last 16 values per sample
y_val = torch.stack([val_data[i+block_size, -16:] for i in range(len(val_data) - block_size)])  # (batch_size, 16)

# Ensure input shape matches LSTM expectations
x_val = x_val.expand(-1, -1, 9).float()  # Expand feature dimension to match model

# Ensure target is float
y_val = y_val.float()

# Debugging: Check shapes
print(f"Input shape for LSTM (Validation): {x_val.shape}")  # (batch_size, 30, 9)
print(f"Target shape for validation: {y_val.shape}")  # (batch_size, 16)

# Forward pass through the model
best_model.eval()
outputs_val = best_model(x_val)

# Debugging: Check output shape
print(f"Model output shape: {outputs_val.shape}")  # Should be (batch_size, 16)

# Compute validation loss
validation_loss = nn.MSELoss()(outputs_val, y_val)

# Print the validation loss
print(f"Validation Loss: {validation_loss.item():.4f}")

Input shape for LSTM (Validation): torch.Size([41530, 30, 9])
Target shape for validation: torch.Size([41530, 1])
Input shape for LSTM: torch.Size([41530, 30, 9])
Model output shape: torch.Size([41530, 16])


  return F.mse_loss(input, target, reduction=self.reduction)


Validation Loss: 0.5353


So based on our first batch of trained data this is what our model predicted!

In [91]:
'''
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import torch

# Load the GPT-2 small model and tokenizer
model = GPT2LMHeadModel.from_pretrained("gpt2")
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Move the model to the correct device (MPS for Mac)
device = "mps"  # for MPS on Mac
model.to(device)

# Hyperparameters
learning_rate = 3e-4
max_iters = 1000
eval_iters = 250
batch_size = 4

# Create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Learning rate scheduler (StepLR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=500, gamma=0.9)

# Training loop
for iter in range(max_iters):
    if iter % eval_iters == 0:
        losses = estimate_loss()  # Define your custom function to compute loss
        print(f"step: {iter}, train loss: {losses['train']:.3f}, val loss: {losses['val']:.3f}")

    # Sample a batch of data (assumed to provide text data)
    xb, yb = get_batch('train')  # Ensure this returns text data

    # Tokenize input and label text
    inputs = tokenizer(xb, return_tensors='pt', padding=True, truncation=True, max_length=512).to(device)
    labels = tokenizer(yb, return_tensors='pt', padding=True, truncation=True, max_length=512).to(device)

    # Prepare attention mask
    attention_mask = inputs['attention_mask']

    # Forward pass through GPT-2 model with `use_cache=False` to disable past_key_values
    outputs = model(input_ids=inputs['input_ids'], labels=labels['input_ids'], attention_mask=attention_mask, use_cache=False)

    # Get the loss value from the model's output
    loss = outputs.loss

    # Backpropagation and optimization
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    # Update the learning rate after each step
    scheduler.step()

    # Print the loss value at regular intervals
    if iter % 100 == 0:
        print(f"Iter {iter} - Loss: {loss.item()}")

# Final model loss
print(f"Final loss: {loss.item()}")
'''

'\nfrom transformers import GPT2LMHeadModel, GPT2Tokenizer\nimport torch\n\n# Load the GPT-2 small model and tokenizer\nmodel = GPT2LMHeadModel.from_pretrained("gpt2")\ntokenizer = GPT2Tokenizer.from_pretrained("gpt2")\n\n# Move the model to the correct device (MPS for Mac)\ndevice = "mps"  # for MPS on Mac\nmodel.to(device)\n\n# Hyperparameters\nlearning_rate = 3e-4\nmax_iters = 1000\neval_iters = 250\nbatch_size = 4\n\n# Create a PyTorch optimizer\noptimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)\n\n# Learning rate scheduler (StepLR)\nscheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=500, gamma=0.9)\n\n# Training loop\nfor iter in range(max_iters):\n    if iter % eval_iters == 0:\n        losses = estimate_loss()  # Define your custom function to compute loss\n        print(f"step: {iter}, train loss: {losses[\'train\']:.3f}, val loss: {losses[\'val\']:.3f}")\n\n    # Sample a batch of data (assumed to provide text data)\n    xb, yb = get_batch(\'tra

In [95]:
import torch
import torch.nn as nn

class SequencePredictionModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SequencePredictionModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Define the forward pass (this may already be defined)
        pass

    def generate(self, context, max_new_tokens):
        generated = context.float()  # Convert context to float32

        # Debugging print: Checking the shape of the context tensor at the start
        print(f"Initial context shape: {generated.shape}")
        
        # Ensure that context has 3 dimensions, (batch_size, sequence_length, input_size)
        if generated.dim() == 2:  # If context is 2D, add a dummy feature dimension
            generated = generated.unsqueeze(-1)  # (batch_size, sequence_length, 1)
            print(f"After unsqueeze: {generated.shape}")  # Debugging

        # Now, check if the context has the right number of features (9), and expand if necessary
        if generated.size(-1) != 9:
            print(f"Before expand: {generated.shape}")  # Debugging
            generated = generated.expand(-1, -1, 9)  # Expand to match 9 features
            print(f"After expand: {generated.shape}")  # Debugging

        # Now we are ready to generate new tokens
        for _ in range(max_new_tokens):
            # Forward pass through the LSTM
            lstm_out, _ = self.lstm(generated)
            
            # Get the output from the last time step (predicted token)
            output = self.fc(lstm_out[:, -1, :])  # Pass through the fully connected layer
            next_token = output.argmax(dim=-1)  # Choose the token with the highest probability
            
            # Append the generated token to the context for the next step
            next_token_expanded = next_token.unsqueeze(0).unsqueeze(-1).float()  # Convert to float32 and add necessary dimensions
            generated = torch.cat((generated, next_token_expanded.expand(-1, -1, 9)), dim=1)  # Add the new token to context

        return generated

In [96]:
# Ensure both model and input tensor are on the same device
context = torch.randn((1, 1, 9), dtype=torch.float32, device=device)

# Ensure the model is on the same device
best_model.to(device)

# Generate the output from the model
generated_output = best_model.generate(context, max_new_tokens=500)

# Flatten the output and ensure it's a list of indices by using argmax
generated_tokens = generated_output[0].flatten().argmax(dim=-1).tolist()

# Make sure generated_tokens is a list
if not isinstance(generated_tokens, list):
    generated_tokens = [generated_tokens]

# Decode the generated tokens
generated_chars = decode(generated_tokens)
print(generated_chars)

AttributeError: 'SequencePredictionModel' object has no attribute 'generate'

I know it still is unreadable, but if you notice, the letters are making more and more sense. And, you might actually catch a couple words here and there if you run it a couple of times. 