# Rock Paper Scissors - Pytorch Multiclass Classifier
In this workbook we illustrate multi-class image classification of the [Rock Paper Scissors dataset](https://www.kaggle.com/drgfreeman/rockpaperscissors), which is downloaded from Kaggle from the above URL. You'll need to login with your Kaggle ID to download the dataset. The data was downloaded and un-zipped to the `./data/RockPaperScissors` folder

In [1]:
# Connect to Google drive, so I can import modules from it into my workspace
# set this flag to True when you run this Notebook on Google Collaboratory
USE_COLAB = False

import os

#@see: https://colab.research.google.com/drive/1srw_HFWQ2SMgmWIawucXfusGzrj1_U0q
if USE_COLAB:
    # Load the Drive helper and mount
    from google.colab import drive

    # This will prompt for authorization.
    gdrive_mount_point = '/content/drive/'
    drive.mount(gdrive_mount_point)
    grive_path = os.path.join(gdrive_mount_point, "My Drive/")
    print('Your Google Drive is mounted at ', grive_path)
    COLAB_NOTEBOOKS_PATH = os.path.join(grive_path, "Colab Notebooks/")
    
    if os.path.exists(COLAB_NOTEBOOKS_PATH):    
        # display list of files
        #!ls '/content/drive/My Drive/Colab Notebooks/'

        # append our Google Drive folder to module search 
        # **NOTE:** comment this line out if you are NOT running this notebook on Google Colab
        import sys
        #sys.path.append("/content/drive/My Drive/Colab Notebooks/")
        pyt_path = os.path.join(COLAB_NOTEBOOKS_PATH, 'pytorch')
        sys.path.append(pyt_path)
        print('You Pytorch Colab notebooks are available at ', pyt_path)
        print('NOTE: {} is appended to sys.path!'.format(pyt_path))
        # and test it
        import pytorch_toolkit as pyt
        MODEL_SAVE_DIR = os.path.join(pyt_path, 'model_states')
    else:
        raise IOError("Unable to mount Google Drive!")
else:
    MODEL_SAVE_DIR = os.path.join('.','model_states')
    
if not os.path.exists(MODEL_SAVE_DIR):
    os.mkdir(MODEL_SAVE_DIR)

assert os.path.exists(MODEL_SAVE_DIR)
print('MODEL_SAVE_DIR = %s' % MODEL_SAVE_DIR)

MODEL_SAVE_DIR = .\model_states


In [2]:
import warnings
warnings.filterwarnings('ignore')

import os, sys, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pathlib
import glob

# tweaks for libraries
np.set_printoptions(precision=6, linewidth=1024, suppress=True)
plt.style.use('seaborn')
sns.set_style('darkgrid')
sns.set_context('notebook',font_scale=1.10)

# Pytorch imports
import torch
gpu_available = torch.cuda.is_available()
print('Using Pytorch version: %s. GPU %s available' % (torch.__version__, "IS" if gpu_available else "is NOT"))
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torchsummary import summary
from torchvision import transforms 
from tensorflow.keras.preprocessing.image import *
# My helper functions for training/evaluating etc.
import pytorch_toolkit as pytk

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED);

if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    #torch.backends.cudnn.enabled = False

Using Pytorch version: 1.13.0+cpu. GPU is NOT available


In [3]:
CLASSES = ['rock', 'paper', 'scissors']

In [4]:
def get_data(image_paths):
    images, labels = [], []

    for image_path in image_paths:
        image = load_img(image_path, target_size=(32,32), color_mode='grayscale')
        image = img_to_array(image)
        label_name = image_path.split(os.path.sep) [-2]
        label = CLASSES.index(label_name)
        label = np.array(label).astype('int32')
        images.append(image)
        labels.append(label)

        #labels = np.expand_dims(labels, axis=0)

    return np.array(images), np.array(labels)

In [5]:
file_pattern = pathlib.Path.cwd() / 'data' / 'RockPaperScissors' /  '*' / '*.png'
dataset_paths = [*glob.glob(str(file_pattern))]
random.shuffle(dataset_paths)
images, labels = get_data(dataset_paths)
print(f"images.shape: {images.shape} - labels.shape: {labels.shape}")

