In [1]:
import torch
import torchvision
import torch.optim as optim
from torch.optim import lr_scheduler
from tqdm import tqdm
import torchvision.transforms as transforms
from helpers import *
from networks import *

import yaml


In [2]:
def build_network(dataset_name , network_type, num_classes,num_channels,image_size, device):
    """Instantiate the architecture named by ``network_type`` and move it to ``device``.

    Args:
        dataset_name: Dataset identifier; only forwarded to ``AlexNet``.
        network_type: One of ``'AlexNet'``, ``'LeNet5'``, ``'ResNet50'``,
            ``'VGGNet'``, ``'LeNetPlusPlus'`` or ``'MiniVGG'``.
        num_classes: Number of output classes (not forwarded to ``LeNet5``).
        num_channels: Number of input channels.
        image_size: Input image size; only forwarded to ``ResNet50``.
        device: Passed to every constructor and used for the final ``.to(device)``.

    Returns:
        The constructed network after ``.to(device)``.

    Raises:
        ValueError: If ``network_type`` is not one of the supported names.
    """
    if network_type == 'AlexNet':
        model = AlexNet(device, dataset_name, num_channels, num_classes)
    elif network_type == 'LeNet5':
        # LeNet5 is the only architecture built without num_classes.
        model = LeNet5(device, num_channels)
    elif network_type == 'ResNet50':
        model = ResNet50(device, num_channels, image_size, num_classes)
    elif network_type == 'VGGNet':
        # The VGG depth is fixed to the 16-layer variant here.
        model = VGGNet(device, num_channels, num_classes, num_layers=16)
    elif network_type == 'LeNetPlusPlus':
        model = LeNetPlusPlus(device, num_channels, num_classes)
    elif network_type == 'MiniVGG':
        model = MiniVGG(device, num_channels, num_classes)
    else:
        message = "Unsupported network: {}. Only AlexNet, LeNet5, ResNet50, VGGNet, LeNetPlusPlus or MiniVGG are supported."
        raise ValueError(message.format(network_type))

    return model.to(device)


