# Download the GitHub repo

In [None]:
!git clone https://github.com/pairlab/csc375-w22-assignments.git

# P1 Semantic Segmentation

In [None]:
!pip install pypng colormap easydev

In [None]:
%cd /content/csc375-w22-assignments/HW3/p1
# -p keeps this cell re-runnable: mkdir no longer errors when the
# output/model directories already exist from an earlier run.
!mkdir -p output_train output_test models

In [None]:
import numpy as np
import os
import png
import torch
from tqdm import tqdm
from torch.utils.data.dataset import Dataset
from PIL import Image

class FacadeDataset(Dataset):
    """In-memory facade segmentation dataset.

    Every sample whose index lies in ``data_range`` is read eagerly at
    construction time from ``<dataDir>/<flag>/eecs442_XXXX.jpg`` (image) and
    the matching ``.png`` (label map), so indexing involves no disk access.

    Args:
        flag: Sub-directory of ``dataDir`` that holds the split.
        dataDir: Root directory of the dataset.
        data_range: Half-open ``(start, stop)`` interval of sample indices.
        n_class: Number of label classes; one binary plane is stored per
            class.
        onehot: When true, ``__getitem__`` returns the per-class planes;
            otherwise it returns a single map of class indices.
    """

    def __init__(
        self,
        flag,
        dataDir='data',
        data_range=(0, 8),
        n_class=5,
        onehot=False
    ):
        self.onehot = onehot
        print("load "+ flag+" dataset start")
        print("    from: %s" % dataDir)
        print("    range: [%d, %d)" % (data_range[0], data_range[1]))
        self.dataset = []
        for idx in range(data_range[0], data_range[1]):
            jpg_path = os.path.join(dataDir, flag, 'eecs442_%04d.jpg' % idx)
            png_path = os.path.join(dataDir, flag, 'eecs442_%04d.png' % idx)
            picture = Image.open(jpg_path)
            _, _, rows, _ = png.Reader(filename=png_path).read()
            class_map = np.array(list(rows)).astype('uint8')
            # HWC pixel values -> CHW float32, rescaled by /128 - 1.
            pixels = np.asarray(picture).astype("f").transpose(2, 0, 1)/128.0-1.0
            # One binary plane per class, same spatial size as the image.
            planes = np.zeros(
                (n_class, pixels.shape[1], pixels.shape[2])
            ).astype("i")
            for cls, plane in enumerate(planes):
                plane[:] = class_map == cls
            self.dataset.append((pixels, planes))
        print("load dataset done")

    def __len__(self):
        """Number of samples loaded into memory."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return ``(image, label)`` tensors for sample ``index``.

        The image is a float tensor. The label is a long tensor: the stored
        per-class planes when ``onehot`` is set, else their argmax over the
        class axis.
        """
        img, label = self.dataset[index]
        planes = torch.FloatTensor(label)
        if self.onehot:
            target = planes.long()
        else:
            target = torch.argmax(planes, dim=0)

        return torch.FloatTensor(img), torch.LongTensor(target)


In [None]:
import os
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import png
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from colormap.colors import Color, hex2rgb
from sklearn.metrics import average_precision_score as ap_score
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms
from tqdm import tqdm

# Number of segmentation classes; also the channel count of Net's output.
N_CLASS = 5


