# Fully Convolutional Network

In [23]:
# Author : joono
# Date : 2022-02-10

## Import Libraries

In [2]:
import pandas as pd
import numpy as np
import os
import random
import tensorflow as tf
import cv2
from tqdm import tqdm
import datetime

from  matplotlib import pyplot as plt
import matplotlib.image as mpimg
from IPython.display import clear_output
%matplotlib inline

from IPython.display import HTML
from base64 import b64encode

import torch
import torch.nn as nn
import torchvision 
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import regex

## Load Dataset

### Source Dataset

In [3]:
# Load directories
# Training images and their ground-truth masks. parse_image() derives each mask
# path from the image path (it swaps "image_2" for "gt_image_2"), so
# train_gt_dir is not referenced by the visible cells.
train_data_dir = "../input/kittiroadsegmentation/training/image_2/"
train_gt_dir = "../input/kittiroadsegmentation/training/gt_image_2/"

# Directory the input videos are read from. It is joined to the file name by
# plain string concatenation, so the trailing slash is required.
test_data_dir = "../input/kittiroadsegmentation/testing/"

In [4]:
# Split sizes derived from the number of files in the training image folder:
# 80% train, 10% validation, the remainder test.
_n_examples = len(os.listdir(train_data_dir))

TRAINSET_SIZE = int(_n_examples * 0.8)
VALIDSET_SIZE = int(_n_examples * 0.1)
TESTSET_SIZE = int(_n_examples - TRAINSET_SIZE - VALIDSET_SIZE)

print(f"Number of Training Examples: {TRAINSET_SIZE}")
print(f"Number of Validation Examples: {VALIDSET_SIZE}")
print(f"Number of Testing Examples: {TESTSET_SIZE}")

In [5]:
# Initialize Constants
IMG_SIZE = 224    # side length (pixels) that images and masks are resized to
N_CHANNELS = 3    # presumably the number of input colour channels; not referenced in the visible cells
N_CLASSES = 1     # number of output channels of the segmentation head
SEED = 22022

# Actually apply the seed: without these calls SEED was defined but never used,
# so sample picking, shuffling and weight initialisation differed on every run.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [6]:
# Function to load image and return a dictionary
def parse_image(img_path: str) -> dict:
    """Open an image and its ground-truth road mask.

    The mask path is derived from the image path:

    * the last directory component has ``image_2`` replaced by ``gt_image_2``;
    * a file name starting with ``um_``, ``umm_`` or ``uu_`` gets ``road_``
      inserted after that prefix (``um_000000.png`` -> ``um_road_000000.png``).

    Only the last directory and the file-name prefix are rewritten, so a
    dataset root that happens to contain ``um_``, ``uu_`` or ``image_2``
    elsewhere in its path no longer corrupts the mask path.

    Args:
        img_path: Path to an image inside the ``image_2`` folder.

    Returns:
        ``{'image': <PIL image>, 'mask': <PIL image>}``.

    Raises:
        FileNotFoundError: If the image or the derived mask file does not exist
            (raised by ``Image.open``).
    """
    img_dir, img_name = os.path.split(img_path)

    # Swap only the folder that directly contains the image.
    parent_dir, leaf_dir = os.path.split(img_dir)
    mask_dir = os.path.join(parent_dir, leaf_dir.replace("image_2", "gt_image_2"))

    # Three types of img names: um, umm, uu
    # gt image names: um_road, umm_road, uu_road
    prefix, sep, rest = img_name.partition("_")
    if sep and prefix in ("um", "umm", "uu"):
        mask_name = f"{prefix}_road_{rest}"
    else:
        mask_name = img_name

    image = Image.open(img_path)
    mask = Image.open(os.path.join(mask_dir, mask_name))

    return {'image': image, 'mask': mask}

In [20]:
# Sanity check: show one randomly chosen training image next to its mask.
randimg = random.choice(os.listdir(train_data_dir))
data = parse_image(os.path.join(train_data_dir, randimg))
img, gt = data["image"], data["mask"]

print(img.size, gt.size)

_, (ax_img, ax_gt) = plt.subplots(1, 2, figsize=(20, 10))
ax_img.imshow(img)
ax_gt.imshow(gt)

