In [None]:
import random
import csv
import os
import os.path
import shutil
import cv2

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Displays a progress bar

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import Dataset, Subset, DataLoader, random_split

import pandas as pd
import h5py

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

!gdown https://drive.google.com/uc?id=1olKESqgAm5GeAmuRC21BpTBkxFrgmUIp
!unzip /content/Data

In [16]:
class DataNonNorm:
    def __init__(self, root):
        self.ROOT = root
        self.images = self.read_images(os.path.join(root, "Images"))
        self.labels = self.read_labels(os.path.join(root, "Labels"))

    def __len__(self):
        # Return number of points in the dataset
        return len(self.images)

    def __getitem__(self, idx):
        # Here we have to return the item requested by `idx`. The PyTorch DataLoader class will use this method to make an iterable for training/validation loop.
        img = self.images[idx] / 255
        #label = self.labels[idx]
        return img
  
    # Read Images
    def read_images(self, path:str) -> list:
        imgs = []
        with h5py.File(os.path.join(path, "imagedata.hdf5"), "r") as f:
            for i in sorted(f.keys(), key = int ):
                imgs.append(np.array(f.get(str(i))))
        #output = torch.tensor(output)
        return imgs
      
    # Read Labels
    def read_labels(self, path:str) -> list:
        output = []
        with h5py.File(os.path.join(path, "labeldata.hdf5"), "r") as f:
            labels = f["labels"][()]
        return labels       

# # Load the dataset and train and test splits
# print("Loading datasets...")

# # Data path
dataTrain = DataNonNorm(os.path.normpath('./Data/Train'))
dataTest  = DataNonNorm(os.path.normpath('./Data/Test'))

In [17]:
trainloader = DataLoader(dataTrain, batch_size=32, shuffle=True)
testloader = DataLoader(dataTest, batch_size=2, shuffle=True)

In [25]:
import math
# Get mean and std deviation of training and test
tnMean = 0.
tnStd = 0.
count = 0.
var = 0.
for data in trainloader:
    tnMean += np.mean(np.array(data))
    var += np.var(np.array(data))
    count += 1

tnMean /= count
tnStd = math.sqrt(var / count)

print(tnMean)
print(tnStd)

tstMean = 0.
tstStd = 0.
count = 0.
var = 0.
for data in testloader:
    tstMean += np.mean(np.array(data))
    var += np.var(np.array(data))
    count += 1

tstMean /= count
tstStd = math.sqrt(var / count)

print(tstMean)
print(tstStd)

0.6531141332511887
0.26932273407019464
0.6535949342601184
0.2643014432779285


In [None]:
class Data:

    def __init__(self, root, mn, std):
        self.ROOT = root
        self.images = self.read_images(os.path.join(root, "Images"))
        self.labels = self.read_labels(os.path.join(root, "Labels"))

        self.transform = MyTransform = transforms.Compose([
            transforms.ToTensor(), # Transform from [0,255] uint8 to [0,1] float
            transforms.Normalize((mn, mn, mn), (std, std, std)) # TODO: Normalize to zero mean and unit variance with appropriate parameters 0.5
            ])

    def __len__(self):
        # Return number of points in the dataset
        return len(self.images)

    def __getitem__(self, idx):
        # Here we have to return the item requested by `idx`. The PyTorch DataLoader class will use this method to make an iterable for training/validation loop.
        img = self.transform(self.images[idx])
        # img = self.images[idx] / 255
        label = self.labels[idx]
        return img, label
  
    # Read Images
    def read_images(self, path:str) -> list:
        imgs = []
        with h5py.File(os.path.join(path, "imagedata.hdf5"), "r") as f:
            for i in sorted(f.keys(), key = int ):
                imgs.append(np.array(f.get(str(i))))
        #output = torch.tensor(output)
        return imgs
      
    # Read Labels
    def read_labels(self, path:str) -> list:
        output = []
        with h5py.File(os.path.join(path, "labeldata.hdf5"), "r") as f:
            labels = f["labels"][()]
        return labels       

# Load the dataset and train and test splits
print("Loading datasets...")

# Data path
dataTrain = Data(os.path.normpath('./Data/Train'), tnMean, tnStd)
dataTest  = Data(os.path.normpath('./Data/Test'),tstMean, tstStd)

In [None]:
print(dataTrain.__len__())
print(len(dataTrain.labels))

In [None]:
from PIL import Image

with h5py.File(os.path.join('./Data/Train/Images', "imagedata.hdf5"), "r") as f:
  img = Image.fromarray(np.array(f.get("1")))
display(img)

In [None]:
# print("Done!")

# # Create dataloaders
# # TODO: Experiment with different batch sizes
trainloader = DataLoader(dataTrain, batch_size=32, shuffle=True, drop_last=True)
testloader = DataLoader(dataTest, batch_size=2, shuffle=True, drop_last=True)

In [None]:
from torchvision import datasets, transforms
import torchvision.models as models