class Net(nn.Module):
    """Starter segmentation network.

    A single 1x1 convolution maps the 3 input channels to ``N_CLASS``
    channels, followed by a ReLU. Spatial size is unchanged.
    """

    def __init__(self):
        super().__init__()
        self.n_class = N_CLASS
        stack = [
            #########################################
            ###        TODO: Add more layers      ###
            #########################################
            nn.Conv2d(3, self.n_class, kernel_size=1, padding=0),
            nn.ReLU(inplace=True),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the per-class maps."""
        return self.layers(x)


def save_label(label, path):
    '''
    Write a class-index label map to ``path`` as a palette PNG.

    ``label`` is a 2-D array (rows x columns) of class indices; each index
    is drawn with the colour at the same position in the palette below.
    '''
    palette_hex = (
        '#000000',
        '#0080FF',
        '#80FF80',
        '#FF8000',
        '#FF0000',
    )
    # Every index must have a colour in the palette.
    assert np.max(label) < len(palette_hex)
    palette_rgb = []
    for hex_code in palette_hex:
        palette_rgb.append(hex2rgb(hex_code, normalise=False))
    n_cols = label.shape[1]
    n_rows = label.shape[0]
    writer = png.Writer(n_cols, n_rows, palette=palette_rgb, bitdepth=4)
    with open(path, 'wb') as out_file:
        writer.write(out_file, label)

def train(trainloader, net, criterion, optimizer, device, epoch):
    '''
    Train ``net`` for one pass over ``trainloader``.

    Each batch is moved to ``device``, run through ``net``, scored with
    ``criterion`` and used for a single ``optimizer`` step. When the pass is
    finished, one summary line with the epoch number, the mean batch loss
    and the elapsed wall-clock time is printed.

    Args:
        trainloader: Iterable yielding ``(images, labels)`` batches.
        net: Model to optimise (switched to training mode).
        criterion: Loss callable taking ``(output, labels)``.
        optimizer: Optimiser bound to ``net``'s parameters.
        device: Device the batches are moved to.
        epoch: Epoch number, used only in the printed summary.

    Returns:
        Mean loss over the batches of this epoch, or NaN if the loader
        yielded no batches.
    '''
    start = time.time()
    running_loss = 0.0
    n_batches = 0
    net = net.train()
    for images, labels in tqdm(trainloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        output = net(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        # Accumulate (not overwrite) so the summary reflects the whole
        # epoch rather than whichever batch happened to come last.
        running_loss += loss.item()
        n_batches += 1
    end = time.time()
    mean_loss = running_loss / n_batches if n_batches else float('nan')
    print('[epoch %d] loss: %.3f elapsed time %.3f' %
          (epoch, mean_loss, end-start))
    return mean_loss

def test(testloader, net, criterion, device):
    '''
    Evaluate ``net`` on ``testloader`` without gradient tracking.

    The model is put in eval mode, the loss of every batch is summed, and
    the mean batch loss is printed and returned.
    '''
    total = 0.
    n_batches = 0
    with torch.no_grad():
        net = net.eval()
        for n_batches, (images, labels) in enumerate(tqdm(testloader), start=1):
            images, labels = images.to(device), labels.to(device)
            batch_loss = criterion(net(images), labels)
            total += batch_loss.item()
    mean_loss = total / n_batches
    print(mean_loss)
    return mean_loss


def cal_AP(testloader, net, criterion, device):
    '''
    Compute and print the per-class Average Precision of ``net``.

    For every class the network's output channel is flattened over all
    samples and used as the score, and the matching channel of the labels
    is used as the binary target. One ``AP = ...`` line is printed per
    class.

    Args:
        testloader: Iterable yielding ``(images, labels)`` batches; labels
            are indexed as ``labels[:, c]``, i.e. they must carry one
            channel per class (assumes the dataset was built with
            ``onehot=True`` - confirm against the caller).
        net: Model to evaluate (switched to eval mode).
        criterion: Unused; kept so the signature matches the other
            evaluation helpers.
        device: Device the images are moved to.

    Returns:
        List of ``N_CLASS`` AP values in class order. A class with no
        positive target pixel gets ``nan``.
    '''
    scores = [[] for _ in range(N_CLASS)]
    targets = [[] for _ in range(N_CLASS)]
    with torch.no_grad():
        net = net.eval()
        for images, labels in tqdm(testloader):
            output = net(images.to(device)).cpu().numpy()
            # Labels are only consumed as NumPy arrays, so they never need
            # to be copied to the device first.
            onehot = labels.cpu().numpy()
            for c in range(N_CLASS):
                scores[c].append(output[:, c].reshape(-1))
                targets[c].append(onehot[:, c].reshape(-1))

    aps = []
    for c in range(N_CLASS):
        y_score = np.concatenate(scores[c])
        y_true = np.concatenate(targets[c])
        if y_true.max() == 0:
            # No positive pixel for this class: AP is undefined.
            ap = float('nan')
        else:
            ap = ap_score(y_true, y_score)
        aps.append(ap)
        print("AP = {}".format(ap))
    return aps


def get_result(testloader, net, device, folder='output_train'):
    '''
    Save predictions, ground truth and inputs as PNG files in ``folder``.

    For every sample yielded by ``testloader`` three files are written,
    numbered consecutively from 0: ``y<k>.png`` (predicted label map),
    ``gt<k>.png`` (ground-truth label map) and ``x<k>.png`` (the input
    image mapped back to 8-bit values).

    All samples of a batch are written, so the loader may use any batch
    size; with ``batch_size=1`` the files are the same as before.

    Args:
        testloader: Iterable yielding ``(images, labels)`` batches. Labels
            are passed to ``save_label`` per sample, so they are assumed to
            be 2-D class-index maps (``onehot=False``) - confirm against
            the caller.
        net: Model to run (switched to eval mode).
        device: Device the images are moved to.
        folder: Existing directory, relative to the working directory, that
            receives the PNG files.

    Returns:
        List with the predicted ``uint8`` label map of every sample, in
        loader order.
    '''
    result = []
    cnt = 0
    with torch.no_grad():
        net = net.eval()
        for images, labels in tqdm(testloader):
            images = images.to(device)
            outputs = net(images).cpu().numpy()
            inputs = images.cpu().numpy()
            targets = labels.cpu().numpy()
            for output, image, target in zip(outputs, inputs, targets):
                c, h, w = output.shape
                assert(c == N_CLASS)
                y = np.zeros((h, w)).astype('uint8')
                # A pixel takes the highest class index whose score exceeds
                # 0.5; pixels where no score does stay at class 0.
                # NOTE(review): the scores are raw network outputs, so an
                # argmax over the class axis may be the intended decoding -
                # confirm before changing.
                for i in range(N_CLASS):
                    y[output[i] > 0.5] = i
                gt = target.astype('uint8')
                save_label(y, './{}/y{}.png'.format(folder, cnt))
                save_label(gt, './{}/gt{}.png'.format(folder, cnt))
                plt.imsave(
                    './{}/x{}.png'.format(folder, cnt),
                    ((image+1)*128).astype(np.uint8).transpose(1, 2, 0))
                result.append(y)
                cnt += 1
    return result

def main():
    '''
    Train the starter segmentation network and evaluate it.

    Loads the train/validation/test splits, trains ``Net`` with
    cross-entropy loss and Adam, reports the validation loss after every
    epoch and the test loss at the end, writes prediction images to
    ``output_test``, saves the weights under ``models/`` and prints the
    per-class Average Precision.
    '''
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # TODO: Adjust batch_size for loaders
    train_data   = FacadeDataset(flag='train', data_range=(0,800), onehot=False)
    train_loader = DataLoader(train_data, batch_size=1)
    # data_range is half-open, so validation starts at 800: the training
    # split ends at index 799 and starting at 801 would silently drop
    # sample 800 from both splits.
    val_data     = FacadeDataset(flag='train', data_range=(800,906), onehot=False)
    val_loader   = DataLoader(val_data, batch_size=1)

    # The test images are loaded twice: once with index labels for the loss
    # and the saved images, once with one-hot labels for cal_AP.
    test_data    = FacadeDataset(flag='test_dev', data_range=(0,114), onehot=False)
    test_loader  = DataLoader(test_data, batch_size=1)
    ap_data      = FacadeDataset(flag='test_dev', data_range=(0,114), onehot=True)
    ap_loader    = DataLoader(ap_data, batch_size=1)

    name = 'starter_net'
    net = Net().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), 1e-3, weight_decay=1e-5)

    print('\nStart training')
    for epoch in range(2): # TODO: Change the number of epochs
        print('-----------------Epoch = %d-----------------' % (epoch+1))
        train(train_loader, net, criterion, optimizer, device, epoch+1)
        test(val_loader, net, criterion, device)

    print('\nFinished Training, Testing on test set')
    test(test_loader, net, criterion, device)
    print('\nGenerating Unlabeled Result')
    get_result(test_loader, net, device, folder='output_test')

    torch.save(net.state_dict(), 'models/model_{}.pth'.format(name))

    cal_AP(ap_loader, net, criterion, device)

# Run main() only when this code is executed as the entry point, not when
# it is imported.
if __name__ == "__main__":
    main()

# P2 Object Detection

In [None]:
%cd /content/csc375-w22-assignments/HW3/p2

In [None]:
def notebook_init():
    """Prepare the notebook session and return IPython's display module.

    Prints a status line, clears the cell output, calls
    ``select_device(newline=False)`` and prints a completion message.
    """
    print('Checking setup...')

    # Imported here, after the status line has been printed.
    from IPython import display as ipy_display
    from utils.general import emojis
    from utils.torch_utils import select_device

    ipy_display.clear_output()
    select_device(newline=False)
    done_message = emojis('Setup complete ✅')
    print(done_message)
    return ipy_display

# Keep the returned IPython display module; later cells call display.Image.
display = notebook_init() 

In [None]:
!pip install -r requirements.txt

## Training

1. Open `csc375-w22-assignments/HW3/p2/train.py` and implement model parameter freezing at lines 128–130.

2. After finishing step 1, train the YOLOv3 model on COCO128 for 5 epochs, freezing 10 layers, by running the cell below.

In [None]:
!python train.py --img 640 --batch 16 --epochs 5 --data coco128.yaml --weights yolov3.pt --cache --freeze 10

## Visualization

Run the cell below to generate the detection visualizations. Note that if you ran the training loop multiple times, you will have additional folders `exp2`, `exp3`, etc.; in that case, point `--weights` at the run you want to visualize.

In [None]:
!python detect.py --weights runs/train/exp/weights/best.pt --img 640 --conf 0.25 --source data/images

Display Results

In [None]:
display.Image(filename='runs/detect/exp/Cats_and_dog.jpg', width=600)

In [None]:
display.Image(filename='runs/detect/exp/bus.jpg', width=600)

# P3 Object Pose Estimation

In [None]:
%cd /content/csc375-w22-assignments/HW3/p3

In [None]:
import numpy as np
import os
import png
import torch
from tqdm import tqdm
from torch.utils.data.dataset import Dataset
from PIL import Image
import pickle

class PyBulletDataset(Dataset):
    """In-memory pose dataset of images with ground-truth position/quaternion.

    All samples are read eagerly at construction time from
    ``<dataDir>/<flag>/``: images ``00000.png``, ``00001.png``, ... and the
    pickle ``gt_poses.pkl``, which maps the zero-padded index string to a
    ``(pos, quat)`` pair. The ``'train'`` split reads 1000 samples, every
    other split 100.

    Args:
        flag: Sub-directory of ``dataDir`` that holds the split.
        dataDir: Root directory of the dataset.
    """

    def __init__(
        self,
        flag,
        dataDir='data'
    ):
        split_dir = os.path.join(dataDir, flag)
        # NOTE(review): unpickling can execute arbitrary code - only load
        # gt_poses.pkl from a trusted source.
        # The context manager closes the file handle deterministically.
        with open(os.path.join(split_dir, 'gt_poses.pkl'), 'rb') as pose_file:
            gt_pose = pickle.load(pose_file)
        self.dataset = []
        num_images = 1000 if flag == 'train' else 100
        for i in range(num_images):
            key = '{:05d}'.format(i)
            with Image.open(os.path.join(split_dir, key + '.png')) as picture:
                # HWC pixel values -> CHW float32, rescaled by /128 - 1.
                # astype() copies, so the array outlives the closed image.
                img = np.asarray(picture).astype("f").transpose(2, 0, 1)/128.0-1.0
            pos, quat = gt_pose[key]
            self.dataset.append((img, pos, quat))
        print("load dataset done")

    def __len__(self):
        """Number of samples loaded into memory."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return ``(image, pos, quat)`` as float tensors for ``index``."""
        img, pos, quat = self.dataset[index]
        return torch.FloatTensor(img), torch.FloatTensor(pos), torch.FloatTensor(quat)

In [None]:
import os
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import png
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from colormap.colors import Color, hex2rgb
from sklearn.metrics import average_precision_score as ap_score
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms
from tqdm import tqdm

class Net(nn.Module):
    """Pose regression network with a shared backbone and two linear heads.

    The backbone is five stride-2 5x5 convolutions (3 -> 32 -> 64 -> 128 ->
    256 -> 512 channels) followed by one ReLU. Its feature map is averaged
    over all spatial positions into a 512-vector, which is fed to
    ``pos_head`` (3 outputs) and ``quat_head`` (4 outputs).

    NOTE(review): there is no activation between the convolutions, only
    after the last one - confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2),
            nn.Conv2d(32, 64, 5, stride=2),
            nn.Conv2d(64, 128, 5, stride=2),
            nn.Conv2d(128, 256, 5, stride=2),
            nn.Conv2d(256, 512, 5, stride=2),
            nn.ReLU(inplace=True)
        )

        self.pos_head = nn.Sequential(
            #########################################
            ###        TODO: Add more layers      ###
            #########################################
            nn.Linear(512, 3)
        )

        self.quat_head = nn.Sequential(
            #########################################
            ###        TODO: Add more layers      ###
            #########################################
            nn.Linear(512, 4)
        )

    def forward(self, images):
        """Return ``(pos, quat)`` predictions for a batch of images.

        Args:
            images: Float tensor of shape (batch, 3, H, W), large enough for
                the five convolutions to produce at least a 1x1 map.

        Returns:
            Tuple of tensors with shapes (batch, 3) and (batch, 4).
        """
        f = self.layers(images)
        # Global average pooling. A fixed (12, 17) window only covers the
        # whole map for one input resolution; at any other size the
        # following reshape would mix spatial cells into the batch axis.
        f = F.adaptive_avg_pool2d(f, 1)
        f = torch.flatten(f, 1)
        pos = self.pos_head(f)
        quat = self.quat_head(f)
        return pos, quat

def train(trainloader, net, criterion, optimizer, device, epoch):
    '''
    Train the pose network for one pass over ``trainloader``.

    Each batch is moved to ``device`` and run through ``net``; the loss is
    ``criterion`` on the position plus ``criterion`` on the quaternion, and
    one ``optimizer`` step is taken per batch. When the pass is finished,
    one summary line with the epoch number, the mean batch loss and the
    elapsed wall-clock time is printed.

    Args:
        trainloader: Iterable yielding ``(images, gt_pos, gt_quat)`` batches.
        net: Model returning ``(pred_pos, pred_quat)`` (switched to
            training mode).
        criterion: Loss callable taking ``(prediction, target)``.
        optimizer: Optimiser bound to ``net``'s parameters.
        device: Device the batches are moved to.
        epoch: Epoch number, used only in the printed summary.

    Returns:
        Mean loss over the batches of this epoch, or NaN if the loader
        yielded no batches.
    '''
    start = time.time()
    running_loss = 0.0
    n_batches = 0
    net = net.train()
    for images, gt_pos, gt_quat in tqdm(trainloader):
        images = images.to(device)
        gt_pos = gt_pos.to(device)
        gt_quat = gt_quat.to(device)
        optimizer.zero_grad()
        pred_pos, pred_quat = net(images)
        pos_loss = criterion(pred_pos, gt_pos)
        quat_loss = criterion(pred_quat, gt_quat)
        loss = pos_loss + quat_loss
        loss.backward()
        optimizer.step()
        # Accumulate (not overwrite) so the summary reflects the whole
        # epoch rather than whichever batch happened to come last.
        running_loss += loss.item()
        n_batches += 1
    end = time.time()
    mean_loss = running_loss / n_batches if n_batches else float('nan')
    print('[epoch %d] loss: %.3f elapsed time %.3f' %
          (epoch, mean_loss, end-start))
    return mean_loss

def test(testloader, net, criterion, device):
    '''
    Evaluate the pose network on ``testloader`` without gradient tracking.

    The per-batch loss is ``criterion`` on the position plus ``criterion``
    on the quaternion; the mean over all batches is printed and returned.
    '''
    total = 0.
    n_batches = 0
    with torch.no_grad():
        net = net.eval()
        for n_batches, (images, gt_pos, gt_quat) in enumerate(
                tqdm(testloader), start=1):
            images, gt_pos, gt_quat = (
                images.to(device), gt_pos.to(device), gt_quat.to(device))
            pred_pos, pred_quat = net(images)
            batch_loss = (criterion(pred_pos, gt_pos)
                          + criterion(pred_quat, gt_quat))
            total += batch_loss.item()
    mean_loss = total / n_batches
    print('Test loss:', mean_loss)
    return mean_loss

def main():
    '''
    Train the pose network and report validation and test losses.

    Loads the train/val/test splits, optimises ``Net`` with MSE loss and
    Adam, evaluates on the validation split after every epoch and on the
    test split once training has finished.
    '''
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda:0" if use_cuda else "cpu")

    # TODO: Adjust batch_size for loaders
    loaders = {}
    for split in ('train', 'val', 'test'):
        loaders[split] = DataLoader(PyBulletDataset(flag=split), batch_size=1)

    net = Net().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=1e-3, weight_decay=1e-5)

    print('\nStart training')
    n_epochs = 2  # TODO: Change the number of epochs
    for epoch in range(1, n_epochs + 1):
        print('-----------------Epoch = %d-----------------' % epoch)
        train(loaders['train'], net, criterion, optimizer, device, epoch)
        test(loaders['val'], net, criterion, device)

    print('\nFinished Training, Testing on test set')
    test(loaders['test'], net, criterion, device)
    
# Run main() only when this code is executed as the entry point, not when
# it is imported.
if __name__ == "__main__":
    main()