In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import Subset
from torch.utils.data import random_split
from torch.utils.data import ConcatDataset
from torchvision.transforms import AutoAugment, AutoAugmentPolicy
from torchvision.datasets import ImageFolder
from PIL import Image
import numpy as np
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Paths of the .npy files holding the training images and their target labels
data_file = "data_train.npy"
labels_file = "labels_train.npy"


class MathSymbols(Dataset):
    '''
    Dataset of 100x100 grayscale symbol images and their labels, both loaded from .npy files.

    The images are reshaped to (N, 100, 100) and replicated to 3 channels in
    __init__; __getitem__ hands each one out as a PIL Image (or as whatever
    data_transform returns) together with its label as a torch.long tensor.
    '''

    # Height and width of a single image after reshaping
    IMAGE_SHAPE = (100, 100)

    def __init__(self, data_file, labels_file, data_transform=None, label_transform=None):
        '''
        data_file: Data file for training the model (should be .npy file)
        labels_file: Target labels (should be .npy file)
        data_transform: Optional callable applied to each PIL image in __getitem__
        label_transform: Optional callable applied to each raw label in __getitem__

        Raises ValueError if the number of images and the number of labels differ.
        '''
        # Load data
        self.data = np.load(data_file)
        self.labels = np.load(labels_file)
        self.data_transform = data_transform
        self.label_transform = label_transform

        # Reshape data to (N, 100, 100). N is inferred from the file (-1) instead of
        # being hard-coded, so files with any number of images can be loaded.
        # NOTE(review): presumably the file is (10000, N) with one flattened image
        # per column, hence the transpose -- confirm against the data source.
        self.data = np.reshape(np.transpose(self.data), (-1, *self.IMAGE_SHAPE))

        # Samples and labels are paired by index, so a length mismatch would
        # silently mislabel images or fail later inside a DataLoader worker.
        if len(self.labels) != len(self.data):
            raise ValueError(
                f'Number of labels ({len(self.labels)}) does not match '
                f'number of images ({len(self.data)})')

        # Convert the images to have 3 channels so it will function with pretrained models
        self.data = self.convert_to_rgb(self.data)

    def __len__(self):
        # Length of data (and labels) array
        return len(self.data)

    def __getitem__(self, idx):
        # Fetch the dataset item
        sample = self.data[idx]
        label = self.labels[idx]
        # Convert numpy array to PIL Image
        # NOTE(review): PIL generally expects uint8 for HxWx3 arrays -- confirm the dtype of the .npy data
        sample = Image.fromarray(sample)

        if self.data_transform:
            # Transform the data
            sample = self.data_transform(sample)

        if self.label_transform:
            # Transform the label
            label = self.label_transform(label)

        # Only the label is converted to a tensor here; the sample stays a PIL
        # Image unless data_transform produced something else.
        label = torch.tensor(label, dtype=torch.long)

        return sample, label

    def convert_to_rgb(self, grayscale_images):
        '''
        Return a copy of grayscale_images with a trailing channel axis of size 3,
        each channel holding the same grayscale values.
        '''
        # Add a trailing channel axis
        grayscale_images = np.expand_dims(grayscale_images, -1)
        # Change to 3 input channels to match with pretrained models
        rgb_images = grayscale_images.repeat(3, axis=-1)
        return rgb_images
    
    
    
class TransformSubset(torch.utils.data.Dataset):
    '''
    Wrap an indexable collection of (sample, label) pairs and apply an optional
    transform to the sample each time an item is fetched. Labels pass through
    unchanged.
    '''

    def __init__(self, subset, transform=None):
        self.transform = transform
        self.subset = subset

    def __len__(self):
        # Same number of items as the wrapped collection
        return len(self.subset)

    def __getitem__(self, index):
        item, target = self.subset[index]
        # Leave the sample untouched when no transform was supplied
        return (self.transform(item) if self.transform else item), target
    
    