images.shape: (0,) - labels.shape: (0,)


In [6]:
dataset_paths[:10]

[]

In [7]:
np.bincount(labels.astype('int32')), labels[:10]

(array([], dtype=int64), array([], dtype=float64))

In [8]:
def build_model():
    net = nn.Sequential(
        nn.Conv2d(1, 32, kernel_size=5, padding=1), # 20
        nn.ReLU(),
        nn.BatchNorm2d(32),
        nn.MaxPool2d(2,2),
        nn.Dropout(0.4),

        nn.Conv2d(32, 64, kernel_size=5, padding=1), # 32
        nn.ReLU(),
        nn.BatchNorm2d(64),
        nn.MaxPool2d(2,2),
        nn.Dropout(0.4),       

        nn.Conv2d(64, 64, kernel_size=5, padding=1), # 32
        nn.ReLU(),
        nn.BatchNorm2d(64),
        nn.MaxPool2d(2,2),
        nn.Dropout(0.4),     

        nn.Flatten(),

        nn.Linear(64*2*2, 500),
        nn.ReLU(),
        nn.Dropout(0.4),

        nn.Linear(500, 3)
    )

    model = pytk.PytkModuleWrapper(net)
    # add weights to optimizer due to class imbalance
    # weights = torch.FloatTensor(class_counts) / total
    # weights = weights.cuda() if torch.cuda.is_available() else weights.cpu()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=model.parameters(), lr=0.001)
    #optimizer = optim.SGD(params=model.parameters(), lr=0.001)
    model.compile(loss=criterion, optimizer=optimizer, metrics=['acc'])
    return model

In [9]:
from sklearn.model_selection import train_test_split

# split dataset into train/test sets in 70:30 ration
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.3, 
                                                    random_state=SEED, stratify=labels)
# split the test set into cross-val & test datasets in 80:20 rstio
# NOTE: Andrew Ng recommends that, if possible, the test & cv datasets should be drawn
# from the same sample
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.3, 
                                                random_state=SEED, stratify=y_test)

print(f"X_train.shape: {X_train.shape} - y_train.shape: {y_train.shape} - class dist: {np.bincount(y_train)}\n" +
      f"X_val.shape: {X_val.shape} - y_val.shape: {y_val.shape} - class dist: {np.bincount(y_val)}\n" +
      f"X_test.shape: {X_test.shape} - y_test.shape: {y_test.shape} - class dist: {np.bincount(y_test)}") 

y_train = y_train.astype('float32')
y_test = y_test.astype('float32')
y_val = y_val.astype('float32')

ValueError: With n_samples=0, test_size=0.3 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [None]:
# define a dataset for Pytorch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class RPSDataset(Dataset):
    def __init__(self, x, y, transforms=None):
        self.x = x
        self.y = torch.LongTensor(y)
        self.transforms = transforms

    def __len__(self):
        return len(self.x)

    def __getitem__(self, ix):
        x, y = self.x[ix], self.y[ix]
        if self.transforms is not None:
            x = self.transforms(x)
        return x, y

train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomAffine(0, shear=0.2),         # random shear 0.2
    transforms.RandomAffine(0, scale=(0.8, 1.2)),  # random zoom 0.2
    transforms.RandomRotation(20),
    transforms.RandomHorizontalFlip(),   
    transforms.RandomVerticalFlip(),     
])
    
image_transforms = transforms.Compose([
    transforms.ToTensor()   
])

In [None]:
train_dataset = RPSDataset(X_train, y_train, train_transforms)
val_dataset = RPSDataset(X_val, y_val, image_transforms)
test_dataset = RPSDataset(X_test, y_test, image_transforms)

In [None]:
testloader = DataLoader(test_dataset, batch_size=32, shuffle=True)
data_iter = iter(testloader)
sample_images, sample_labels = next(data_iter)
sample_images, sample_labels = sample_images.cpu().numpy(), sample_labels.cpu().numpy()
print(f"images.shape: {sample_images.shape} - labels.shape: {sample_labels.shape}")

