In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torchvision.transforms as transforms


from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from einops import rearrange # ! pip install einops

In [2]:
class ImageDataset(Dataset):
    def __init__(self, csv, train, test):

        self.csv = csv
        self.train = train
        self.test = test
        self.all_image_names = self.csv[:]["filename"]
        self.all_labels = np.array(self.csv.drop(["filename", "ind"], axis=1))
        self.train_ratio = int(0.85 * len(self.csv))
        self.valid_ratio = len(self.csv) - self.train_ratio

        # set the training data images and labels
        if self.train == True:
            print(f"Number of training images: {self.train_ratio}")
            self.image_names = list(self.all_image_names[: self.train_ratio])
            self.labels = list(self.all_labels[: self.train_ratio])
            # define the training transforms
            self.transform = transforms.Compose([transforms.ToTensor(),])

        # set the validation data images and labels
        elif self.train == False and self.test == False:
            print(f"Number of validation images: {self.valid_ratio}")
            self.image_names = list(self.all_image_names[-self.valid_ratio : -10])
            self.labels = list(self.all_labels[-self.valid_ratio :])
            # define the validation transforms
            self.transform = transforms.Compose([transforms.ToTensor(),])

        # set the test data images and labels, only last 10 images
        # this, we will use in a separate inference script
        elif self.test == True and self.train == False:
            self.image_names = list(self.all_image_names[-10:])
            self.labels = list(self.all_labels[-10:])
            # define the test transforms
            self.transform = transforms.Compose([transforms.ToTensor(),])

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, index):
        image = cv2.imread(f"../data/raw_images/{self.image_names[index]}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # apply image transforms
        image = self.transform(image)
        targets = self.labels[index]

        return {
            "image": torch.tensor(image, dtype=torch.float32),
            "label": torch.tensor(targets, dtype=torch.float32),
        }

In [18]:
# Used in creating data loaders as well as in train loops
BATCH_SIZE = 1

In [4]:
train_csv = pd.read_csv("../data/processed/train.csv")
# train dataset
train_data = ImageDataset(train_csv[:1000], train=True, test=False)
# validation dataset
valid_data = ImageDataset(train_csv[:1000], train=False, test=False)
# train data loader
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
# validation data loader
valid_loader = DataLoader(valid_data, batch_size=BATCH_SIZE, shuffle=False)

Number of training images: 849
Number of validation images: 150


### Model definition

In [6]:
N_STEPS = 144
N_INPUTS = 768
N_NEURONS = 150
N_OUTPUTS = 6
N_EPHOCS = 10


LR = 0.001
PATCH_SIZE = 16

In [7]:
class ImageRNN(nn.Module):
    def __init__(self, batch_size, n_steps, n_inputs, n_neurons, n_outputs):
        super(ImageRNN, self).__init__()

        self.n_neurons = n_neurons
        self.batch_size = batch_size
        self.n_steps = n_steps
        self.n_inputs = n_inputs
        self.n_outputs = n_outputs

        self.basic_rnn = nn.RNN(self.n_inputs, self.n_neurons)

        self.FC = nn.Linear(self.n_neurons, self.n_outputs)

    def init_hidden(self,):
        # (num_layers, batch_size, n_neurons)
        return torch.zeros(1, self.batch_size, self.n_neurons)

    def forward(self, X):
        # transforms X to dimensions: n_steps X batch_size X n_inputs
        X = X.permute(1, 0, 2)

        self.batch_size = X.size(1)
        self.hidden = self.init_hidden()

        lstm_out, self.hidden = self.basic_rnn(X, self.hidden)
        out = self.FC(self.hidden)

        return out.view(-1, self.n_outputs)  # batch_size X n_output

### Check if inference works correctly

In [10]:
dataiter = iter(train_loader)
images, labels = dataiter.next().values()
model = ImageRNN(BATCH_SIZE, N_STEPS, N_INPUTS, N_NEURONS, N_OUTPUTS)
img_patches = rearrange(
    images.view(1, 3, 144, 256),
    "b c (patch_x x) (patch_y y) -> b (x y) (patch_x patch_y c)",
    patch_x=PATCH_SIZE,
    patch_y=PATCH_SIZE,
)
logits = model(img_patches)
print(logits)
torch.sigmoid(logits)

tensor([[-0.0302, -0.4416, -0.3831,  0.0461,  0.1974, -0.1136]],
       grad_fn=<ViewBackward>)


  "image": torch.tensor(image, dtype=torch.float32),


tensor([[0.4925, 0.3914, 0.4054, 0.5115, 0.5492, 0.4716]],
       grad_fn=<SigmoidBackward>)

### Model instantiation

In [17]:
import torch.optim as optim

# Device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Model instance
model = ImageRNN(BATCH_SIZE, N_STEPS, N_INPUTS, N_NEURONS, N_OUTPUTS)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=LR)


