In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm.notebook import tqdm
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor
from torchvision.utils import make_grid
from torch.utils.data import random_split
from torch.utils.data.dataloader import DataLoader
import matplotlib.pyplot as plt
from torchvision.transforms import Resize
from torchvision import transforms
from torchvision.datasets import ImageFolder
%matplotlib inline

In [None]:
# Locations of the training and test image folders.
dataset_path = './input/fruits/fruits-360/Training'
test_path = './input/fruits/fruits-360/Test'
# Show the entries of each folder, then whether CUDA can be used.
for folder in (dataset_path, test_path):
    print(os.listdir(folder))
print(torch.cuda.is_available())

In [None]:
image_size = 50

# A single preprocessing pipeline (resize to a square, convert to tensor)
# shared by the training and test datasets.
resize_to_tensor = transforms.Compose([
    Resize((image_size, image_size)),
    ToTensor(),
])

fruit_data = ImageFolder(dataset_path, transform=resize_to_tensor)
fruit_test = ImageFolder(test_path, transform=resize_to_tensor)

In [None]:
# Seed PyTorch's global random number generator.
torch.manual_seed(20)
# Reserve one tenth of the images (rounded down) for validation;
# everything else is used for training.
total_images = len(fruit_data)
validation_length = total_images // 10
training_length = total_images - validation_length

In [None]:
# Use a dedicated, explicitly seeded generator for the split so that
# re-running this cell always produces the same partition, no matter how
# much of the global RNG stream other cells have already consumed.
# (Drawing from the global RNG would re-partition the data on a re-run and
# could move already-seen training images into the validation set.)
split_generator = torch.Generator().manual_seed(20)
training_dataset, validation_dataset = random_split(
    fruit_data,
    [training_length, validation_length],
    generator=split_generator,
)
print(len(training_dataset))
print(len(validation_dataset))

In [None]:
batch_length = 32
# Options common to all three loaders.
loader_options = dict(num_workers=4, pin_memory=True)
# Only the training loader shuffles; the evaluation loaders use a doubled batch size.
training_loader = DataLoader(training_dataset, batch_size=batch_length, shuffle=True, **loader_options)
validation_loader = DataLoader(validation_dataset, batch_size=batch_length * 2, **loader_options)
test_loader = DataLoader(fruit_test, batch_size=batch_length * 2, **loader_options)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

In [None]:
def move_to_device(data, device):
    """Move ``data`` to ``device``.

    A list or tuple is processed element by element (recursively) and a
    list is returned; anything else is moved with
    ``data.to(device, non_blocking=True)``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(move_to_device(item, device))
    return moved

In [None]:
class DeviceLoader():
    """Wrap a data loader so each batch it yields is first moved to a device."""

    def __init__(self, dataloader, device):
        """Remember the wrapped loader and the target device."""
        self.dataloader, self.device = dataloader, device

    def __iter__(self):
        """Lazily yield the wrapped loader's batches after moving them to ``self.device``."""
        yield from (move_to_device(batch, self.device) for batch in self.dataloader)

    def __len__(self):
        """Length of the wrapped loader."""
        return len(self.dataloader)

In [None]:
# Rebind each loader name to a DeviceLoader wrapper so that iterating it
# yields batches already placed on `device`.
training_loader, validation_loader, test_loader = (
    DeviceLoader(loader, device)
    for loader in (training_loader, validation_loader, test_loader)
)

In [None]:
def calc_accuracy(outputs, labels):
    """Return, as a 0-dim tensor, the fraction of rows predicted correctly.

    The prediction for a row is the index of its largest value along
    dimension 1 of ``outputs``; it counts as correct when it equals the
    corresponding entry of ``labels``.
    """
    preds = torch.max(outputs, dim=1).indices
    correct = (preds == labels).sum().item()
    return torch.tensor(correct / len(preds))

