In [31]:
import numpy as np
import torch
import timeit
import numpy as np
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import torch.nn as nn
import torch.utils.data as data
from skimage.draw import polygon
import sys
!pip install paramparse
import paramparse
from google.colab import drive

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [32]:
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

        
class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=False):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

def segmentate(images):
    """
    :param np.ndarray images: N x 12288 array containing N 64x64x3 images flattened into vectors
    :return: np.ndarray
    """
    N = images.shape[0]
    # pred_seg: Your predicted segmentation for the image, shape [N, 4096]
    pred_seg = np.empty((N, 4096), dtype=np.int32)
    # add your code here to fill in pred_seg
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    images = images.reshape([images.shape[0], 64, 64, 3])
    images = np.transpose(images, (0, 3, 1, 2))
    model = UNet(3,11).to(device)
    from google.colab import drive
    drive.mount('/content/gdrive/', force_remount=True)
    model.load_state_dict(torch.load("/content/gdrive/My Drive/visual_recognition_data/checkpoint_15.pth", map_location=device))
    model = model.to(device)
    
    for i in range(N):
        image = torch.as_tensor(images[i]).float()
        logit = model(image.to(device).view(-1,3,64,64))
        pred = logit.argmax(1).view(-1).long().cpu().numpy()
        pred_seg[i,:] = pred
        if i % 100 == 0:
            print('Evaluating: [{}/{} ({:.0f}%)]\n'.format(i, N, (i/N*100)))
    
    
    return pred_seg

In [33]:
def compute_seg(pred, gt):
    # pred value should be from 0 to 10, where 10 is the background.
    # accuracy is calculated for only non background pixels.
    assert pred.shape == gt.shape
    mask = gt != 10
    return (pred[mask] == gt[mask]).astype(int).sum() / gt[mask].size

In [34]:
class A8_Params:
    def __init__(self):
        # self.prefix = "test"
        self.prefix = "valid"
        # self.prefix = "train"
        self.vis = 0
        self.vis_size = (300, 300)
        self.show_pred = 1
        self.speed_thresh = 10
        self.seg_thresh = (0.7, 0.98)

In [35]:
def compute_score(res, thresh):
    min_thres, max_thres = thresh
    if res < min_thres:
        score = 0.0
    elif res > max_thres:
        score = 100.0
    else:
        score = float(res - min_thres) / (max_thres - min_thres) * 100
    return score

In [36]:
def main():
    params = A8_Params()
    prefix = params.prefix
    drive.mount('/content/gdrive/', force_remount=True)
    images = np.load("/content/gdrive/My Drive/visual_recognition_data/" + prefix + "_X.npy")
    gt_segs = np.load("/content/gdrive/My Drive/visual_recognition_data/" + prefix + "_seg.npy")
    n_images = images.shape[0]
    print(f'running on {n_images} {prefix} images')
    start_t = timeit.default_timer()
    pred_segs = segmentate(images)
    end_t = timeit.default_timer()
    test_time = end_t - start_t
    assert test_time > 0, "test_time cannot be 0"
    test_speed = float(n_images) / test_time
    seg = compute_seg(pred_segs, gt_segs)
    seg_score = compute_score(seg, params.seg_thresh)
    if test_speed < params.speed_thresh:
        overall_score = 0
    else:
        overall_score = seg_score
    print(f"Segmentation Accuracy: {seg:.3f}")
    print(f"Test time: {test_time:.3f} seconds")
    print(f"Test speed: {test_speed:.3f} images / second")
    print(f"Overall Score: {overall_score:.3f}")
    
main()

Mounted at /content/gdrive/
running on 5000 valid images
Mounted at /content/gdrive/


















































Segmentation Accuracy: 0.947
Test time: 41.503 seconds
Test speed: 120.473 images / second
Overall Score: 88.204