# Keep the mask around as a NumPy array for ad-hoc inspection.
gt = np.array(gt)

In [8]:
class KITTYRoadSegDataset(Dataset):
    """Dataset of images paired with their road masks.

    Auth: joono, PyTorch version of the tf dataset building.

    Every entry of ``data_dir`` is treated as an image; the matching mask is
    located and opened by ``parse_image``.

    Args:
        data_dir: Directory containing the input images.
        transform: Optional callable applied to the image and, separately, to
            the mask. NOTE(review): a transform with random augmentation would
            draw different parameters for image and mask - confirm only
            deterministic transforms are passed.
    """

    def __init__(self, data_dir, transform=None):
        super().__init__()
        # Sorted so that index -> file is stable across runs and machines;
        # os.listdir returns entries in arbitrary order.
        self.imgs = [os.path.join(data_dir, name) for name in sorted(os.listdir(data_dir))]
        self.transforms = transform

    def __len__(self):
        """Number of images found in ``data_dir``."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Return ``{"image": ..., "mask": ...}`` for the sample at ``idx``.

        With a transform, ``"image"`` is the transformed image and ``"mask"``
        is index 2 along the first axis of the transformed mask. Without a
        transform the two PIL images from ``parse_image`` are returned as-is
        (previously an empty dict was returned, which broke every consumer).
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        data = parse_image(self.imgs[idx])

        if self.transforms is None:
            return {"image": data["image"], "mask": data["mask"]}

        image = self.transforms(data["image"])
        mask = self.transforms(data["mask"])

        # Assumes the transformed mask is a (C, H, W) tensor with at least
        # three channels; index 2 would then be the blue channel of an RGB
        # mask - TODO confirm against the ground-truth encoding.
        return {"image": image, "mask": mask[2]}
        

In [9]:
# Preprocessing shared by images and masks: resize to a fixed square, then
# convert the PIL image to a tensor.
train_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

trainset = KITTYRoadSegDataset(data_dir=train_data_dir, transform=train_transform)

# Shuffled mini-batches of 8; the last incomplete batch is dropped.
trainloader = DataLoader(trainset, batch_size=8, shuffle=True, num_workers=2, drop_last=True)

## Define Network

In [10]:
class VGG16_FCN8(nn.Module):
    """FCN-8-style segmentation network on top of torchvision's pretrained VGG16.

    The VGG16 feature extractor is cut into three consecutive stages
    (``pool3``, ``pool4``, ``pool5``). The deepest output is upsampled x2 and
    added to the ``pool4`` output, reduced from 512 to 256 channels by a 1x1
    convolution, upsampled x2 again and added to the ``pool3`` output, then
    upsampled x8 and mapped to ``n_classes`` channels by a
    1x1 conv + BatchNorm + ReLU head.

    Args:
        n_classes: Number of output channels of the head.

    NOTE(review): the input height and width presumably have to be multiples
    of 32 for the skip additions to line up - confirm.
    """

    def __init__(self, n_classes):
        super().__init__()
        self.n_classes = n_classes

        # Kept as attributes for backward compatibility; forward() no longer
        # depends on them.
        self.W = IMG_SIZE
        self.H = IMG_SIZE

        # NOTE(review): every child of the torchvision model is registered here,
        # but only the first one (index 0) is used below - the remaining
        # children still end up in parameters() and in saved checkpoints.
        self.vgg = nn.Sequential(*list(torchvision.models.vgg16(pretrained=True).children()))
        features = self.vgg[0]
        self.pool3 = features[:17]
        self.pool4 = features[17:24]
        self.pool5 = features[24:]

        # Wout, Hout = Win * scale_factor, Hin * scale_factor
        self.upsampling2 = nn.UpsamplingBilinear2d(scale_factor=2)
        self.upsampling8 = nn.UpsamplingBilinear2d(scale_factor=8)

        self.conv = nn.Conv2d(512, 256, kernel_size=(1, 1))

        self.fc_layer = nn.Sequential(
            nn.Conv2d(256, self.n_classes, kernel_size=(1, 1)),
            nn.BatchNorm2d(self.n_classes),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Run the network on a batch.

        Args:
            x: Input batch, (N, C, H, W).

        Returns:
            (N, H, W) when ``n_classes == 1`` (same as before for
            IMG_SIZE x IMG_SIZE inputs), otherwise (N, n_classes, H, W).
        """
        p3 = self.pool3(x)
        p4 = self.pool4(p3)
        p5 = self.pool5(p4)

        # Fuse the deepest features with the pool4 skip, then reduce channels.
        u1 = self.upsampling2(p5)
        d1 = self.conv(torch.add(p4, u1))

        # Fuse with the pool3 skip.
        u2 = self.upsampling2(d1)
        d2 = torch.add(p3, u2)

        out = self.fc_layer(self.upsampling8(d2))

        # The old `out.view(N, self.W, self.H)` only worked for one class and
        # inputs of exactly IMG_SIZE x IMG_SIZE; dropping the channel axis
        # gives the same result there and works for any input size.
        if self.n_classes == 1:
            return out.squeeze(1)
        return out

