In [1]:
import torch
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
import torchvision.models as models
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import time
import os
import copy
import torch.optim as optim
import json

# Load the pretrained network

In [78]:
# ResNet-50 built with the torchvision "IMAGENET1K_V2" pretrained weights.
resnet = models.resnet50(weights="IMAGENET1K_V2")

# Registry of the networks to process, keyed by a short name; later cells
# iterate over this mapping.
selected_models = dict(resnet=resnet)

Modify the last layer of each network to a fully connected layer with one output per class (5 or 10, depending on `num_cl`).

In [79]:
# Class-set switch: 0 selects the 5-class subset, any other (truthy) value
# selects the 10-class set.
num_cl = 1

if num_cl:
    # 10-class set: index -> class name
    class_names = dict(enumerate([
        'badminton',
        'basketball',
        'boxing',
        'football',
        'gymnastics',
        'hockey',
        'swimming',
        'volleyball',
        'weight_lifting',
        'wwe',
    ]))
else:
    # 5-class subset: index -> class name
    class_names = dict(enumerate([
        'basketball',
        'gymnastics',
        'swimming',
        'weight_lifting',
        'wwe',
    ]))

In [80]:
print(f"before modification : {selected_models['resnet'].fc}")

# One output unit per target class. Derived from class_names so the head size
# always matches the class set selected by num_cl (5 or 10).
num_outputs = len(class_names)

for key, model in selected_models.items():
    # Freeze every existing layer: the pretrained weights are reused as a
    # fixed feature extractor and only the new head is trained.
    for param in model.parameters():
        param.requires_grad = False

    # Input width of the final layer. The second-to-last state_dict entry is
    # taken to be that layer's weight, whose last dimension is in_features.
    # NOTE(review): assumes the final layer has both a weight and a bias.
    last_layer = list(model.state_dict().keys())[-2]
    last_layer_size = model.state_dict()[last_layer].shape[-1]

    # A freshly created Linear layer has requires_grad=True, so it is the only
    # trainable part of the network after this cell.
    new_head = nn.Linear(last_layer_size, num_outputs)

    # Pick the attribute that actually holds the head instead of relying on a
    # broad try/except, which would hide unrelated errors.
    classifier = getattr(model, 'classifier', None)
    if isinstance(classifier, nn.Sequential):
        # Keep the earlier classifier layers and swap only the last one.
        model.classifier = nn.Sequential(*list(classifier)[:-1], new_head)
    elif isinstance(classifier, nn.Linear):
        model.classifier = new_head
    elif hasattr(model, 'fc'):
        model.fc = new_head
    else:
        raise AttributeError(
            f"model '{key}' has neither a 'classifier' nor an 'fc' head to replace"
        )

print(f"after modification : {selected_models['resnet'].fc}")

before modification : Linear(in_features=2048, out_features=1000, bias=True)
after modification : Linear(in_features=2048, out_features=10, bias=True)


Transform images into tensors so they can be fed to the convolutional networks.

In [81]:
# Per-channel mean / standard deviation used to normalise the RGB tensor.
# NOTE(review): these look like the statistics conventionally paired with
# torchvision's ImageNet-pretrained weights; confirm against the weights' docs.
_channel_mean = [0.485, 0.456, 0.406]
_channel_std = [0.229, 0.224, 0.225]

# PIL image -> normalised tensor: resize to 256, centre-crop to 224x224,
# convert to a tensor, then normalise each channel.
preprocess_image = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=_channel_mean, std=_channel_std),
])

Transform every image to build a dataset of tensors.

In [82]:
df = pd.read_csv("./data/data.csv")  # CSV with one row per image: 'path' and 'label' columns
image_paths = df['path']
all_labels = df['label'].values

images = []
kept_rows = []     # positional indices of the CSV rows whose image was actually loaded
failed_paths = []  # (path, error) pairs for images that could not be read

for row, path in enumerate(image_paths):
    # Normalise Windows-style separators so the paths also resolve on macOS/Linux.
    path = str(path).replace('\\', "/")

    # Keep only images whose path mentions one of the selected classes.
    # Each image is loaded at most once, even if its path matches several names.
    if not any(class_name in path for class_name in class_names.values()):
        continue

    try:
        # The context manager closes the underlying file once the tensor is built.
        with Image.open(path) as img:
            images.append(preprocess_image(img.convert('RGB')))
    except (OSError, ValueError) as e:
        # Best effort: skip unreadable files, but remember them so they can be
        # reported instead of disappearing silently.
        failed_paths.append((path, e))
        continue

    kept_rows.append(row)

if failed_paths:
    first_path, first_error = failed_paths[0]
    print(f"skipped {len(failed_paths)} unreadable image(s), e.g. {first_path}: {first_error}")

if not images:
    raise RuntimeError("no image could be loaded from ./data/data.csv for the selected classes")

