In [None]:
import numpy as np
import torch

from torch import nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler

from os import listdir
from PIL import Image
from collections import Counter

In [None]:
# Dataset locations, given relative to the notebook's working directory.
LABEL_PATH = '../data/annotations'  # one annotation file per class, listing image ids
IMAGE_PATH = '../data/images'       # JPEG files named im<id>.jpg

In [None]:
def get_class_map(label_path=None):
    """Build the mapping from class name to integer label.

    Every non-hidden entry of the annotation directory is treated as one
    class; the class name is the file name with its last extension removed.

    Args:
        label_path: Directory to scan. Defaults to the module-level
            ``LABEL_PATH`` when omitted.

    Returns:
        dict mapping each class name (str) to a contiguous index starting
        at 0, assigned in sorted name order.
    """
    if label_path is None:
        label_path = LABEL_PATH

    # listdir() gives entries in arbitrary order, so sort the names to keep
    # the name -> index assignment stable between calls and between runs.
    # Collecting into a set first keeps the indices contiguous even if two
    # files share a stem. Hidden entries (e.g. ``.ipynb_checkpoints``) are
    # not annotation files and are skipped.
    class_names = sorted({
        fname.rsplit('.', 1)[0]
        for fname in listdir(label_path)
        if not fname.startswith('.')
    })

    return {name: idx for idx, name in enumerate(class_names)}

In [None]:
def get_dataloader(bs=64, max_per_class=None):
    """Read every annotated colour image into memory and wrap it in a DataLoader.

    For each annotation file in ``LABEL_PATH`` the listed image ids are
    resolved to ``{IMAGE_PATH}/im<id>.jpg``, loaded, flattened to a 1-D
    float32 vector and paired with the integer label of the class.

    Args:
        bs: Batch size passed to the DataLoader.
        max_per_class: Optional cap on the number of images kept per class,
            useful for quick experiments. ``None`` (default) keeps all.

    Returns:
        A shuffling ``DataLoader`` over ``[flattened_image, label]`` pairs.

    Note:
        Images are flattened without resizing, so batching presumably
        requires all images to share one size -- confirm against the dataset.
    """
    samples = []

    # mapping from class names to integers
    class_map = get_class_map()

    # loop through all the annotations (sorted for a reproducible order)
    for fname in sorted(listdir(LABEL_PATH)):
        # hidden entries such as .ipynb_checkpoints are not annotations
        if fname.startswith('.'):
            continue

        # class name = file name without its last extension
        img_class = fname.rsplit('.', 1)[0]
        print(f'Reading class: {img_class}')
        label = class_map[img_class]

        # get image ids from the annotation file, one per line
        with open(f'{LABEL_PATH}/{fname}', 'r') as fh:
            img_ids = fh.read().splitlines()

        # gather the images with labels
        n_kept = 0
        for img_id in img_ids:
            if max_per_class is not None and n_kept >= max_per_class:
                break

            # a blank line would otherwise resolve to the bogus 'im.jpg'
            if not img_id.strip():
                continue

            # the context manager releases the file handle right after the
            # pixel data has been copied out
            with Image.open(f'{IMAGE_PATH}/im{img_id}.jpg') as img:
                img_data = np.asarray(img)

                # skip black-and-white images (no channel axis)
                if img_data.ndim != 3:
                    continue

                img_data = img_data.flatten().astype(np.float32)

            samples.append([img_data, label])
            n_kept += 1

    return DataLoader(samples, batch_size=bs, shuffle=True)