## Training

### Model, Optimizer and Loss Function

In [11]:
# Use the GPU when there is one, otherwise fall back to the CPU instead of
# failing on the hard-coded "cuda" device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# N_CLASSES (defined with the other constants) replaces the duplicated literal 1.
model = VGG16_FCN8(n_classes=N_CLASSES)
model = model.to(device)
model = model.train()

lr = 1e-3

optimizer = optim.Adam(model.parameters(), lr=lr)
criteria = nn.MSELoss().to(device)

epochs = 200

### Train Model

In [12]:
# Move batches to whatever device the model already lives on.
device = next(model.parameters()).device

# Make sure BatchNorm is in training mode even if an inference cell ran before.
model.train()

for epoch in range(epochs):
    running_loss = 0.0
    for batch in tqdm(trainloader):
        image = batch["image"].to(device)
        mask = batch["mask"].to(device)

        pred = model(image)
        loss = criteria(pred, mask)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # .item() yields a plain float. Accumulating the tensor itself (as
        # before) keeps every batch's autograd graph alive for the whole epoch.
        running_loss += loss.item()

    # MSELoss already averages over the elements of a batch, so the epoch loss
    # is the mean over batches (the old `/ 16` matched neither the batch size
    # of 8 nor the number of batches).
    total_loss = running_loss / max(len(trainloader), 1)

    # Checkpoint after every epoch (whole-model pickle, same file as before).
    torch.save(model, "vgg16-fcn8.pt")
    print(f"epoch: {epoch+1:04d} | loss: {total_loss:.4f}")

## Testing (Test Dataset)

In [16]:
# Function to view video
from IPython.display import HTML
from base64 import b64encode