In [None]:
class Classification(nn.Module):
    """Base module with the loss/metric helpers for a classifier.

    The helpers call ``self(images)``, so a subclass has to provide
    ``forward``.
    """

    def train_batch(self, batch):
        """Return the cross-entropy loss for one ``(images, labels)`` batch."""
        inputs, targets = batch
        return F.cross_entropy(self(inputs), targets)

    def validate_batch(self, batch):
        """Return the detached loss and the accuracy for one ``(images, labels)`` batch."""
        inputs, targets = batch
        logits = self(inputs)
        batch_loss = F.cross_entropy(logits, targets)
        return {
            'validation_loss': batch_loss.detach(),
            'validation_accuracy': calc_accuracy(logits, targets),
        }

    def calc_validation_epoch(self, outputs):
        """Average the per-batch results of ``validate_batch`` into plain floats.

        Each batch contributes equally to the mean, whatever its size.
        """
        def mean_of(key):
            return torch.stack([entry[key] for entry in outputs]).mean().item()

        return {
            'validation_loss': mean_of('validation_loss'),
            'validation_accuracy': mean_of('validation_accuracy'),
        }

    def print_epoch_result(self, epoch, result):
        """Print one line with the epoch number and the three metrics in ``result``."""
        template = "Epoch [{}], training_loss: {:.4f}, validation_loss: {:.4f}, validation_accuracy: {:.4f}"
        metric_names = ('training_loss', 'validation_loss', 'validation_accuracy')
        print(template.format(epoch, *(result[name] for name in metric_names)))

In [None]:
class Model(Classification):
    """Fully connected classifier.

    Layer widths: input_length -> 512 -> 256 -> 128 -> 64 -> output_length,
    with a ReLU after every layer except the last.
    """

    def __init__(self, input_length, output_length):
        """Create the five linear layers."""
        super().__init__()
        self.in_layer = nn.Linear(input_length, 512)
        self.hidden1 = nn.Linear(512, 256)
        self.hidden2 = nn.Linear(256, 128)
        self.hidden3 = nn.Linear(128, 64)
        self.out_layer = nn.Linear(64, output_length)

    def forward(self, xb):
        """Flatten each sample of ``xb`` and return the output layer's raw scores."""
        # Keep the first dimension, collapse everything else into one vector.
        activations = xb.view(xb.size(0), -1)
        # Input layer and hidden layers, each followed by ReLU.
        for layer in (self.in_layer, self.hidden1, self.hidden2, self.hidden3):
            activations = F.relu(layer(activations))
        # The output layer has no activation.
        return self.out_layer(activations)

In [None]:
# Look at the first training sample and print the shape of its image tensor.
image, label = fruit_data[0]
print(image.size())

In [None]:
# Flattened image size, derived from image_size so it tracks the Resize
# transform instead of repeating the literal 50.
# NOTE(review): assumes 3 channels, as in the original 3*50*50 — confirm the
# dataset loader yields RGB images.
input_length = 3 * image_size * image_size
# One output per class discovered by ImageFolder. Counting os.listdir()
# entries would also count stray non-directory files in the dataset folder.
output_length = len(fruit_data.classes)

In [None]:
# Build the network, then move it to the selected device.
network = Model(input_length, output_length)
model = move_to_device(network, device)

In [None]:
def verify(model, validation_loader):
    """Evaluate ``model`` on every batch of ``validation_loader``.

    Runs with autograd disabled and the model in evaluation mode; the
    model's previous training/eval mode is restored before returning,
    even if a batch raises.

    Args:
        model: Module providing ``validate_batch(batch)`` and
            ``calc_validation_epoch(outputs)``.
        validation_loader: Iterable of batches.

    Returns:
        Whatever ``model.calc_validation_epoch`` returns for the list of
        per-batch results.
    """
    was_training = model.training
    model.eval()
    try:
        # Evaluation never needs gradients; skipping them saves memory and time.
        with torch.no_grad():
            outputs = [model.validate_batch(batch) for batch in validation_loader]
    finally:
        model.train(was_training)
    return model.calc_validation_epoch(outputs)