X = torch.stack(images)
# Take the labels of exactly the rows that produced an image, so X[i] and y[i]
# always describe the same sample even when some rows were skipped.
y = torch.as_tensor(all_labels[kept_rows])
assert X.shape[0] == y.shape[0]
print(X.shape)

torch.Size([6854, 3, 224, 224])


Define the batch function that slices the training data into batches of a given size.

In [83]:
def get_batch(X, y, batch_size, iteration):
    """Return the `iteration`-th consecutive mini-batch of `X` and `y`.

    Args:
        X: 4-D tensor of samples, indexed along its first dimension.
        y: tensor of labels aligned with the first dimension of `X`.
        batch_size: number of samples per batch.
        iteration: zero-based batch index.

    Returns:
        Tuple `(X_batch, y_batch)`. The last batch may be shorter than
        `batch_size`, and a batch starting past the end of `X` is empty.
    """
    first = iteration * batch_size
    # Never read past the last sample.
    last = min(first + batch_size, X.shape[0])
    rows = slice(first, last)
    return X[rows, :, :, :], y[rows]

# Sanity check: fetch the first batch of 50 samples and show both shapes.
batch_x, batch_y = get_batch(X, y, batch_size=50, iteration=0)
print(batch_x.shape, batch_y.shape)

torch.Size([50, 3, 224, 224]) torch.Size([50])


In [84]:
# Use the first CUDA device when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

Split the dataset into training and testing sets.

In [85]:
from sklearn.model_selection import train_test_split

# Fixed seed so the train/test split is the same on every Restart & Run All.
SPLIT_SEED = 42


def train_val_dataset(X, y, testing_size=0.25, random_state=None):
    """Randomly split samples and labels into a training and a testing set.

    Args:
        X: 4-D tensor of samples, indexed along its first dimension.
        y: tensor of labels; must have one entry per sample in `X`.
        testing_size: size of the testing set, forwarded to
            `train_test_split` as `test_size`.
        random_state: optional seed forwarded to `train_test_split`. The
            default `None` keeps the previous, non-deterministic behaviour.

    Returns:
        Dict with keys 'X_train', 'X_test', 'y_train' and 'y_test'.

    Raises:
        ValueError: if `X` and `y` do not contain the same number of samples.
    """
    if X.shape[0] != y.shape[0]:
        # A length mismatch would silently pair images with the wrong labels.
        raise ValueError(
            f"X has {X.shape[0]} samples but y has {y.shape[0]} labels"
        )

    # Split positional indices so the same rows are taken from X and from y.
    train_idx, test_idx = train_test_split(
        list(range(X.shape[0])),
        test_size=testing_size,
        random_state=random_state,
    )

    datasets = {
        'X_train': X[train_idx, :, :, :],
        'X_test': X[test_idx, :, :, :],
        'y_train': y[train_idx],
        'y_test': y[test_idx],
    }
    return datasets


datasets = train_val_dataset(X, y, 0.25, random_state=SPLIT_SEED)
print(datasets['X_train'].shape)

torch.Size([5140, 3, 224, 224])


In [86]:
def _run_epoch(X, y, model, criterion, batch_size, phase, optimizer=None):
    """Run one pass over `X`/`y` and return `(mean_loss, accuracy)` as floats.

    When `optimizer` is given the model is put in training mode and updated
    after every batch; otherwise it is put in evaluation mode and gradients
    are disabled. `phase` is only used to label the progress messages.
    """
    training = optimizer is not None
    model.train(training)

    num_samples = X.shape[0]
    # Ceiling division: no trailing empty batch when num_samples is an exact
    # multiple of batch_size.
    batch_count = (num_samples + batch_size - 1) // batch_size

    running_loss = 0.0
    running_corrects = 0

    for num_batch in range(batch_count):
        if num_batch % 15 == 0:
            print(f'batch {phase} : {num_batch}/{batch_count}')
        inputs, labels = get_batch(X, y, batch_size, num_batch)
        inputs = inputs.to(device)
        labels = labels.to(device)

        if training:
            # zero the parameter gradients
            optimizer.zero_grad()

        with torch.set_grad_enabled(training):
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels.long())
            if training:
                # backward + optimize
                loss.backward()
                optimizer.step()

        # statistics: weight the batch loss by the batch size so that the
        # shorter last batch is averaged correctly.
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels).item()

    return running_loss / num_samples, running_corrects / num_samples


