In [443]:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using {device}.')

Using cuda.


In [444]:
%matplotlib inline
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn as nn
import time

In [445]:
# You don't need to understand this function for now.
def load_data_CIFAR10(batch_size, resize=None):
    """Download the CIFAR10 dataset and then load it into memory."""
    trans = [torchvision.transforms.ToTensor()]
    if resize:
        trans.insert(0, torchvision.transforms.Resize(resize))
    trans = torchvision.transforms.Compose(trans)
    mnist_train = torchvision.datasets.CIFAR10(
        root="../data", train=True, transform=trans, download=True)
    mnist_test = torchvision.datasets.CIFAR10(
        root="../data", train=False, transform=trans, download=True)
    return (torch.utils.data.DataLoader(mnist_train, batch_size, shuffle=True,
                            num_workers=2),
            torch.utils.data.DataLoader(mnist_test, batch_size, shuffle=False,
                            num_workers=2))

In [446]:
batch_size = 64 # Defines the batch size
train_iter, test_iter = load_data_CIFAR10(batch_size)

Files already downloaded and verified
Files already downloaded and verified


In [447]:
X, y = next(iter(train_iter)) # Requests the first training batch
print(X.size()) # 256 images per batch. Each image is represented by a 1 x 28 x 28 tensor (number of channels x height x width). The images are grayscale, so there is a single channel.
print(y.size())

torch.Size([64, 3, 32, 32])
torch.Size([64])


## Intermediate Block Generator

for creating each intermediate block a class has been definied. This class get the number of convolutional layers, and its layers and parameters.

In [500]:
class BlockGen(nn.Module):
    # Creating Intermediate Blocks
    # the length of param indicates the number of intermediate blocks
    # each parameter is then inside the param for convolutional layers
    def __init__(self, in_channels, out_channels, c, layers, paddings, strides, kernels):
        super(BlockGen, self).__init__()
        self.relu = nn.ReLU()
        master_model = nn.ModuleList([])
        self.c = c
        self.layers = layers
        for i in range(c):
            convs = nn.ModuleList()
            for j in range(layers[i]):
                convs.append(nn.Conv2d(in_channels[i][j], out_channels[i][j], kernel_size=kernels[i][j],stride= strides[i][j], padding= paddings[i][j]))
            master_model.append(convs)
        self.model = master_model
        # the fully-connected layer turning m to a to calculate the weights of convolutional layer in the final equation the output should be the same value as the number of concolutional layers
        self.fc = nn.Linear(in_channels[0][0], c)
        self.batchnorm2d = nn.BatchNorm2d(out_channels[i][j])
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=1)
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x):
        m = []
        for i in range(x.size()[1]):
            m.append((float(x[i].flatten().mean())))
        m = torch.Tensor(m).to(device)
        a = self.fc(m)

        x = self.relu(x)
        model = self.model
        out_list = []
        for i, mod in enumerate(model):
            out_list.append(0)
            # mod = model[i]
            out = x
            j=0
            for model in mod:
                out = model(out)
            
            out = self.maxpool(out)
            out_list[i] = out*a[i]
        
        out_list = torch.tensor(sum(out_list))    
        return self.dropout(self.relu(self.batchnorm2d(out_list)))
        

In [None]:
class FinalModel(nn.Module):
    # Creating Intermediate Blocks
    # the length of param indicates the number of intermediate blocks
    # each parameter is then inside the param for convolutional layers
    def __init__(self, num_block, c, layers, in_channels, out_channels, kernels, paddings, strides):
        super(FinalModel, self).__init__()
        blocks = nn.ModuleList([])
        for i in range(num_block):
            blocks.append(BlockGen(c=c[i], layers=layers[i], in_channels=in_channels[i], out_channels=out_channels[i], kernels=kernels[i], paddings=paddings[i], strides=strides[i]))
            # print('block done')
        self.blocks = blocks
        self.fc = nn.Linear(out_channels[-1][-1][-1], 10)
        
    def forward(self, x):
        # creating the convolutional neurons in the layer of the block
        out_list = nn.ModuleList([])
        for model in self.blocks:
            x = model(x)
            
        channel_avg = torch.mean(torch.flatten(x, 2), dim=2)
        out = self.fc(channel_avg)
 
        return out