In [None]:
class Network(nn.Module):
    def __init__(self):
        super().__init__()
        # TODO: [Transfer learning with pre-trained ResNet-50] 1) Define how many first layers of convolutoinal neural network (CNN) feature extractor in ResNet-50 to be "frozen" and 2) design your own fully-connected network (FCN) classifier.
        # 1) You will only refine last several layers of CNN feature extractor in ResNet-50 that mainly relate to high-level vision task. Determine how many first layers of ResNet-50 should be frozen to achieve best performances. Commented codes below will help you understand the architecture, i.e., "children", of ResNet-50.
        # 2) Design your own FCN classifier. Here I provide a sample of two-layer FCN.
        # Refer to PyTorch documentations of torch.nn to pick your layers. (https://pytorch.org/docs/stable/nn.html)
        # Some common Choices are: Linear, ReLU, Dropout, MaxPool2d, AvgPool2d
        # If you have many layers, consider using nn.Sequential() to simplify your code
        
        # Load pretrained ResNet-50
        self.model_resnet = models.resnet50(pretrained=True)
        
        # The code below can show children of ResNet-50
        # child_counter = 0
        # for child in model.children():
        #    print(" child", child_counter, "is -")
        #    print(child)
        #    child_counter += 1
        
        # TODO: Determine how many first layers of ResNet-50 to freeze
        child_counter = 0
        for child in self.model_resnet.children():
            if child_counter < 6:
                for param in child.parameters():
                    param.requires_grad = False
            elif child_counter == 6:
                children_of_child_counter = 0
                for children_of_child in child.children():
                    if children_of_child_counter < 3:
                        for param in children_of_child.parameters():
                            param.requires_grad = False
                    else:
                        children_of_child_counter += 1
            else:
                print("child ",child_counter," was not frozen")
            child_counter += 1
        
        # Set ResNet-50's FCN as an identity mapping
        num_fc_in = self.model_resnet.fc.in_features
        self.model_resnet.fc = nn.Identity()
        
        # TODO: Design your own FCN
        self.fc1 = nn.Linear(num_fc_in, 20, bias = 3) # from input of size num_fc_in to output of size ?
        self.bn1 = nn.BatchNorm1d(num_features=20)
        self.fc2 = nn.Linear(20, 3, bias = 3) # from hidden layer to 3 class scores

    def forward(self,x):
        # TODO: Design your own network, implement forward pass here
        
        relu = nn.ReLU() # No need to define self.relu because it contains no parameters
        
        with torch.no_grad():
            features = self.model_resnet(x)
            
        x = self.fc1(features) # Activation are flattened before being passed to the fully connected layers
        x = self.bn1(x)
        x = relu(x)
        x = self.fc2(x)
        
        # The loss layer will be applied outside Network class
        return x

device = "cuda" if torch.cuda.is_available() else "cpu" # Configure device
model= Network().to(device)
criterion = nn.CrossEntropyLoss() # Specify the loss layer (note: CrossEntropyLoss already includes LogSoftMax())
# TODO: Modify the line below, experiment with different optimizers and parameters (such as learning rate)
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=.001, weight_decay=0.004) # Specify optimizer and assign trainable parameters to it, weight_decay is L2 regularization strength (default: lr=1e-2, weight_decay=1e-4)
num_epochs = 30 # TODO: Choose an appropriate number of training epochs

def train(model, loader, num_epoch = num_epochs): # Train the model
    print("Start training...")
    model.train() # Set the model to training mode
    for i in range(num_epoch):
        running_loss = []
        for batch, label in tqdm(loader, position=0, leave=True):
            batch = batch.to(device)
            label = label.to(device)
            optimizer.zero_grad() # Clear gradients from the previous iteration
            pred = model(batch) # This will call Network.forward() that you implement
            loss = criterion(pred, label) # Calculate the loss
            running_loss.append(loss.item())
            loss.backward() # Backprop gradients to all tensors in the network
            optimizer.step() # Update trainable weights
        print("Epoch {} loss:{}".format(i+1,np.mean(running_loss))) # Print the average loss for this epoch
    print("Done!")

def evaluate(model, loader): # Evaluate accuracy on validation / test set
    model.eval() # Set the model to evaluation mode
    correct = 0
    with torch.no_grad(): # Do not calculate grident to speed up computation
        for batch, label in tqdm(loader, position=0, leave=True):
            batch = batch.to(device)
            label = label.to(device)
            pred = model(batch)
            correct += (torch.argmax(pred,dim=1)==label).sum().item()
    acc = correct/len(loader.dataset)
    print("Evaluation accuracy: {}".format(acc))
    return acc
    
train(model, trainloader, num_epochs)
print("Evaluate on test set")
evaluate(model, testloader)

In [None]:
# Load pretrained ResNet-50
model_resnet = models.resnet50(pretrained=True)

# The code below can show children of ResNet-50
child_counter = 0
for child in model_resnet.children():
    print(" child", child_counter, "is -")
    print(child)
    child_counter += 1