In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.transforms.functional as transform
from torch.nn.functional import normalize
from PIL import Image
import glob
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import albumentations
import cv2

### 1. Get training dataset in usable format

- images are in jpg format, need to convert these to PyTorch tensors
- tensors also have to be normalized  

In [2]:
#torchvision.io.read_image()


In [3]:
### class to load and store relevant data 
class LoadImageData:   
    def __init__(self, n_files_per_class=10000):  ### n_files == number of files of each letter to train on
        self.label_converter = dict()
        self.convert_label_to_int()
        self.n_files_per_class = n_files_per_class
        self.train_data    = [] # train data paths, no longer in tensor format
        self.test_data     = [] # test data paths, no longer in tensor format
        self.train_labels = [] # truth LETTER label for training data
        self.test_labels   = [] # truth LETTER label for test data
        self.train_labels_num =  [] # truth NUMBER label for training data
        self.test_labels_num  =  [] # truth NUMBER label for test data
        
        self.image_path_dict = dict() # dictionary of all training file paths
        self.load_train_data()
        self.load_test_data()
        
        self.transf = albumentations.Compose([
            albumentations.Resize(224, 224, always_apply=True),
        ])
    ### helper function to convert str letters/del/space into numbers 
    def convert_label_to_int(self):
        alphabet = "A/B/C/D/E/F/G/H/I/J/K/L/M/N/O/P/Q/R/S/T/U/V/W/X/Y/Z/del/nothing/space"
        for iii,label in enumerate(alphabet.split("/")):
            self.label_converter[label] = iii
    
    ### load training data into the instance variable train_data
    def load_train_data(self):
        now = time.time()
        train_path = "datasets/asl_alphabet_train/asl_alphabet_train/"
        train_directories = glob.glob(train_path+"/*")
        n_files = 0
        print(" ----- Loading training dataset -----")

        for dir in train_directories:
            letter = dir.split("/")[-1]
            self.image_path_dict[letter] = []
            n_test_for_class = 0 
            for image_file in glob.glob(dir+"/*"):
                if n_test_for_class > (self.n_files_per_class-1):
                    break ### move onto the next letter 
                self.image_path_dict[letter].append(image_file)
                self.train_data.append( image_file )
                self.train_labels_num.append(self.label_converter[letter])
                self.train_labels.append(letter)
                n_test_for_class+=1
                n_files +=1
            print("Finished importing %s"%letter)
        print("Done with training dataset - loaded paths for %i files. Took %f seconds"%(n_files, np.around(time.time()-now)))
        return
    ### load test data into the instance variable train_data
    def load_test_data(self):
        now = time.time()
        test_path = "datasets/asl_alphabet_test/asl_alphabet_test/"
        test_directories = glob.glob(test_path+"/*")
        n_files = 0
        print("----- Loading test dataset -----")
        for image_file in test_directories:
            letter = image_file.split("_")[-2].split("/")[-1]
            self.image_path_dict[letter] = []
            self.image_path_dict[letter].append(image_file)
            self.test_data.append(image_file   )
            self.test_labels_num.append(self.label_converter[letter])
            self.test_labels.append(letter)
            n_files +=1
        print("Done with test dataset - loaded paths for %i files. Took %f seconds"%(n_files, np.around(time.time()-now,4)))
        return
    def __getitem__(self,i):
        image = cv2.imread(self.train_data[i])
        image = self.transf(image=np.array(image))['image']
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)
        label = self.train_labels[i]
        return torch.tensor(image, dtype=torch.float), torch.tensor(label, dtype=torch.long)
### lighter class to store test/train paths and labels
class ImageData:
    def __init__(self,paths, labels):
        self.X = paths
        self.y = labels
        self.transf = albumentations.Compose([
            albumentations.Resize(224, 224, always_apply=True),
        ])
    def __len__(self):
        return (len(self.X))
        
    def __getitem__(self,i):
        image = cv2.imread(self.X[i])
        image = self.transf(image=np.array(image))['image']
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)
        label = self.y[i]
        return torch.tensor(image, dtype=torch.float), torch.tensor(label, dtype=torch.long)

In [4]:
datasets = LoadImageData(1000) ## data instance, passing in 1000 so that only 1000 of each letter are used for training 
### changing gears, keeping the "data" as the file paths to each jpg, also imbuing this class with a __get__
### built-in method that returns the relevent tensors for when these are needed 

train_data = ImageData(datasets.train_data, datasets.train_labels_num)
test_data  = ImageData(datasets.test_data, datasets.test_labels_num)

 ----- Loading training dataset -----
Finished importing R
Finished importing U
Finished importing I
Finished importing N
Finished importing G
Finished importing Z
Finished importing T
Finished importing S
Finished importing A
Finished importing F
Finished importing O
Finished importing H
Finished importing del
Finished importing nothing
Finished importing space
Finished importing M
Finished importing J
Finished importing C
Finished importing D
Finished importing V
Finished importing Q
Finished importing X
Finished importing E
Finished importing B
Finished importing K
Finished importing L
Finished importing Y
Finished importing P
Finished importing W
Done with training dataset - loaded paths for 29000 files. Took 0.000000 seconds
----- Loading test dataset -----
Done with test dataset - loaded paths for 28 files. Took 0.000400 seconds


### 2. Create a csv with the file path, corresponding, letter, and then binarize this 