In [3]:
def build_optimizer(optimizer_type,network):
    optimizer = None

    with open('config.yaml') as file:
        hyperparameters = yaml.safe_load(file)

    match optimizer_type:
        case 'SGD':
            optimizer = optim.SGD(network.parameters(), lr = hyperparameters[optimizer_type]['learning_rate'])
        case 'SGD_Momentum':
            optimizer = optim.SGD(network.parameters(), lr = hyperparameters[optimizer_type]['learning_rate'],momentum = hyperparameters[optimizer_type]['momentum'])
        case 'Adam':
            optimizer = optim.Adam(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'], betas = hyperparameters[optimizer_type]['betas'])
        case 'NAdam':
            optimizer = optim.NAdam(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'], betas = hyperparameters[optimizer_type]['betas'])
        case 'AdaGrad':
            optimizer = optim.Adagrad(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'])
        case 'AdaDelta':
            optimizer = optim.Adadelta(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'], decay = hyperparameters[optimizer_type]['decay'])
        case 'AdaMax':
            optimizer = optim.Adamax(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'], betas = hyperparameters[optimizer_type]['betas'])
        case 'RMSProp':
            optimizer = optim.RMSprop(network.parameters(),lr = hyperparameters[optimizer_type]['learning_rate'], alpha = hyperparameters[optimizer_type]['alpha'])
        case _ :
            raise ValueError(
                "Unsupported optimizer: {}. Only SGD, SGD_Momentum, Adam, NAdam, AdaGrad, AdaDelta,AdaMax, RMSProp are supported.".format(
                    optimizer_type
                )
            )
    return optimizer
        

    

In [4]:
def train_model(train_loader,network ,optimizer, device,  scheduler = None, criterion = None, max_iter= 1000, model_name = ''):
    """Train ``network`` for ``max_iter`` epochs and save its weights to disk.

    Args:
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        network: Model being trained; called as ``network(inputs)``.
        optimizer: Optimizer whose ``zero_grad()``/``step()`` drive the updates.
        device: Device each batch is moved to before the forward pass.
        scheduler: Optional learning-rate scheduler, stepped once per epoch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
            Defaults to a fresh ``torch.nn.CrossEntropyLoss()``.
        max_iter: Number of epochs (full passes over ``train_loader``).
        model_name: Prefix of the checkpoint file, written to
            ``models/<model_name>_network.pth``.

    Returns:
        list[float]: For each epoch, the sum (not the mean) of the batch losses.
    """
    # Imported locally so the function does not rely on ``os`` leaking in
    # through one of the notebook's star imports.
    import os

    # Built per call rather than as a default argument: a default would be
    # evaluated once at definition time and shared by every call.
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    # Make sure layers such as Dropout/BatchNorm are in training mode even if
    # the caller evaluated the network beforehand.
    network.train()

    losses = []
    for _ in tqdm(range(max_iter)):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = network(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        if scheduler is not None:
            scheduler.step()  # Update the learning rate scheduler

        losses.append(running_loss)

    # torch.save does not create missing directories.
    os.makedirs('models', exist_ok=True)
    PATH = os.path.join('models','{}_network.pth'.format(model_name))
    torch.save(network.state_dict(), PATH)
    return losses


In [5]:
def test_model(test_loader,network, device):
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for _,data in enumerate(test_loader):
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)

            outputs = network(images)
            outputs = outputs.cpu()
            _, predictions = torch.max(outputs, 1)
            # collect the correct predictions for each class
            for label, prediction in zip(labels.cpu(), predictions):
                if label == prediction:
                    correct_predictions += 1
                total_predictions += 1
    accuracy = correct_predictions/total_predictions
    return accuracy

In [6]:
def train_test_model(dataset_name, network_type, optimizer_type, lr_scheduler = None,
                      max_iter= 1000,batch_size = 10, num_workers = 4):
    """Load a dataset, build network and optimizer, train, then evaluate.

    Args:
        dataset_name: Dataset identifier passed to ``load_dataset`` and
            ``build_network``; also used as the checkpoint name.
        network_type: Architecture name passed to ``build_network``.
        optimizer_type: Optimizer name passed to ``build_optimizer``.
        lr_scheduler: Optional learning-rate scheduling. Either an object with
            a ``step()`` method (forwarded unchanged) or a factory -- a class
            or any callable without a ``step`` attribute -- that is called as
            ``lr_scheduler(optimizer)`` with the optimizer built here, e.g.
            ``functools.partial(StepLR, step_size=10)``. The factory form
            exists because the optimizer is created inside this function, so
            callers cannot bind a scheduler to it beforehand.
        max_iter: Number of training epochs.
        batch_size: Batch size passed to ``load_dataset``.
        num_workers: Worker count passed to ``load_dataset``.

    Returns:
        tuple: ``(losses, accuracy)`` as returned by ``train_model`` and
        ``test_model`` respectively.
    """
    # load  dataset
    (train_loader, test_loader, num_classes,
     num_channels, image_size, device) = load_dataset(dataset_name=dataset_name,
                                                      batch_size=batch_size,
                                                      num_workers=num_workers)
    network = build_network(dataset_name, network_type, num_classes,num_channels,image_size, device)
    optimizer = build_optimizer(optimizer_type,network)

    scheduler = lr_scheduler
    is_factory = isinstance(scheduler, type) or (
        callable(scheduler) and not hasattr(scheduler, 'step')
    )
    if scheduler is not None and is_factory:
        # Bind the scheduler to the optimizer that will actually be stepped.
        scheduler = scheduler(optimizer)

    losses = train_model(train_loader, network, optimizer, device,
                         scheduler ,max_iter = max_iter ,model_name=dataset_name)
    accuracy = test_model(test_loader,network, device)
    return losses, accuracy

In [7]:
# Short run: LeNet5 on CIFAR10 with plain SGD for 5 epochs.
losses, accuracy = train_test_model(
    dataset_name='CIFAR10',
    network_type='LeNet5',
    optimizer_type='SGD',
    max_iter=5,
)

Files already downloaded and verified
Files already downloaded and verified


100%|██████████| 5/5 [05:31<00:00, 66.29s/it]