#plt.figure(figsize=(5,5))
for i in range(32):
    plt.subplot(4, 8, i + 1)
    image = sample_images[i]
    #print(f"images[{i}].shape: {image.shape} ")
    image = image.transpose((1,2,0))
    #print(f" - AP: images[{i}].shape: {image.shape}")
    #plt.imshow(image.squeeze(), cmap='gray')
    plt.imshow(image.squeeze())
    plt.axis('off')
plt.show()
plt.close()

In [None]:
testloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
data_iter = iter(testloader)
sample_images, sample_labels = next(data_iter)
sample_images, sample_labels = sample_images.cpu().numpy(), sample_labels.cpu().numpy()
print(f"images.shape: {sample_images.shape} - labels.shape: {sample_labels.shape}")

#plt.figure(figsize=(5,5))
for i in range(32):
    plt.subplot(4, 8, i + 1)
    image = sample_images[i]
    #print(f"images[{i}].shape: {image.shape} ")
    image = image.transpose((1,2,0))
    #print(f" - AP: images[{i}].shape: {image.shape}")
    #plt.imshow(image.squeeze(), cmap='gray')
    plt.imshow(image.squeeze())
    plt.axis('off')
plt.show()
plt.close()

In [None]:
model = build_model()
print(model.summary((1, 32, 32)))

In [None]:
# hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)
hist = model.fit_dataset(train_dataset, epochs=10, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance')

In [None]:
loss, acc = model.evaluate_dataset(train_dataset)
print(f"Training  -> loss: {loss:.3f} - acc: {acc:.3f}")
loss, acc = model.evaluate_dataset(val_dataset)
print(f"Cross-val -> loss: {loss:.3f} - acc: {acc:.3f}")
loss, acc = model.evaluate_dataset(test_dataset)
print(f"Testing   -> loss: {loss:.3f} - acc: {acc:.3f}")

In [None]:
MODEL_SAVE_PATH = './model_states/pytk_rock_paper_scissors.pt'

In [None]:
model.save(MODEL_SAVE_PATH)
del model

In [None]:
model = build_model()
model.load(MODEL_SAVE_PATH)
print(model.summary((1, 32, 32)))

In [None]:
probs, y_test = model.predict_dataset(test_dataset)
y_pred = np.argmax(probs, axis=1)
y_test[:20], y_pred[:20]

In [None]:
print(f"We got {(y_test == y_pred).sum()} of {len(y_test)} correct!")

In [None]:
def create_model(activate_dropout = True, dropout_probability = 0.4):
    if activate_dropout:
        net = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, padding=1), # 20
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2,2),
            nn.Dropout(dropout_probability),

            nn.Conv2d(32, 64, kernel_size=5, padding=1), # 32
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2,2),
            nn.Dropout(dropout_probability),       

            nn.Conv2d(64, 64, kernel_size=5, padding=1), # 32
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2,2),
            nn.Dropout(dropout_probability),     

            nn.Flatten(),

            nn.Linear(64*2*2, 500),
            nn.ReLU(),
            nn.Dropout(dropout_probability),

            nn.Linear(500, 3)
            )
    else:
        net = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, padding=1), # 20
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2,2),

            nn.Conv2d(32, 64, kernel_size=5, padding=1), # 32
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2,2),

            nn.Conv2d(64, 64, kernel_size=5, padding=1), # 32
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2,2),

            nn.Flatten(),

            nn.Linear(64*2*2, 500),
            nn.ReLU(),

            nn.Linear(500, 3)
            )

    model = pytk.PytkModuleWrapper(net)
    # add weights to optimizer due to class imbalance
    # weights = torch.FloatTensor(class_counts) / total
    # weights = weights.cuda() if torch.cuda.is_available() else weights.cpu()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=model.parameters(), lr=0.001)
    #optimizer = optim.SGD(params=model.parameters(), lr=0.001)
    model.compile(loss=criterion, optimizer=optimizer, metrics=['acc'])
    return model