def Train(X, Y):
    '''
    Fine-tune a pretrained ResNet50 on the symbol data and save the best weights.

    X: Path to the .npy data file (passed to MathSymbols as data_file)
    Y: Path to the .npy labels file (passed to MathSymbols as labels_file)

    The data is split 80/20 into training and validation parts with a fixed
    seed. The training part is used four times per epoch: once with the plain
    transform and once with each of three AutoAugment policies. Training runs
    for at most 100 epochs and stops early once the validation loss has failed
    to improve 30 epochs in a row. The state dict with the lowest validation
    loss is written to 'CNN.pth'. Returns None.
    '''
    import copy  # local import: only needed to snapshot the best weights below

    def _augmented(policy):
        # AutoAugment with the given policy, then the same tensor conversion
        # and normalisation as the plain pipeline
        return transforms.Compose(
            [AutoAugment(policy=policy),
             transforms.ToTensor(),
             transforms.Normalize(0.5, 0.5)])

    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize(0.5, 0.5)])

    batch_size = 8

    dataset = MathSymbols(data_file=X, labels_file=Y)

    # split for validation
    train_len = int(0.8 * len(dataset))
    valid_len = len(dataset) - train_len

    train_data, valid_data = random_split(dataset, [train_len, valid_len], generator=torch.Generator().manual_seed(1997))

    # Apply transformations using the wrapper class: one plain view of the
    # training split plus one augmented view per AutoAugment policy
    policies = (AutoAugmentPolicy.IMAGENET, AutoAugmentPolicy.CIFAR10, AutoAugmentPolicy.SVHN)
    train_views = [TransformSubset(train_data, transform=transform)]
    train_views += [TransformSubset(train_data, transform=_augmented(policy)) for policy in policies]
    valid = TransformSubset(valid_data, transform=transform)

    train = ConcatDataset(train_views)

    #Training data set
    trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size,
                                              shuffle=True, num_workers=2)

    #Validation data set
    validloader = torch.utils.data.DataLoader(valid, batch_size=batch_size,
                                              shuffle=False, num_workers=2)

    # Check if CUDA is available
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print("GPU is available. Using GPU.")
    else:
        device = torch.device("cpu")
        print("GPU is not available. Using CPU.")

    # Load the pre-trained ResNet50 model
    # NOTE(review): newer torchvision releases deprecate `pretrained=True` in
    # favour of the `weights=` argument; kept as-is for compatibility with the
    # installed version.
    model = models.resnet50(pretrained=True)

    # Replace the classification head so it outputs one logit per class
    num_classes = 10
    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    N_epochs = 100
    min_loss = np.inf
    best_model = None
    patience = 30 # How many epochs without validation improvement before stopping training
    patience_count = 0

    for epoch in range(N_epochs):  # loop over the dataset multiple times

        #Training
        model.train(True) # Make sure model is in training mode (not eval mode)
        running_loss = 0.0

        for i, (images, labels) in enumerate(trainloader):

            # Move images and labels to GPU if possible
            images = images.to(device)
            labels = labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(images) # Calculate predictions for images for this model instantiation
            loss = criterion(outputs, labels) # Calculate the loss from predictions and target labels
            loss.backward() # Backpropagation
            optimizer.step()

            # print statistics
            running_loss += loss.item()

            if i % 100 == 99:    # print every 100 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}')
                running_loss = 0.0

        # Validation
        model.eval()
        val_loss = 0 # Loss
        val_num = 0 # Number of validation batches evaluated

        with torch.no_grad(): # Don't waste memory calculating gradients

            for images, labels in validloader:

                images = images.to(device) #Move images to GPU
                labels = labels.to(device) #Move labels to GPU
                outputs = model(images) # Classify images with the trained model
                loss = criterion(outputs, labels) # Calculate the loss
                val_loss += loss.item()
                val_num += 1 # Number of validation losses calculated

        # Average validation loss over the batches
        avg_val_loss = val_loss / val_num

        # Report before the early-stopping check so the epoch that triggers
        # the stop is logged too
        print(f'Epoch {epoch + 1} validation loss: {avg_val_loss:.3f}')

        # Keep track of the lowest average validation loss seen so far
        if avg_val_loss < min_loss:
            min_loss = avg_val_loss
            # state_dict() returns references to the live parameter tensors,
            # which later optimizer steps overwrite in place; deep-copy them
            # so the snapshot really is the best model and not the last one.
            best_model = copy.deepcopy(model.state_dict())
            patience_count = 0 # Reset count

        else:
            patience_count += 1
            if patience_count >= patience:
                break

    print('Finished Training')

    # If the validation loss never improved on the initial value (e.g. it was
    # never finite), fall back to the final weights so a loadable state dict
    # is saved rather than None.
    if best_model is None:
        best_model = model.state_dict()

    # Save the best performing model
    torch.save(best_model, 'CNN.pth')
    

# Run training on the configured data and label files
Train(X=data_file, Y=labels_file)

GPU is available. Using GPU.




[1,   100] loss: 2.527
[1,   200] loss: 2.301
[1,   300] loss: 2.295
[1,   400] loss: 2.216


KeyboardInterrupt: 