In [5]:
df_training = pd.DataFrame()
df_training['path'] = ""
df_training['letter'] = ""
for iii in range(0,len(datasets.train_data)):
    df_training.loc[iii, 'path' ] = datasets.train_data[iii]
    df_training.loc[iii, 'letter' ] = datasets.train_labels[iii] ### converting letter to int
df_training = df_training.sample(frac=1).reset_index(drop=True) ### shuffle
df_training.to_csv("processedDatasets/train_data.csv") ### write out csv file 

In [6]:
letters_binarized = pd.get_dummies(df_training["letter"],dtype=int) ### binarize
letters_binarized.insert(0, 'path', df_training['path']) ### reinsert the path 
letters_binarized.to_csv("processedDatasets/train_data_binarized.csv") ### write out binarized csv

In [7]:
letters_binarized.head(10)

Unnamed: 0,path,A,B,C,D,E,F,G,H,I,...,T,U,V,W,X,Y,Z,del,nothing,space
0,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
2,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
3,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,datasets/asl_alphabet_train/asl_alphabet_train...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### 2.5 Save labels in a loadable format

In [8]:
import pickle
import sklearn.preprocessing


    
binarized_labels = sklearn.preprocessing.label_binarize(datasets.train_labels, classes = np.array("A/B/C/D/E/F/G/H/I/J/K/L/M/N/O/P/Q/R/S/T/U/V/W/X/Y/Z/del/nothing/space".split("/")))
with open('processedDatasets/labels_binarized.pickle', 'wb') as handle:
    pickle.dump(binarized_labels, handle, protocol=pickle.HIGHEST_PROTOCOL)


### test pickle file is working
binarized_labels_pkl = open('processedDatasets/labels_binarized.pickle', 'rb')
# dump information to that file
binarized_labels_loaded = pickle.load(binarized_labels_pkl)

### 3. Define NN Architecture

In [9]:
import torch.nn as nn
import torch.nn.functional as F
### custon CNN, inherits from base torch.nn 
class CustomCNN(nn.Module):
    def __init__(self):
        super(CustomCNN,self).__init__()  ## override __init__ to be from nn.Module base class
        self.conv1 = nn.Conv2d(3,16,5) # 3 channels in, 16 channels out, kernel size 5
        self.conv2 = nn.Conv2d(16,32,5)
        self.conv3 = nn.Conv2d(32,64,3)
        self.conv4 = nn.Conv2d(64,128,5)
        
        self.fc1 = nn.Linear(128,256)
        self.fc2 = nn.Linear(256, len(binarized_labels[0]))
        self.pool = nn.MaxPool2d(2, 2)
    def forward(self,x): # passed in input tensor
        x = self.pool(F.relu(self.conv1(x))) # convolutional layer followed by relu mapping and then a pool layer
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
        ### get the tensor shape
        t_layers, t_width, t_height = x.shape
        x = F.adaptive_avg_pool2d(x,1).reshape(t_layers,-1)    ### reshape this to be 1D, dimensions are inferred 
        x = F.relu(self.fc1(x))
        return self.fc2(x) ### return tensor of length 29 
               


In [10]:
cust_nn = CustomCNN()

### 4. Set Up training

In [11]:
### set seeds for random, np, torch

In [12]:
import random
import torch.optim as optim
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
#import CustomNN class

def set_seeds(seed_):
    np.random.seed(seed_)
    random.seed(seed_)
    torch.cuda.manual_seed(seed_)
    torch.cuda.manual_seed_all(seed_)
    torch.manual_seed(seed_)
    torch.backends.cudnn.benchmark = True
    return
SEED = 96
set_seeds(SEED)

if torch.cuda.is_available():
    device = ('cuda:0')
    print("Found CUDA:0 - using CUDA.")
else:
    device = ('cpu')
    print("CUDA not found - using CPU.")
### API for interfacing with GPU, (Compute Unified Device Architecture)


CUDA not found - using CPU.


In [13]:
### get images from LoadImageData class


X = datasets.train_data
y = datasets.train_labels

### shuffle train and test with same indices
zipped_Xy = list(zip(X, y))
random.shuffle(zipped_Xy)
X, y = zip(*zipped_Xy)

(xtrain, xtest, ytrain, ytest) = (train_test_split(X, y, 
                                test_size=0.15, random_state=SEED))
print("Finished splitting dataset into train and validation.")

Finished splitting dataset into train and validation.


In [14]:
### init data loaders
train_dataset    = ImageData(xtrain,ytrain)
validate_dataset = ImageData(xtest,ytest)

batch_size = 4 
### now load data into ptytorch
trainloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size,
                                          shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size,
                                          shuffle=True, num_workers=2)




In [15]:
model = CustomCNN().to(device)


In [16]:
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

In [17]:
#optimizer = optim.Adam(model.parameters(), lr=0.001) # construct an optimizer object that will hold the current state and will update the parameters based on the computed gradients., learning rate = 0.001
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)  # stochastic gradient descent

### 5. Do Training

In [None]:
for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        print("Running %s"%i)
        # print statistics
        running_loss += loss.item()
        if i % 1000 == 0:    # give update every 1000 minibatches
            print("epoch %s, loss: %f"%(i,running_loss/1000.))
            running_loss = 0.0

print('Finished Training')

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/ethan/miniconda3/envs/ml/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/ethan/miniconda3/envs/ml/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'ImageData' on <module '__main__' (built-in)>