In [None]:
model = create_model(activate_dropout = False)

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance without Dropout Layers')

In [None]:
model = create_model(activate_dropout = True, dropout_probability = 0.4)

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance with Dropout Rate = 0.4')

In [None]:
model = create_model(activate_dropout = True, dropout_probability = 0.2)

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance with Dropout Rate = 0.2')

In [None]:
model = create_model(activate_dropout = True, dropout_probability = 0.6)

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance with Dropout Rate = 0.6')

In [None]:
model = create_model(activate_dropout = True, dropout_probability = 0.8)

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance with Dropout Rate = 0.8')

In [None]:
def create_model(activate_dropout = True, dropout_probability = 0.4, activate_batchnorm = True, optimizer_name = 'Adam'):
    layers = []
    
    # 20
    layers.append(nn.Conv2d(1, 32, kernel_size=5, padding=1))
    layers.append(nn.ReLU())
    if activate_batchnorm:
        layers.append(nn.BatchNorm2d(32))
    layers.append(nn.MaxPool2d(2,2))
    if activate_dropout:
        layers.append(nn.Dropout(dropout_probability))
    
    # 32
    layers.append(nn.Conv2d(32, 64, kernel_size=5, padding=1))
    layers.append(nn.ReLU())
    if activate_batchnorm:
        layers.append(nn.BatchNorm2d(64))
    layers.append(nn.MaxPool2d(2,2))
    if activate_dropout:
        layers.append(nn.Dropout(dropout_probability))
    
    # 64
    layers.append(nn.Conv2d(64, 64, kernel_size=5, padding=1))
    layers.append(nn.ReLU())
    if activate_batchnorm:
        layers.append(nn.BatchNorm2d(64))
    layers.append(nn.MaxPool2d(2,2))
    if activate_dropout:
        layers.append(nn.Dropout(dropout_probability))
        
    layers.append(nn.Flatten())
    layers.append(nn.Linear(64*2*2, 500))
    layers.append(nn.ReLU())
    if activate_dropout:
        layers.append(nn.Dropout(dropout_probability))
    
    layers.append(nn.Linear(500, 3))
    
    net = nn.Sequential(*layers)
    
    model = pytk.PytkModuleWrapper(net)

    criterion = nn.CrossEntropyLoss()
    
    if optimizer_name == 'Adam':
        optimizer = optim.Adam(params=model.parameters(), lr=0.001)
    elif optimizer_name == 'SGD':
        optimizer = optim.SGD(params=model.parameters(), lr=0.001)
    model.compile(loss=criterion, optimizer=optimizer, metrics=['acc'])
    return model

In [None]:
model = create_model(activate_dropout = True, dropout_probability = 0.4, activate_batchnorm = True, optimizer_name = 'SGD')

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance With Dropout Rate = 0.4 and SGD Optimizer')

In [None]:
model = create_model(activate_dropout = False, dropout_probability = 0.4, activate_batchnorm = True, optimizer_name = 'Adam')

In [None]:
hist = model.fit_dataset(train_dataset, epochs=100, batch_size=32, validation_dataset=val_dataset)

In [None]:
pytk.show_plots(hist, metric='acc', plot_title='Model Performance With Dropout Rate = 0.4 and SGD Optimizer')

In [None]:
def run_training(activate_dropout = False, dropout_probability = 0.4, activate_batchnorm = True, optimizer_name = 'Adam'):
    model = create_model(activate_dropout, dropout_probability, activate_batchnorm, optimizer_name)
    hist = model.fit_dataset(train_dataset, epochs = 100, batch_size=32, validation_dataset=val_dataset)
    if not activate_dropout:
        pytk.show_plots(hist, metric='acc', plot_title='Model Performance without Dropout Layers')
    else:
        pytk.show_plots(hist, metric='acc', plot_title='Model Performance with Dropout Rate = ' + str(dropout_probability))
        

In [None]:
run_training(False, 0, False)

In [None]:
run_training(True, 0.2)

In [None]:
run_training(True, 0.4)

In [None]:
run_training(True, 0.8)

In [None]:
run_training(False, 0)