Defining parameters

In [502]:
num_blocks = 2
c = [2, 2]
layers = [[3, 2], [2, 2]]
kernels = [[[6,5,3],[4,4]],[[4,6],[3,5]]]
paddings = [[[2,0,0],[0,0]],[[2,0],[1,0]]]
strides = [[[1,2,1],[2,1]],[[1,1],[1,1]]]
in_channels = [[[3,6,12],[3,6]],[[12,18],[12,24]]]
out_channels = [[[6,12,12],[6,12]],[[18,24],[24,24]]]

In [503]:
# Applies Xavier initialization if the `torch.nn.Module` is `torch.nn.Linear` or `torch.nn.Conv2d`
def init_weights(m):
    if type(m) == torch.nn.Linear or type(m) == torch.nn.Conv2d:
        torch.nn.init.xavier_uniform_(m.weight)

# num_outputs = 10
model = FinalModel(num_block=num_blocks, c = c, layers = layers, in_channels=in_channels, out_channels=out_channels, kernels= kernels, paddings = paddings, strides = strides).to(device)
model.apply(init_weights) # Applies `init_weights` to every `torch.nn.Module` inside `model`

FinalModel(
  (blocks): ModuleList(
    (0): BlockGen(
      (relu): ReLU()
      (model): ModuleList(
        (0): ModuleList(
          (0): Conv2d(3, 6, kernel_size=(6, 6), stride=(1, 1), padding=(2, 2))
          (1): Conv2d(6, 12, kernel_size=(5, 5), stride=(2, 2))
          (2): Conv2d(12, 12, kernel_size=(3, 3), stride=(1, 1))
        )
        (1): ModuleList(
          (0): Conv2d(3, 6, kernel_size=(4, 4), stride=(2, 2))
          (1): Conv2d(6, 12, kernel_size=(4, 4), stride=(1, 1))
        )
      )
      (fc): Linear(in_features=3, out_features=2, bias=True)
    )
    (1): BlockGen(
      (relu): ReLU()
      (model): ModuleList(
        (0): ModuleList(
          (0): Conv2d(12, 18, kernel_size=(4, 4), stride=(1, 1), padding=(2, 2))
          (1): Conv2d(18, 24, kernel_size=(6, 6), stride=(1, 1))
        )
        (1): ModuleList(
          (0): Conv2d(12, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): Conv2d(24, 24, kernel_size=(5, 5), stride=(1, 1)

In [504]:
loss = torch.nn.CrossEntropyLoss()

In [505]:
lr = 0.9
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [511]:
def correct(logits, y):
    y_hat = logits.argmax(axis=1) # Finds the column with the highest value for each row of `logits`.
    return (y_hat == y).float().sum() # Computes the number of times that `y_hat` and `y` match.

In [515]:
def evaluate_metric(model, data_iter, metric):
    """Compute the average `metric` of the model on a dataset."""
    c = torch.tensor(0.).to(device)
    n = torch.tensor(0.).to(device)
    j = 0
    for X, y in data_iter:
        X, y = X.to(device), y.to(device) # Moves data to `device`
        logits = model(X)
        c += metric(logits, y)
        n += len(y)
    return c / n

In [516]:
model.eval()
print(f'Training accuracy: {evaluate_metric(model, train_iter, correct)}. Testing accuracy: {evaluate_metric(model, test_iter, correct)}.')

Training accuracy: 0.09997999668121338. Testing accuracy: 0.0997999981045723.


## Training

In [518]:
losses = [] # Stores the loss for each training batch
train_accs = [] # Stores the training accuracy after each epoch
test_accs = [] # Stores the testing accuracy after each epoch

num_epochs = 10
for epoch in range(num_epochs):
    print(f'\nEpoch {epoch + 1}/{num_epochs}.')
    start_time = time.perf_counter()

    model.train() # This is necessary because batch normalization behaves differently between training and evaluation

    for X, y in train_iter:
        X, y = X.to(device), y.to(device) # Moves data to `device`
        logits = model(X) # Computes the logits for the batch of images `X`
        l = loss(logits, y) # Computes the loss given the `logits` and the class vector `y`
        optimizer.zero_grad() # Zeroes the gradients stored in the model parameters
        l.backward() # Computes the gradient of the loss `l` with respect to the model parameters

        optimizer.step() # Updates the model parameters based on the gradients stored inside them

        losses.append(float(l)) # Stores the loss for this batch

    with torch.no_grad(): # Computing performance metrics does not require gradients
        model.eval() # This is necessary because batch normalization behaves differently between training and evaluation
        train_accs.append(evaluate_metric(model, train_iter, correct))
        test_accs.append(evaluate_metric(model, test_iter, correct))

        end_time = time.perf_counter()

        print(f'Training accuracy: {train_accs[-1]}. Testing accuracy: {test_accs[-1]}. Duration: {end_time - start_time:.3f}s.') # Computes and displays training/testing dataset accuracy.

plt.plot(losses) # Plots the loss for each training batch
plt.xlabel('Training batch')
plt.ylabel('Cross entropy loss')
plt.show()

plt.plot(train_accs, label='Training accuracy')
plt.plot(test_accs, label='Testing accuracy')
plt.legend(loc='best')
plt.xlabel('Epoch')
plt.show()


Epoch 1/10.


KeyboardInterrupt: 

In [None]:
m = next(iter(train_iter))
len(m)
print(f'Training accuracy: {evaluate_metric(model, m, correct)}.')
# print(f'Training accuracy: {evaluate_metric(model, m, correct)}. Testing accuracy: {evaluate_metric(model, test_iter, correct)}.')

In [497]:
model(X).size()

torch.Size([16, 10])

In [498]:
n = model(X)

In [412]:
model(X)

tensor([-0.0517, -0.2033,  0.0251,  0.1640, -0.0859, -0.1489, -0.1994,  0.1988,
        -0.1764,  0.1416], device='cuda:0', grad_fn=<ViewBackward0>)

In [None]:
model(n)

In [193]:
X.size()

torch.Size([16, 3, 32, 32])

In [215]:
n.argmax()

tensor(4)

In [462]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm
loss_fn = torch.nn.CrossEntropyLoss()
# Define the Intermediate Block
class IntermediateBlock(nn.Module):
    def __init__(self, in_channels, out_channels, num_layers):
        super(IntermediateBlock, self).__init__()
        self.conv_layers = nn.ModuleList([nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1) for _ in range(num_layers)])
        self.fc = nn.Linear(in_channels * out_channels, num_layers)

    def forward(self, x):
        channel_avg = torch.mean(x, dim=1, keepdim=True)  # Average over channels
        weights = self.fc(channel_avg).view(-1, num_layers, 1, 1)  # Reshape weights
        outputs = [layer(x) for layer in self.conv_layers]
        return torch.sum(weights * torch.cat(outputs, dim=1), dim=1)

# Define the Output Block
class OutputBlock(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(OutputBlock, self).__init__()
        self.fc1 = nn.Linear(in_channels, 64)  # Optional fully-connected layer
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        channel_avg = torch.mean(x, dim=1, keepdim=True)
        x = self.fc1(channel_avg) if self.fc1 else x
        return self.fc2(x)

# Define the Overall Network Architecture
class CIFAR10Classifier(nn.Module):
    def __init__(self, in_channels=3, num_classes=10, num_blocks=2, block_channels=32, num_layers_per_block=2):
        super(CIFAR10Classifier, self).__init__()
        self.blocks = nn.ModuleList([IntermediateBlock(in_channels, block_channels, num_layers_per_block) for _ in range(num_blocks)])
        self.output_block = OutputBlock(block_channels * num_layers_per_block, num_classes)

    def forward(self, x):
        for block in self.blocks:
            x = block(x)
        return self.output_block(x)

# Helper functions for data loading and training
def load_data(batch_size=64, use_gpu=False):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.2154, 0.2024))  # Normalize for CIFAR-10
    ])

    train_dataset = datasets.CIFAR10('./data', train=True, download=True, transform=transform)
    test_dataset = datasets.CIFAR10('./data', train=False, download=True, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    if use_gpu and torch.cuda.is_available():
        train_loader.dataset.tensors = train_loader.dataset.tensors.cuda()
        test_loader.dataset.tensors = test_loader.dataset.tensors.cuda()

    return train_loader, test_loader
def train(model, optimizer, loss_fn, train_loader, device):
  """
  Function to train the model on a single epoch.

  Args:
      model: The neural network model to be trained.
      optimizer: The optimizer used for updating model parameters.
      loss_fn: The loss function used for calculating loss.
      train_loader: The data loader for the training set.
      device: The device (CPU or GPU) to use for training.
  """
  model.train()  # Set the model to training mode (affects dropout layers etc.)
  
  running_loss = 0.0
  for images, labels in train_loader:
    # Move data to the device
    images, labels = images.to(device), labels.to(device)
    
    # Clear gradients from previous iteration
    optimizer.zero_grad()

    # Forward pass
    logits = model(images)
    
    # Calculate loss
    loss = loss_fn(logits, labels)
    
    # Backward pass
    loss.backward()
    
    # Update model parameters
    optimizer.step()
    
    # Update running loss
    running_loss += loss.item()
  
  # Calculate average epoch loss
  epoch_loss = running_loss / len(train_loader)
  
  # Print or log training statistics (optional)
  print(f"Epoch {epoch + 1} - Training Loss: {epoch_loss:.4f}")



def train_model(model, train_loader, test_loader, learning_rate=0.001, num_epochs=10, use_gpu=False):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    if use_gpu and torch.cuda.is_available():
        model = model.cuda()
        criterion = criterion.cuda()

    for epoch in range(num_epochs):
        print(f"Epoch {epoch}")
def test(model, loss_fn, test_loader, device):
  """
  Function to evaluate the model on the testing set.

  Args:
      model: The trained neural network model.
      loss_fn: The loss function used for calculating loss.
      test_loader: The data loader for the testing set.
      device: The device (CPU or GPU) to use for evaluation.
  """
  model.eval()  # Set the model to evaluation mode (affects dropout layers etc.)
  
  with torch.no_grad():
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in test_loader:
      # Move data to the device
      images, labels = images.to(device), labels.to(device)

      # Forward pass
      logits = model(images)

      # Calculate loss
      loss = loss_fn(logits, labels)
      running_loss += loss.item()

      # Get predictions and update accuracy metrics
      _, predicted = torch.max(logits.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()

  # Calculate average test loss and accuracy
  test_loss = running_loss / len(test_loader)
  accuracy = correct / total

  # Print or log evaluation statistics (optional)
  print(f"Test Loss: {test_loss:.4f} - Accuracy: {accuracy:.4f}")
for epoch in range(num_epochs):
    # Train on batches
    train_model(model, train_loader, test_loader , learning_rate=0.01)

    # Evaluate on test set (optional)
    test(model, loss_fn, test_loader, device)

# Save the trained model (optional)
torch.save(model.state_dict(), "cifar10_model.pt")


Epoch 0
Epoch 1
Epoch 2
Epoch 3
Epoch 4
Epoch 5
Epoch 6
Epoch 7
Epoch 8
Epoch 9


RuntimeError: size mismatch (got input: [10], target: [64])

In [324]:
model = CIFAR10Classifier().to(device) # Creates and moves the model to `device`
model.apply(init_weights)
train_loader, test_loader = load_data()
train_model(model, train_loader, test_loader, use_gpu = True)

Files already downloaded and verified
Files already downloaded and verified
Epoch 0
Epoch 1
Epoch 2
Epoch 3
Epoch 4
Epoch 5
Epoch 6
Epoch 7
Epoch 8
Epoch 9