def play(filename):
    """Return an ``IPython.display.HTML`` player for the video at ``filename``.

    The whole file is read and inlined as a base64 ``data:`` URI, so the
    notebook output grows with the size of the video.

    Args:
        filename: Path to an MP4 file.

    Returns:
        HTML object holding a looping, auto-playing ``<video>`` element.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Context manager so the file handle is closed right after reading
    # (the previous bare open() never closed it).
    with open(filename, 'rb') as video_file:
        video = video_file.read()

    src = 'data:video/mp4;base64,' + b64encode(video).decode()
    return HTML(f'<video width=1000 controls autoplay loop><source src="{src}" type="video/mp4"></video>')

# Function to calculate mask over image
def weighted_img(img, initial_img, α=1., β=0.5, γ=0.):
    """Overlay ``img`` on ``initial_img`` with ``cv2.addWeighted``.

    ``initial_img`` is weighted by ``α``, ``img`` by ``β`` and ``γ`` is passed
    as the scalar offset. Both arrays are handed to OpenCV unchanged.
    """
    blended = cv2.addWeighted(initial_img, α, img, β, γ)
    return blended

# Inference preprocessing. The order now matches the training pipeline
# (resize the PIL image first, then convert to a tensor); the previous
# ToTensor -> Resize order resized the tensor instead of the PIL image, so
# inference frames were not preprocessed the same way as training samples.
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

# Function to process an individual image
def process_image(image, output_size=None):
    """Predict the road mask for one video frame and overlay it on the frame.

    Args:
        image: Frame as a BGR ``uint8`` array (it is converted with
            ``cv2.COLOR_BGR2RGB`` before being fed to the model).
        output_size: ``(width, height)`` of the returned image. Defaults to
            the size of the input frame so the result can be written straight
            to a ``cv2.VideoWriter`` opened with the source video's
            dimensions. This replaces the former fixed ``(1280, 720)``; the
            result is identical for 1280x720 input.

    Returns:
        BGR ``uint8`` image of size ``output_size`` with the predicted mask
        blended in red over the frame.
    """
    frame_h, frame_w = image.shape[:2]
    if output_size is None:
        output_size = (frame_w, frame_h)

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(image)

    # Preprocess image and move it to the device the model lives on
    device = next(model.parameters()).device
    img = transform(img).unsqueeze(0).to(device)

    # Predict in eval mode and without autograd. In train mode BatchNorm
    # normalises with the statistics of this single frame and also updates its
    # running statistics. The previous mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            pred_mask = model(img)
    finally:
        model.train(was_training)
    pred_mask = pred_mask.cpu().numpy()

    # (1, H, W) -> (H, W, 1)
    pred_mask = pred_mask.transpose((1, 2, 0))

    # Binarise with an explicit threshold. np.round_ no longer exists in
    # NumPy 2, and rounding let predictions >= 1.5 become 2+, which overflowed
    # uint8 after the * 255 below.
    mask = (pred_mask > 0.5).astype(np.uint8)

    # Convert to mask image: the prediction goes into the first (red) channel
    zero_image = np.zeros_like(mask)
    mask = np.dstack((mask, zero_image, zero_image)) * 255
    mask = np.asarray(mask, np.uint8)

    # Get the final image
    image = np.asarray(image, np.uint8)
    image = cv2.resize(image, (IMG_SIZE, IMG_SIZE))

    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    mask = cv2.cvtColor(mask, cv2.COLOR_RGB2BGR)

    final_image = weighted_img(mask, image)
    final_image = cv2.resize(final_image, output_size)

    return final_image

In [22]:
# Make a new directory for the processed videos
if not os.path.isdir("videos"):
    os.mkdir("videos")
    print("mkdir videos")

# Creating a VideoCapture object to read the video
# Alternative inputs:
# project_video = "challenge.mp4"
# project_video = "challenge_video.mp4"
project_video = "harder_challenge_video.mp4"


original_video = cv2.VideoCapture(test_data_dir + project_video)
# Fail loudly: an unopened capture reports 0 for width/height/fps and the loop
# below would silently produce an empty output file.
if not original_video.isOpened():
    raise FileNotFoundError(f"Could not open video: {test_data_dir + project_video}")

frame_width = int(original_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(original_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = original_video.get(cv2.CAP_PROP_FPS)

print(f"Wori: {frame_width}, Hori: {frame_height}, fps: {fps}")

# Define the codec and create the VideoWriter object. The output is stored
# under ./videos/ with the same file name as the input.
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
output = cv2.VideoWriter("./videos/"+project_video, fourcc, fps, (frame_width,frame_height))

# Process Video
try:
    while original_video.isOpened():
        ret, frame = original_video.read()
        if not ret:
            break

        processed = process_image(frame)
        # The writer was opened for (frame_width, frame_height); make sure
        # every frame handed to it has exactly that size.
        if (processed.shape[1], processed.shape[0]) != (frame_width, frame_height):
            processed = cv2.resize(processed, (frame_width, frame_height))

        # Write the processed frame to the output video
        output.write(processed)
finally:
    # Release the capture and writer even if processing a frame fails, so the
    # output file is finalised and the handles are freed.
    original_video.release()
    output.release()

In [18]:
# List the working directory, then play a processed clip inline.
# NOTE(review): "./videos/challenge.mp4" is only written by the video-processing
# cell when project_video is set to "challenge.mp4"; the visible cell uses
# "harder_challenge_video.mp4" - confirm the file exists before running this.
# play("../input/kittiroadsegmentation/testing/challenge_video.mp4")
!ls
play("./videos/challenge.mp4")

In [None]:
# Play the processed version of the video selected in the processing cell.
play(f"videos/{project_video}")

## References

- [Kitti Dataset Processing](http://ronny.rest/blog/post_2017_09_06_kitti_road_data/)
- [Image Segmentation on Keras](https://yann-leguilly.gitlab.io/post/2019-12-14-tensorflow-tfdata-segmentation/)