In [103]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


In [104]:
class ResidualBlock(nn.Module):
    def __init__(self, in_features, out_features):
        super(ResidualBlock, self).__init__()
        self.fc1 = nn.Linear(in_features, out_features)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(out_features, out_features)
        if in_features != out_features:
            self.residual = nn.Linear(in_features, out_features)
        else:
            self.residual = nn.Identity()

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        # Add the residual
        out += self.residual(x)
        out = self.relu(out)
        return out

In [105]:
class ResidualMLP(nn.Module):
    def __init__(self, in_features = 28*28, out_features=10, hidden_size=128, resN = 2):
        super(ResidualMLP, self).__init__()
        self.in_features = in_features
        self.fc1 = nn.Linear(in_features, hidden_size)  # Input layer
        self.residual_blocks = nn.ModuleList([ResidualBlock(hidden_size, hidden_size) for _ in range(resN)])  # n Residual blocks
        self.fc2 = nn.Linear(hidden_size, hidden_size)  # Output layer
        self.relu = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, out_features)

    def forward(self, x):
        x = x.view(-1, self.in_features)  # Flatten the input
        x = self.fc1(x)  # First fully connected layer
        # Pass through the residual blocks
        for block in self.residual_blocks:
            x = block(x)
        x = self.fc2(x)  # Second fully connected layer
        return x

In [110]:

# MNIST dataset loading and transformations
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=1000, shuffle=False)

in_features = 28 * 28
model = ResidualMLP()

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 1

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_idx, (data, targets) in enumerate(train_loader):
        optimizer.zero_grad()
        data = data.view(-1, in_features).float()  # Flatten images into vectors of size 784
        outputs = model(data)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')

# Test the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for data, targets in test_loader:
        data = data.view(-1, 28 * 28).float()  # Flatten the input images
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

print(f'Test Accuracy: {100 * correct / total}%')

Epoch [1/1], Loss: 0.41994240821233947
Test Accuracy: 91.99%


In [111]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

class Trainer:
    def __init__(self, model, train_loader, test_loader, criterion, optimizer, device='cpu'):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device

    def train(self, num_epochs=5):
        self.model.train()
        for epoch in range(num_epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            
            for batch_idx, (data, targets) in enumerate(self.train_loader):
                # Move data to the specified device
                data, targets = data.to(self.device), targets.to(self.device)
                data = data.view(-1, 28*28).float()  # Flatten MNIST image data

                # Forward pass
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)

                # Backward pass and optimization
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()
                
                # Calculate accuracy for the current batch
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

            # Print loss and accuracy for the epoch
            epoch_loss = running_loss / len(self.train_loader)
            epoch_accuracy = 100 * correct / total
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%')

    def evaluate(self):
        self.model.eval()
        correct = 0
        total = 0
        test_loss = 0.0
        
        with torch.no_grad():
            for data, targets in self.test_loader:
                # Move data to the specified device
                data, targets = data.to(self.device), targets.to(self.device)
                data = data.view(-1, 28*28).float()  # Flatten MNIST image data
                
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)
                test_loss += loss.item()

                # Get the predicted class with the highest score
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

        accuracy = 100 * correct / total
        avg_loss = test_loss / len(self.test_loader)
        print(f'Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.2f}%')
        return avg_loss, accuracy

    def save_checkpoint(self, path='model.pth'):
        torch.save(self.model.state_dict(), path)
        print(f'Model checkpoint saved at {path}')

    def load_checkpoint(self, path='model.pth'):
        self.model.load_state_dict(torch.load(path))
        print(f'Model checkpoint loaded from {path}')

In [112]:
# Model, loss function, optimizer
input_size = 28 * 28  # MNIST images are 28x28 pixels
hidden_size = 128  # First hidden layer size
output_size = 10    # Output size (10 classes for digits 0-9)
num_blocks = 3      # Number of residual blocks

model = ResidualMLP(in_features=input_size, out_features=output_size, hidden_size=hidden_size, resN=num_blocks)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Instantiate the trainer
device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.mps.is_available() else 'cpu')
trainer = Trainer(model, train_loader, test_loader, criterion, optimizer, device)

# Train the model
trainer.train(num_epochs=5)

# Evaluate the model
trainer.evaluate()

# Save the model checkpoint
trainer.save_checkpoint('residual_mlp_mnist.pth')

# Load the model checkpoint
trainer.load_checkpoint('residual_mlp_mnist.pth')

Epoch [1/5], Loss: 0.4729, Accuracy: 87.41%
Epoch [2/5], Loss: 0.2120, Accuracy: 93.65%
Epoch [3/5], Loss: 0.1823, Accuracy: 94.57%
Epoch [4/5], Loss: 0.1839, Accuracy: 94.54%
Epoch [5/5], Loss: 0.1834, Accuracy: 94.67%
Test Loss: 0.1615, Test Accuracy: 95.31%
Model checkpoint saved at residual_mlp_mnist.pth
Model checkpoint loaded from residual_mlp_mnist.pth


  self.model.load_state_dict(torch.load(path))


In [113]:
class Evaluator:
    def __init__(self, model, test_loader, criterion, device='cpu'):
        self.model = model.to(device)
        self.test_loader = test_loader
        self.criterion = criterion
        self.device = device
    
    def evaluate(self):
        """
        Evaluates the model on the test/validation set, computing the accuracy and loss.
        """
        self.model.eval()  # Set the model to evaluation mode
        correct = 0
        total = 0
        test_loss = 0.0
        
        with torch.no_grad():  # Disable gradient calculation
            for data, targets in self.test_loader:
                # Move data and targets to the device (CPU or GPU)
                data, targets = data.to(self.device), targets.to(self.device)
                data = data.view(-1, 28*28).float()  # Flatten input for MLP
                
                # Forward pass
                outputs = self.model(data)
                loss = self.criterion(outputs, targets)
                test_loss += loss.item()
                
                # Get the predicted class
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()
        
        # Calculate average loss and accuracy
        accuracy = 100 * correct / total
        avg_loss = test_loss / len(self.test_loader)
        print(f'Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.2f}%')
        return avg_loss, accuracy

    def predict(self, data):
        """
        Generates predictions for a given batch of data.
        """
        self.model.eval()  # Set the model to evaluation mode
        data = data.to(self.device)
        data = data.view(-1, 28*28).float()  # Flatten input if necessary
        
        with torch.no_grad():  # Disable gradient calculation
            outputs = self.model(data)
            _, predicted = torch.max(outputs.data, 1)
        
        return predicted

In [114]:
# Define the model (same as the model used during training)
input_size = 28 * 28
hidden_size = 128
output_size = 10
num_blocks = 3

model = ResidualMLP(in_features=input_size, out_features=output_size, hidden_size=hidden_size, resN=num_blocks)

# Load the trained model (if available)
model.load_state_dict(torch.load('residual_mlp_mnist.pth'))

# Define the loss function (same as during training)
criterion = nn.CrossEntropyLoss()

# Instantiate the evaluator
device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.mps.is_available() else 'cpu')
evaluator = Evaluator(model, test_loader, criterion, device)

# Evaluate the model
_, _ = evaluator.evaluate()

# Predict a batch of data from the test set
test_batch, _ = next(iter(test_loader))  # Get one batch of test data
predictions = evaluator.predict(test_batch)
print(f'Predictions for a batch: {predictions[:10]}')

  model.load_state_dict(torch.load('residual_mlp_mnist.pth'))


Test Loss: 0.1615, Test Accuracy: 95.31%
Predictions for a batch: tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9], device='mps:0')