def train_model(datasets, model, criterion, optimizer, batch_size = 100, num_epochs=5):
    """Train `model` and restore the weights of its best epoch on the test split.

    Batches are taken in the same sequential order every epoch (no shuffling).

    Args:
        datasets: dict with keys 'X_train', 'y_train', 'X_test' and 'y_test'.
        model: network to train; it is updated in place.
        criterion: loss called as `criterion(outputs, labels.long())`.
        optimizer: optimizer stepped once per training batch.
        batch_size: number of samples per batch.
        num_epochs: number of passes over the training split.

    Returns:
        Tuple `(model, accuracies)`. `model` carries the weights of the epoch
        with the highest test accuracy. `accuracies` maps each epoch index to
        a dict with keys 'train accuracy', 'test accuracy', 'train loss' and
        'test loss'.

    Raises:
        ValueError: if `batch_size` is smaller than 1 or a split is empty.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    accuracies = {}

    X_train, y_train = datasets['X_train'], datasets['y_train']
    X_test, y_test = datasets['X_test'], datasets['y_test']

    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if X_train.shape[0] == 0 or X_test.shape[0] == 0:
        raise ValueError("both the training and the testing split must be non-empty")

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')

        # training phase
        epoch_train_loss, epoch_train_acc = _run_epoch(
            X_train, y_train, model, criterion, batch_size, 'train', optimizer
        )

        # testing set
        epoch_test_loss, epoch_test_acc = _run_epoch(
            X_test, y_test, model, criterion, batch_size, 'test'
        )

        accuracies[epoch] = {'train accuracy' : epoch_train_acc,
                            'test accuracy' : epoch_test_acc,
                            'train loss': epoch_train_loss,
                            'test loss': epoch_test_loss}

        # deep copy the model if better
        if epoch_test_acc > best_acc:
            best_acc = epoch_test_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, accuracies

Train all the models for 5 epochs.

In [87]:
criterion = nn.CrossEntropyLoss()
model_accuracies = {}  # model name -> per-epoch metrics returned by train_model

# Make sure the export directory exists before any model is saved.
os.makedirs('./models', exist_ok=True)

for model_name, model in selected_models.items():
    print(model_name)

    # Move the model to the target device before building the optimizer, as
    # the torch.optim documentation recommends.
    model = model.to(device)

    # Only hand the optimizer the parameters that are still trainable; the
    # frozen ones never receive a gradient.
    trainable_params = [param for param in model.parameters() if param.requires_grad]
    optimizer_ft = optim.Adam(trainable_params, lr=0.01)

    model_opti, model_accuracy = train_model(datasets, model, criterion, optimizer_ft, batch_size=50, num_epochs=5)
    model_accuracies[model_name] = model_accuracy

    model_scripted = torch.jit.script(model_opti) # Export to TorchScript
    model_scripted.save(f'./models/{model_name}.pt') # Save

resnet
Epoch 0/4
batch train : 0/103
batch train : 15/103
batch train : 30/103
batch train : 45/103
batch train : 60/103
batch train : 75/103
batch train : 90/103
batch test : 0/35
batch test : 15/35
batch test : 30/35
Epoch 1/4
batch train : 0/103
batch train : 15/103
batch train : 30/103
batch train : 45/103
batch train : 60/103
batch train : 75/103
batch train : 90/103
batch test : 0/35
batch test : 15/35
batch test : 30/35
Epoch 2/4
batch train : 0/103
batch train : 15/103
batch train : 30/103
batch train : 45/103
batch train : 60/103
batch train : 75/103
batch train : 90/103
batch test : 0/35
batch test : 15/35
batch test : 30/35
Epoch 3/4
batch train : 0/103
batch train : 15/103
batch train : 30/103
batch train : 45/103
batch train : 60/103
batch train : 75/103
batch train : 90/103
batch test : 0/35
batch test : 15/35
batch test : 30/35
Epoch 4/4
batch train : 0/103
batch train : 15/103
batch train : 30/103
batch train : 45/103
batch train : 60/103
batch train : 75/103
batch trai

In [None]:
import json
# The results file name reflects which class set was trained on.
if num_cl:
    name = 'models_accuracies_10classes.txt'
else:
    name = 'models_accuracies_5classes.txt'

# Write every recorded epoch of every trained model, rather than assuming
# exactly five epochs of whichever model was trained last. With a single model
# the file layout is unchanged; with several, each entry is prefixed with the
# model name so the epochs stay distinguishable.
label_models = len(model_accuracies) > 1
sections = []
for trained_name, history in model_accuracies.items():
    for epoch in sorted(history):
        metrics = history[epoch]
        header = f"{trained_name} {epoch}:" if label_models else f"{epoch}:"
        sections.append(
            header
            + "\ntrain accuracy:" + str(metrics["train accuracy"])
            + "\ntest accuracy:" + str(metrics["test accuracy"])
            + "\ntrain loss:" + str(metrics["train loss"])
            + "\ntest loss:" + str(metrics["test loss"])
        )

with open(name, 'w') as f:
    # Entries are separated by one blank line, with no trailing newline.
    f.write('\n\n'.join(sections))