In [None]:
class TwoLayerModel(nn.Module):
    """Fully connected classifier with two hidden layers.

    The input of every linear layer is batch-normalised first, and a ReLU
    follows the first two linear layers. The final linear layer emits one
    raw score per class (no softmax is applied here).
    """

    def __init__(self, n_input, n_hidden1, n_hidden2, n_classes):
        """Create the layers.

        Args:
            n_input: Number of features in each input vector.
            n_hidden1: Width of the first hidden layer.
            n_hidden2: Width of the second hidden layer.
            n_classes: Number of output scores.
        """
        super().__init__()

        # linear layers first, then the shared activation, then the normalisers
        self.input_layer = nn.Linear(in_features=n_input, out_features=n_hidden1)
        self.hidden1 = nn.Linear(in_features=n_hidden1, out_features=n_hidden2)
        self.hidden2 = nn.Linear(in_features=n_hidden2, out_features=n_classes)
        self.relu = nn.ReLU()
        self.bn0 = nn.BatchNorm1d(num_features=n_input)
        self.bn1 = nn.BatchNorm1d(num_features=n_hidden1)
        self.bn2 = nn.BatchNorm1d(num_features=n_hidden2)

    def forward(self, x):
        """Return the class scores for a batch of input vectors ``x``."""
        first = self.relu(self.input_layer(self.bn0(x)))
        second = self.relu(self.hidden1(self.bn1(first)))
        return self.hidden2(self.bn2(second))

In [None]:
class OneLayerModel(nn.Module):
    """Fully connected classifier with a single hidden layer.

    The input of each linear layer is batch-normalised first, and a ReLU
    follows the first linear layer. The final linear layer emits one raw
    score per class (no softmax is applied here).
    """

    def __init__(self, n_input, n_hidden, n_classes):
        """Create the layers.

        Args:
            n_input: Number of features in each input vector.
            n_hidden: Width of the hidden layer.
            n_classes: Number of output scores.
        """
        super().__init__()

        # linear layers first, then the activation, then the normalisers
        self.input_layer = nn.Linear(in_features=n_input, out_features=n_hidden)
        self.hidden = nn.Linear(in_features=n_hidden, out_features=n_classes)
        self.relu = nn.ReLU()
        self.bn0 = nn.BatchNorm1d(num_features=n_input)
        self.bn1 = nn.BatchNorm1d(num_features=n_hidden)

    def forward(self, x):
        """Return the class scores for a batch of input vectors ``x``."""
        activated = self.relu(self.input_layer(self.bn0(x)))
        return self.hidden(self.bn1(activated))

In [None]:
def train(dataloader, model, optimizer, criterion, device, n_epochs=50, losses=None):
    """Run a plain supervised training loop.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches of tensors.
        model: Module to optimise; it is switched to training mode.
        optimizer: Optimizer that updates the parameters of ``model``.
        criterion: Callable ``criterion(y_pred, y)`` returning a scalar loss.
        device: Device each batch is moved to before the forward pass.
        n_epochs: Number of full passes over ``dataloader``.
        losses: Optional list that receives the loss of every batch as a
            Python float, in order. A new list is used when omitted.

    Returns:
        None. After each epoch the loss of its last batch is printed
        (``nan`` if the dataloader yielded no batches).
    """
    # A fresh list per call: a mutable default would be shared by all calls.
    if losses is None:
        losses = []

    model.train()

    for epoch in range(n_epochs):
        # stays NaN when the dataloader yields no batches
        last_loss = float('nan')

        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)

            optimizer.zero_grad()
            y_pred = model(X)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()

            # Record a plain float: keeping the loss tensor itself would also
            # keep its autograd history (and device memory) alive.
            last_loss = loss.item()
            losses.append(last_loss)

        print(f'Epoch: {epoch}, loss: {last_loss}')

In [None]:
# Ask for the GPU, but fall back to the CPU when CUDA is not available so
# the notebook still runs on machines without one.
use_cuda = True

device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')

# training hyperparameters
lr = 0.01
n_epochs = 10
bs = 256

# Seed torch before the model is built so the weight initialisation is
# reproducible across kernel restarts.
seed = 0
torch.manual_seed(seed)

n_classes = len(get_class_map())

# Length of one flattened input vector.
# NOTE(review): assumes every image is 128x128 with 3 channels -- confirm
# against the dataset.
n_input = 128 * 128 * 3

# Swap in TwoLayerModel(n_input, 128, 64, n_classes) to try the deeper network.
model = OneLayerModel(n_input, 128, n_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

In [None]:
# Build the dataloader once, using the batch size configured in `bs`.
dataloader = get_dataloader(bs=bs)

In [None]:
# Train the configured model for `n_epochs` passes over the dataloader.
train(
    dataloader=dataloader,
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    device=device,
    n_epochs=n_epochs,
)