In [58]:
! pip3 install torch torchvision



In [59]:
from torchvision import datasets, transforms 
from torchvision.models import resnet18, ResNet18_Weights
from torchvision.models import resnet50, ResNet50_Weights
from tempfile import TemporaryDirectory
from matplotlib import pyplot as plt
import torch.nn as nn
import torch.utils.data
import numpy as np
import os, time
from torchvision.datasets import OxfordIIITPet
from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights
from torchvision.models import efficientnet_v2_m, EfficientNet_V2_M_Weights

In [60]:
def show(img):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)), interpolation='nearest')
    plt.show()

In [61]:
def test_model(model, test_data, weights_path, device_type='cpu'):
    if torch.cuda.is_available():
        device_type = 'cuda:0'
    elif torch.backends.mps.is_available():
        device_type = 'mps'

    device = torch.device(device_type)
    model.load_state_dict(torch.load(weights_path))
    model = model.to(device)

    start = time.time()
    with torch.no_grad():
        model.eval()
        correct = 0
        total = 0
        for images, labels in test_data:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)

            predicted = torch.argmax(outputs, 1)

            total += labels.size(0)
            correct += torch.sum(predicted == labels.data)
        print('Time taken:', time.time() - start)
        print(f'Accuracy of the network on the test images: {100 * correct / total}%')

In [62]:
def train_model(model, dataloaders, dataset_sizes, suffix, scheduler=0, num_epochs=25, device_type='cpu'):
    if torch.cuda.is_available():
        device_type = 'cuda:0'
    elif torch.backends.mps.is_available():
        device_type = 'mps'

    device = torch.device(device_type)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

    since = time.time()

    # Create a directory to save training checkpoints
    with TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join("./", suffix + '.pt')

        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.0

        start = time.time()

        for epoch in range(num_epochs):
            print(f'Epoch {epoch}/{num_epochs - 1}')
            print('-' * 10)
            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                model.train() if phase == 'train' else model.eval()

                running_loss = 0.0
                running_corrects = 0

                print('phase:', phase)
                
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs) # forward pass

                        # outputs is probability of each class
                        # labels needs to be probability of each class

                        # we need the one-hot for the labels to get the loss
                        true_outputs = torch.nn.functional.one_hot(labels, num_classes=37).float()

                        # create preds by thresholding outputs
                        loss = criterion(outputs, true_outputs)

                        preds = torch.argmax(outputs, 1)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]
                
                outputstr = f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}'
                print(outputstr)
                # f.write(outputstr + '\n')

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)
            print('epoch took:', time.time() - start)
            start = time.time()

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:4f}')

        # load best model weights
        model.load_state_dict(torch.load(best_model_params_path))
    return best_model_params_path, best_acc

In [63]:
def generate_datasets(
        # 70% train, 20% validation, 10% test
        conf, train_split=0.7, val_split=0.2
    ):

    base_transforms = [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ]

    training_transforms = []

    crop = conf.get('crop', False)
    random_flip = conf.get('random_flip', False)
    rotate_angle = conf.get('rotate_angle', 0)
    scale = conf.get('scale', 1)

    if crop:
        base_transforms.append(transforms.Resize(256))
        base_transforms.append(transforms.CenterCrop(224))
    else:
        base_transforms.append(transforms.Resize(224))
    

    if random_flip:
        training_transforms.append(transforms.RandomHorizontalFlip())
    if rotate_angle != 0:
        training_transforms.append(transforms.RandomRotation(rotate_angle))

    if scale != 1:
        training_transforms.append(transforms.Resize(int(224 * scale)))

    dataset=OxfordIIITPet(root="./", download=True, target_types='category', transform=transforms.Compose(base_transforms))

    # Split the data into training, validation, and test sets
    train_dataset = dataset
    train_size = int(train_split * len(train_dataset))
    val_size = int(val_split * len(train_dataset))
    test_size = len(train_dataset) - train_size - val_size
    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(train_dataset, [train_size, val_size, test_size], generator=torch.Generator().manual_seed(42))
    # Transform the training dataset
    train_dataset.dataset.transform = transforms.Compose(base_transforms + training_transforms)

    dataloaders = {
        'train': torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True),
        'test': torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=True),
        'val': torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=True),
    }

    dataset_sizes = {
        'train': len(train_dataset),
        'test': len(test_dataset),
        'val': len(val_dataset),
    }

    return dataloaders, dataset_sizes

