In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import torchvision.transforms as tt
import torchvision.models as models
from torchvision.transforms.transforms import Resize
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as tt
from torch.utils.data import random_split
from torchvision.utils import make_grid
from copy import copy
%matplotlib inline

In [None]:
# Mount Google Drive (Colab only); the archives read below and the saved
# checkpoint live under gdrive/MyDrive/data.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
!unzip gdrive/MyDrive/data/archive.zip

In [None]:
# Entries of the training folder; ImageFolder (used below) treats each
# sub-directory as one class.
classes = os.listdir("./asl_alphabet_train/asl_alphabet_train")
print(classes)

# len() replaces the hand-rolled counting loop.
print(str(len(classes)) + " classes")

['space', 'O', 'T', 'N', 'Y', 'G', 'K', 'Q', 'J', 'nothing', 'A', 'M', 'P', 'X', 'R', 'B', 'L', 'C', 'W', 'V', 'del', 'D', 'F', 'U', 'S', 'Z', 'E', 'I', 'H']
29 classes


In [None]:
# Load the training images without a transform; the train/validation
# transforms are attached after the split below.
dataset = ImageFolder('./asl_alphabet_train/asl_alphabet_train')


In [None]:
# Data transforms (normalization and data augmentation)
# Training pipeline: resize, random geometric/colour augmentation, then
# tensor conversion and per-channel normalization.
train_tfms = tt.Compose([
    tt.Resize((224, 224)),
    tt.RandomCrop(224, padding=28, padding_mode='constant', fill=(0, 0, 0)),
    tt.RandomHorizontalFlip(p=0.3),
    tt.RandomRotation(30),
    tt.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    tt.RandomPerspective(distortion_scale=0.2),
    tt.ToTensor(),
    tt.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Validation pipeline: same resize and normalization, no random augmentation.
valid_tfms = tt.Compose([
    tt.Resize((224, 224)),
    tt.ToTensor(),
    tt.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [None]:
# Hold out 15% of the images for validation.
val_size = int(0.15 * len(dataset))
train_size = len(dataset) - val_size

# Use a dedicated, seeded generator so the split does not depend on the state
# of the global RNG (which this notebook only seeds further down). Re-running
# this cell therefore always yields the same train/validation partition.
split_generator = torch.Generator().manual_seed(23)
train_ds, valid_ds = random_split(dataset, [train_size, val_size],
                                  generator=split_generator)
len(train_ds), len(valid_ds)

(73950, 13050)

In [None]:
# Both subsets returned by random_split point at the same underlying `dataset`.
# Give the training subset its own shallow copy so the two subsets can carry
# different transforms: augmentation for training, plain resize/normalize
# for validation.
train_ds.dataset = copy(dataset)
train_ds.dataset.transform = train_tfms
valid_ds.dataset.transform = valid_tfms

In [None]:
# Pytorch Datasets
# train_ds = ImageFolder("./asl_alphabet_train/asl_alphabet_train", train_tfms)
# test_ds = ImageFolder("./asl_alphabet_test", valid_tfms)

In [None]:
# HyperParameters
# Training batch size; the validation and test loaders use twice this value.
batch_size = 50


In [None]:
# Seed PyTorch's global RNG. Note that this runs after random_split above,
# so it is not what determines which images land in the validation set.
random_seed = 23
torch.manual_seed(random_seed);

In [None]:
# Pytorch data loaders
# Only the training loader shuffles; validation uses twice the batch size.
train_dl = DataLoader(
    train_ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
valid_dl = DataLoader(
    valid_ds,
    batch_size=batch_size * 2,
    num_workers=4,
    pin_memory=True,
)

In [None]:
def to_device(data, device):
    """Move a tensor, or a (possibly nested) list/tuple of tensors, to `device`.

    Lists and tuples are rebuilt as lists; any other object must provide a
    `.to()` method, which is called with `non_blocking=True`.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader:
    """Wrap a data loader so every batch it yields is moved to a device."""

    def __init__(self, dl, device):
        # Keep the wrapped loader and the target device.
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches of the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Yield each batch of the wrapped loader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

In [None]:
# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Wrap both loaders so their batches arrive on `device`.
train_dl = DeviceDataLoader(train_dl, device)
valid_dl = DeviceDataLoader(valid_dl, device)

for wrapped_loader in (train_dl, valid_dl):
    print(wrapped_loader.device)

cuda
cuda
cuda


In [None]:
# Create Network class and make helper methods for training and validation
class Network(nn.Module):
    """Base module with the loss/metric helpers used by the training loop.

    Subclasses are expected to provide `forward`.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss for one (images, labels) batch."""
        inputs, targets = batch
        return F.cross_entropy(self(inputs), targets)

    def validation_step(self, batch):
        """Return accuracy and detached loss for one (images, labels) batch."""
        inputs, targets = batch
        logits = self(inputs)
        batch_loss = F.cross_entropy(logits, targets)
        return {
            'val_acc': accuracy(logits, targets),
            'val_loss': batch_loss.detach(),
        }

    def validation_epoch_end(self, outputs):
        """Average the per-batch results and return them as Python floats."""
        mean_loss = torch.stack([step['val_loss'] for step in outputs]).mean()
        mean_acc = torch.stack([step['val_acc'] for step in outputs]).mean()
        return {'val_acc': mean_acc.item(), 'val_loss': mean_loss.item()}

    def epoch_end(self, epoch, result):
        """Print the validation metrics of `epoch`; always returns True."""
        template = "Epoch [{}], val_acc: {:.4f}, val_loss: {:.4f}"
        print(template.format(epoch, result['val_acc'], result['val_loss']))
        return True

def accuracy(outputs, labels):
    """Return the fraction of rows whose highest-scoring column equals the label.

    The result is a 0-dim tensor built from a Python float.
    """
    preds = torch.max(outputs, dim=1).indices
    num_correct = (preds == labels).sum().item()
    return torch.tensor(num_correct / len(preds))

In [None]:

class ResNet152(Network):
    """Pretrained ResNet-152 whose final `fc` layer is replaced by a new head.

    Args:
        num_classes: Output size of the replacement `fc` layer. Defaults to 29,
            the number of class folders printed for the training set.
    """

    def __init__(self, num_classes=29):
        super().__init__()
        # Use a pretrained model
        # NOTE(review): newer torchvision releases deprecate `pretrained=` in
        # favour of `weights=` - confirm against the installed version.
        self.network = models.resnet152(pretrained=True)
        # Replace last layer
        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, num_classes)

    def forward(self, xb):
        """Return the output of the wrapped network for a batch `xb`."""
        return self.network(xb)

    def freeze(self):
        """Disable gradients for every layer except the final `fc` head."""
        # The attribute autograd reads is `requires_grad`. The previous
        # spelling `require_grad` only created an unused attribute on each
        # parameter, so nothing was actually frozen.
        for param in self.network.parameters():
            param.requires_grad = False
        for param in self.network.fc.parameters():
            param.requires_grad = True
        return True

    def unfreeze(self):
        """Re-enable gradients for all layers."""
        for param in self.network.parameters():
            param.requires_grad = True
        return True


In [None]:
# Instantiate the model and move it to the selected device.
# The trailing expression displays the module structure as the cell output.
model = to_device(ResNet152(), device)
model

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    """Put `model` in eval mode, run it over `val_loader` and aggregate the results.

    Each batch goes through `model.validation_step`; the collected list is
    handed to `model.validation_epoch_end`, whose return value is returned.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns None when the optimizer has no parameter groups.
    """
    groups = iter(optimizer.param_groups)
    try:
        first_group = next(groups)
    except StopIteration:
        return None
    return first_group['lr']

def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader,
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    """Train `model` with a one-cycle learning-rate schedule.

    Args:
        epochs: Number of passes over `train_loader`.
        max_lr: Learning rate handed to the optimizer and used as the peak
            of the OneCycleLR schedule.
        model: Module exposing `training_step` and `epoch_end`, plus the
            validation hooks used by `evaluate`.
        train_loader: Iterable of training batches; must support `len()`.
        val_loader: Iterable of validation batches.
        weight_decay: Weight decay passed to the optimizer.
        grad_clip: If truthy, every gradient element is clamped to
            [-grad_clip, grad_clip] before the optimizer step.
        opt_func: Optimizer constructor, called as
            `opt_func(params, max_lr, weight_decay=...)`.

    Returns:
        A list with one dict per epoch: the result of `evaluate` extended with
        'train_loss' (mean training loss) and 'lrs' (learning rate per step).
    """
    torch.cuda.empty_cache()
    history = []

    # Set up custom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs,
                                                steps_per_epoch=len(train_loader))

    for epoch in range(epochs):
        # Training Phase
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = model.training_step(batch)
            # Keep only the value: storing the attached tensor would hold on
            # to autograd history for every step until the epoch ends.
            train_losses.append(loss.detach())
            loss.backward()

            # Gradient clipping
            if grad_clip:
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)

            optimizer.step()
            optimizer.zero_grad()

            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()

        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [None]:
# Baseline: validation metrics before any training is run in this notebook.
history = [evaluate(model, valid_dl)]
history

[{'val_acc': 0.04793892800807953, 'val_loss': 3.437835693359375}]

In [None]:
# model.freeze()

In [None]:
# Settings for the fit_one_cycle runs below.
epochs = 1                    # epochs per fit_one_cycle call
max_lr = 1e-5                 # peak learning rate of the one-cycle schedule
grad_clip = 0.001             # bound passed to nn.utils.clip_grad_value_
weight_decay = 1e-5           # weight decay handed to the optimizer
opt_func = torch.optim.AdamW  # optimizer constructor

In [None]:
%%time
# First training run; the per-epoch results are appended to `history`.
history += fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

Epoch [0], val_acc: 0.9992, val_loss: 0.0190
CPU times: user 10min 22s, sys: 5min 48s, total: 16min 10s
Wall time: 15min 48s


In [None]:
# model.unfreeze()

In [None]:
%%time
# Second training run with the same settings. fit_one_cycle builds a new
# optimizer and a new one-cycle schedule on every call, so the learning-rate
# cycle starts over here.
history += fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

Epoch [0], val_acc: 1.0000, val_loss: 0.0019
CPU times: user 10min 23s, sys: 5min 47s, total: 16min 10s
Wall time: 15min 48s


In [None]:
# Final validation metrics after the training runs. Append to `history`
# instead of rebinding it, so the per-epoch records collected during training
# (train_loss, lrs) are not thrown away.
final_result = evaluate(model, valid_dl)
history.append(final_result)
[final_result]

[{'val_acc': 1.0, 'val_loss': 0.0018670875579118729}]

In [None]:
# Switch the model to evaluation mode before saving and running predictions.
# eval() returns the module, so the cell also displays its structure.
model.eval()

In [None]:
# Save Pytorch Model
# Only the state_dict is written, so loading it back requires constructing
# the model class first and then calling load_state_dict.
FILE = "gdrive/MyDrive/data/modelResNet12.pth"
torch.save(model.state_dict(), FILE)

In [None]:
# onnx_model_path = "gdrive/MyDrive/data/modelResNetV10.onnx"
# x = torch.randn(50, 3, 200, 200, device=device) # Sample input in the shape that the model expects
# torch.onnx.export(model, x, onnx_model_path, export_params=True, verbose=True,)

In [None]:
!unzip gdrive/MyDrive/data/archiveTest.zip

Archive:  gdrive/MyDrive/data/archiveTest.zip
replace A/A0001_test.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
# Build the test loader with the deterministic validation pipeline
# (resize + normalize). The random training augmentations that were applied
# here before made the test metrics vary between runs and measured the model
# on distorted images.
# The zero-length random_split is dropped as well: it only permuted the sample
# order, which serves no purpose when the whole set is evaluated.
test_dataset = ImageFolder('./asl-alphabet-testR', transform=valid_tfms)
test_ds = test_dataset
test_dl = DataLoader(test_ds, batch_size*2, num_workers=4, pin_memory=True)

In [None]:
# Wrap the test loader so its batches are moved to `device`.
# Re-running this cell alone wraps the already wrapped loader again.
test_dl = DeviceDataLoader(test_dl, device)


In [None]:
# Test-set metrics; the keys are named 'val_*' because evaluate() reuses the
# model's validation hooks.
evaluate(model, test_dl)


{'val_acc': 0.8022222518920898, 'val_loss': 0.7864134907722473}

In [None]:
def predict_image(img, model):
    """Predict the class name for a single image tensor.

    Args:
        img: Image tensor without a batch dimension. It should already have
            gone through the same preprocessing as the validation data.
        model: Trained classifier; expected to be in eval mode.

    Returns:
        The entry of the global `dataset.classes` at the predicted index.
    """
    # Convert to a batch of 1
    xb = to_device(img.unsqueeze(0), device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        yb = model(xb)
    # Pick index with the highest score
    _, preds = torch.max(yb, dim=1)
    # Retrieve the class label
    return dataset.classes[preds[0].item()]

In [None]:
from PIL import Image
import cv2
import io

In [None]:
# Load a single photo from Drive, rotate it by 270 degrees and apply the
# validation preprocessing (resize, tensor conversion, normalization).
# NOTE(review): the rotation presumably compensates for how the photo was
# stored - confirm.
img = Image.open('/content/gdrive/MyDrive/data/model/test_B.jpg')
img = tt.functional.rotate(img, angle=270)
img = valid_tfms(img)

In [None]:
# Show the preprocessed tensor's shape and the predicted class name.
print(img.shape)
print('Predicted:', predict_image(img, model))

torch.Size([3, 224, 224])
Predicted: B


In [None]:
# Rebuild the test set for single-image spot checks. random_split with a
# zero-length second part returns the whole set in shuffled order.
test_dataset = ImageFolder('./asl-alphabet-testR')
test_ds, _ = random_split(test_dataset, [len(test_dataset), 0])
# Apply the same resize + normalization the model is validated with; a bare
# ToTensor() would feed it un-normalized images at their original size.
test_ds.dataset.transform = valid_tfms

In [None]:
# Spot check: compare the true label of test sample 16 with the prediction.
img, label = test_ds[16]
print('Label:', test_dataset.classes[label])
print('Predicted:', predict_image(img, model))

In [None]:
# Spot check: compare the true label of test sample 1 with the prediction.
img, label = test_ds[1]
print('Label:', test_dataset.classes[label])
print('Predicted:', predict_image(img, model))

In [None]:
# Spot check: compare the true label of test sample 0 with the prediction.
img, label = test_ds[0]
print('Label:', test_dataset.classes[label])
print('Predicted:', predict_image(img, model))

Label: space
Predicted: space


In [None]:
# Spot-check test samples 60..100 (inclusive). range() replaces the manual
# counter, and the upper bound is capped at the dataset size so a smaller
# test set ends the loop instead of raising IndexError.
for x in range(60, min(101, len(test_ds))):
    img, label = test_ds[x]
    print('Expected Ouput:', test_dataset.classes[label])
    print('Predicted Output:', predict_image(img, model))
    print()