In [None]:
# Start the history with the validation metrics of the model before any training.
baseline_metrics = verify(model, validation_loader)
training_model = [baseline_metrics]

In [None]:
def learn_model(epochs, learing_rate, model, training_loader, validation_loader, opt_func=torch.optim.Adam):
    """Train ``model`` and return one metrics dict per epoch.

    Args:
        epochs: Number of passes over ``training_loader``.
        learing_rate: Learning rate passed to ``opt_func`` (the misspelled
            name is kept so existing keyword callers keep working).
        model: Module providing ``train_batch`` and ``print_epoch_result``
            and accepted by ``verify``.
        training_loader: Iterable of training batches.
        validation_loader: Iterable of validation batches, handed to ``verify``.
        opt_func: Optimizer factory, called as ``opt_func(parameters, lr)``.

    Returns:
        A list with one dict per epoch: the result of ``verify`` plus a
        ``'training_loss'`` entry holding the mean of that epoch's batch losses.
    """
    history = []
    optimizer = opt_func(model.parameters(), learing_rate)
    for epoch in range(epochs):
        # Training phase
        model.train()
        training_losses = []
        for batch in tqdm(training_loader):
            loss = model.train_batch(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            # Keep only the value: storing the attached tensor would keep
            # autograd history alive for every batch of the epoch.
            training_losses.append(loss.detach())
        # Validation phase (no gradients needed)
        with torch.no_grad():
            result = verify(model, validation_loader)
        result['training_loss'] = torch.stack(training_losses).mean().item()
        model.print_epoch_result(epoch, result)
        history.append(result)
    return history

In [None]:
def draw_accuracy_plot(training_model):
    """Plot the ``'validation_accuracy'`` of each entry in ``training_model``, in order."""
    accuracy_curve = []
    for record in training_model:
        accuracy_curve.append(record['validation_accuracy'])
    plt.plot(accuracy_curve, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.show()

In [None]:
def draw_losses_plot(training_model):
    """Plot training loss (blue) and validation loss (red) for each entry in ``training_model``.

    Entries without a ``'training_loss'`` key contribute ``None`` to the
    training curve; ``'validation_loss'`` is required in every entry.
    """
    curves = (
        ([record.get('training_loss') for record in training_model], '-bx'),
        ([record['validation_loss'] for record in training_model], '-rx'),
    )
    for values, line_format in curves:
        plt.plot(values, line_format)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.show()

In [None]:
# Train for 4 epochs with SGD at learning rate 0.1 and append the per-epoch metrics to the history.
training_model.extend(
    learn_model(4, 0.1, model, training_loader, validation_loader, opt_func=torch.optim.SGD)
)

In [None]:
# Train for 4 epochs with SGD at learning rate 0.01 and append the per-epoch metrics to the history.
training_model.extend(
    learn_model(4, 0.01, model, training_loader, validation_loader, opt_func=torch.optim.SGD)
)

In [None]:
# Train for 4 epochs with SGD at learning rate 0.001 and append the per-epoch metrics to the history.
training_model.extend(
    learn_model(4, 0.001, model, training_loader, validation_loader, opt_func=torch.optim.SGD)
)

In [None]:
# Train for 3 epochs with SGD at learning rate 0.0001 and append the per-epoch metrics to the history.
training_model.extend(
    learn_model(3, 0.0001, model, training_loader, validation_loader, opt_func=torch.optim.SGD)
)

In [None]:
# Plot the training and validation loss curves from the recorded history.
draw_losses_plot(training_model)

In [None]:
# Plot the validation accuracy curve from the recorded history.
draw_accuracy_plot(training_model)

In [None]:
# Evaluate the model on the test loader; as the cell's last expression, the
# resulting metrics dict is displayed as the cell output.
verify(model, test_loader)