def get_accuracy(logit, target, batch_size):
    """ Obtain accuracy for training round """
    # FIXME: The below does not account for batch_size > 1 - change the definition accordingly

    pred = logit > 0.5
    actual = target == 1
    acc = ((pred & actual).sum().item() / target.shape[1]) * 100
    return acc

### Train

In [13]:
for epoch in range(N_EPHOCS):  # loop over the dataset multiple times
    train_running_loss = 0.0
    train_acc = 0.0
    model.train()

    # TRAINING ROUND
    for i, data in enumerate(train_loader):
        # zero the parameter gradients
        optimizer.zero_grad()

        # reset hidden states
        model.hidden = model.init_hidden()

        # get the inputs
        inputs, labels = data.values()
        img_patches = rearrange(
            inputs.view(BATCH_SIZE, 3, 144, 256),
            "b c (patch_x x) (patch_y y) -> b (x y) (patch_x patch_y c)",
            patch_x=PATCH_SIZE,
            patch_y=PATCH_SIZE,
        )

        # forward + backward + optimize
        outputs = model(img_patches)
        outputs = torch.sigmoid(outputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_running_loss += loss.detach().item()
        train_acc += get_accuracy(outputs, labels, BATCH_SIZE)

    model.eval()
    print(
        "Epoch:  %d | Loss: %.4f | Train Accuracy: %.2f"
        % (epoch, train_running_loss / i, train_acc / i)
    )

  "image": torch.tensor(image, dtype=torch.float32),


Epoch:  0 | Loss: 0.3985 | Train Accuracy: 7.53
Epoch:  1 | Loss: 0.2935 | Train Accuracy: 13.50
Epoch:  2 | Loss: 0.2391 | Train Accuracy: 15.29
Epoch:  3 | Loss: 0.1991 | Train Accuracy: 16.71
Epoch:  4 | Loss: 0.1688 | Train Accuracy: 17.30
Epoch:  5 | Loss: 0.1482 | Train Accuracy: 17.96
Epoch:  6 | Loss: 0.1313 | Train Accuracy: 18.30
Epoch:  7 | Loss: 0.1436 | Train Accuracy: 17.85
Epoch:  8 | Loss: 0.1174 | Train Accuracy: 18.71
Epoch:  9 | Loss: 0.1139 | Train Accuracy: 18.87


### Test Accuracy

In [16]:
test_acc = 0.0
for i, data in enumerate(valid_loader, 0):
    inputs, labels = data.values()
    img_patches = rearrange(
        inputs.view(BATCH_SIZE, 3, 144, 256),
        "b c (patch_x x) (patch_y y) -> b (x y) (patch_x patch_y c)",
        patch_x=PATCH_SIZE,
        patch_y=PATCH_SIZE,
    )

    outputs = model(img_patches)

    test_acc += get_accuracy(outputs, labels, BATCH_SIZE)

print("Test Accuracy: %.2f" % (test_acc / i))

  "image": torch.tensor(image, dtype=torch.float32),


Test Accuracy: 2.16


### Sample image patch generation

In [None]:
# ! pip install einops

# import matplotlib.pyplot as plt
# import numpy as np

# functions to show an image
# def imshow(img):
#     # img = img / 2 + 0.5     # unnormalize
#     npimg = img.numpy()
#     plt.imshow(np.transpose(npimg, (1, 2, 0)))

# get some random training images
# dataiter = iter(train_loader)
# images, labels = dataiter.next().values()
# imshow(images[0])
# p = 16
# img_patches = rearrange(
#     images[0].view(1, 3, 144, 256),
#     "b c (patch_x x) (patch_y y) -> b (x y) (patch_x patch_y c)",
#     patch_x=p,
#     patch_y=p,
# )
# img_patches.shape