In [64]:
def multiclass_resnet(resnet_size = "18"):
    if resnet_size == "18":
        resnet = resnet18(weights=ResNet18_Weights.DEFAULT)
    elif resnet_size == "50":
        resnet = resnet50(weights=ResNet50_Weights.DEFAULT)
    
    for param in resnet.parameters():
        param.requires_grad = False
    
    print(resnet.fc)

    #resnet fc input size
    input_size = resnet.fc.in_features

    resnet.fc = nn.Sequential(
        nn.Flatten(),
        nn.Linear(input_size, 256),
        nn.ReLU(),
        # nn.Dropout(0.2),
        nn.Linear(256, 37),
        nn.Softmax()
    )

    return resnet

def multiclass_efficientnet(efficientnet_size = "b4"):
    if efficientnet_size == "b4":
        efficientnet = efficientnet_b4(weights=EfficientNetB4_Weights.DEFAULT)
    elif efficientnet_size == "v2_m":
        efficientnet = efficientnet_v2_m(weights=EfficientNetV2M_Weights.DEFAULT)
    
    for param in efficientnet.parameters():
        param.requires_grad = False
    
    print(efficientnet.classifier)

    #efficientnet fc input size
    input_size = efficientnet.classifier.in_features

    efficientnet.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(input_size, 256),
        nn.ReLU(),
        # nn.Dropout(0.2),
        nn.Linear(256, 37),
        nn.Softmax()
    )

    return efficientnet

In [65]:

def create_suffix(
        conf,
    ):
    model_name = conf.get('model_name', 'resnet50')
    crop = conf.get('crop', True)
    random_flip = conf.get('random_flip', False)
    rotate_angle = conf.get('rotate_angle', 0)
    scale = conf.get('scale', 1)
    device_type = 'cpu'
    if torch.cuda.is_available():
        device_type = 'cuda:0'
    elif torch.backends.mps.is_available():
        device_type = 'mps'
    suffix = model_name
    suffix += device_type
    if crop:
        suffix += "crop"
    if random_flip:
        suffix += "flip"
    if rotate_angle != 0:
        suffix += "rotate" + str(rotate_angle)
    if scale != 1:
        suffix += "scale" + str(scale)
    return suffix

In [66]:
model_name = 'resnet50'
random_flip = False
rotate_angle = 0
crop = True
scale = 1

base_conf = {
    'model_name': model_name,
    'crop': crop,
    'random_flip': random_flip,
    'rotate_angle': rotate_angle,
    'scale': scale
}


In [67]:
#dataloaders, dataset_sizes = generate_datasets(conf)

In [68]:
#resnet = multiclass_resnet(conf)

In [69]:
#suffix = create_suffix(conf)
#suffix

In [70]:
#train_model(resnet, dataloaders, dataset_sizes, suffix, num_epochs=2)

In [71]:
def get_model(model_name):
    if model_name == 'resnet18':
        return multiclass_resnet("18")
    elif model_name == 'resnet50':
        return multiclass_resnet("50")
    elif model_name == 'efficientnet_b4':
        return multiclass_efficientnet(efficientnet_size="b4")
    elif model_name == 'efficientnet_v2_m':
        return multiclass_efficientnet(efficientnet_size="v2_m")
    else:
        return None

In [72]:
def generate_confs():
    models = ['resnet18', 'resnet50', 'efficientnet', 'efficientnet-v2']
    crops = [True, False]
    random_flips = [True, False]
    rotate_angles = [0, 90]
    scales = [1, 1.5]

    # Generate confs 
    confs = [
        {**base_conf, 'model_name': model}
        for model in models
    ]

    return confs
    
confs = generate_confs()

weights_and_accs = []
for conf in confs:
    dataloaders, dataset_sizes = generate_datasets(conf)
    model = get_model(conf['model_name'])
    suffix = create_suffix(conf)
    weights_path, best_acc = train_model(model, dataloaders, dataset_sizes, suffix, num_epochs=5)
    weights_and_accs.append((weights_path, best_acc))

print(weights_and_accs)


Linear(in_features=512, out_features=1000, bias=True)
Epoch 0/4
----------
phase: train


  return self._call_impl(*args, **kwargs)
