In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch
import pickle
import numpy as np
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import cv2 as cv
import os 
import sys
import matplotlib.pyplot as plt

sys.path.append(os.path.dirname(os.path.abspath('.')))
from unet import UNet
from AttnUNet import AttU_Net
from ResUNet import Res_UNet

Custom Dataset Class

In [None]:
class LaneDataset(Dataset):
    '''Dataset pairing road images with their lane masks.

    Both inputs are indexable collections of channel-last samples
    (height, width, channels); in this notebook they are NumPy arrays
    documented as
        images.shape = (num_samples, 80, 160, 3)
        labels.shape = (num_samples, 80, 160, 1)
    Every sample is returned as a pair of float32 PyTorch tensors in
    channel-first layout, i.e. (3, 80, 160) and (1, 80, 160).
    '''

    def __init__(self, images, labels):
        super().__init__()
        self.images = images
        self.labels = labels

    def __len__(self):
        # The number of samples is taken from the image collection.
        return len(self.images)

    def __getitem__(self, idx):
        # torch.tensor copies the sample; permute moves the channel axis first.
        hwc_image = torch.tensor(self.images[idx], dtype=torch.float)
        hwc_label = torch.tensor(self.labels[idx], dtype=torch.float)
        return hwc_image.permute(2, 0, 1), hwc_label.permute(2, 0, 1)

In [None]:
# Load the pickled images and lane masks. Context managers close the file
# handles deterministically instead of leaving them to the garbage collector.
# NOTE(security): pickle.load can execute arbitrary code - only open trusted files.
with open('data/full_CNN_train.p','rb') as features_file:
    train_pickle=pickle.load(features_file)
with open('data/full_CNN_labels.p','rb') as labels_file:
    train_labels=pickle.load(labels_file)

train_features=np.array(train_pickle)
train_labels=np.array(train_labels)/255 #normalize

# train_test_split shuffles by default, so no separate shuffle is needed.
# The fixed random_state keeps the train/validation partition identical across
# kernel restarts; without it, weights reloaded from disk would be evaluated on
# a validation set that may contain images they were trained on.
X_train, X_val, y_train, y_val=train_test_split(train_features,train_labels,random_state=42)

train_dataset=LaneDataset(X_train, y_train)
val_dataset=LaneDataset(X_val,y_val)

# Only the training loader is reshuffled each epoch; validation order is fixed
# so that repeated inference passes return samples in the same order.
train_loader=DataLoader(train_dataset,batch_size=128,shuffle=True)
val_loader=DataLoader(val_dataset,batch_size=128,shuffle=False)

device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def plot_loss(train_losses,num_epochs,val_losses=None):
    """Plot per-epoch loss curves.

    Args:
        train_losses: sequence with one training-loss value per epoch.
        num_epochs: number of epochs; the x axis runs from 1 to num_epochs,
            so each loss sequence must contain exactly num_epochs values.
        val_losses: optional sequence with one validation-loss value per
            epoch. When omitted, only the training curve is drawn.
    """
    epochs = range(1, num_epochs + 1)

    plt.figure(figsize=(8, 5))
    plt.plot(epochs, train_losses, color='green', label='Train Loss')
    if val_losses is not None:
        plt.plot(epochs, val_losses, color='red', label='Validation Loss')
        plt.title('Training and Validation Loss')
    else:
        # The title previously promised a validation curve that was never drawn.
        plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

Model

#### AttUnet Training

In [None]:
# Build the attention U-Net and move it to the selected device.
# The arguments (3, 1) presumably mean 3 input channels and 1 output channel,
# matching the (3, 80, 160) images and (1, 80, 160) masks - TODO confirm in AttnUNet.py.
att_unet_model = AttU_Net(3, 1).to(device)

In [None]:
# Optimisation setup for the attention U-Net: pixel-wise MSE against the
# normalised masks, Adam with a learning rate of 1e-4.
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(att_unet_model.parameters(), lr=1e-4)
num_epochs = 100
train_losses = []

for epoch in range(num_epochs):
    att_unet_model.train()
    running_loss = 0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        output = att_unet_model(batch_x)
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()

        # Accumulate the scalar batch loss for the epoch average.
        running_loss += loss.item()

    # Mean of the per-batch losses over the whole epoch.
    avg_train_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch+1}, loss: {running_loss/len(train_loader)}')
    train_losses.append(avg_train_loss)

plot_loss(train_losses, num_epochs)
# The trained weights are written to disk by the following cell.

In [None]:
# Persist the attention U-Net weights (state_dict only, not the module object).
torch.save(att_unet_model.state_dict(),'weights/att_unet_model_100_1e4.pth')

#### UNet training

In [None]:
# Build the plain U-Net and move it to the selected device.
# The arguments (3, 1) presumably mean 3 input channels and 1 output channel - TODO confirm in unet.py.
unet_model = UNet(3,1).to(device)

In [None]:
# Optimisation setup for the plain U-Net: pixel-wise MSE against the
# normalised masks, Adam with a learning rate of 1e-4.
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(unet_model.parameters(), lr=1e-4)
num_epochs = 100
train_losses = []

for epoch in range(num_epochs):
    unet_model.train()
    running_loss = 0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        output = unet_model(batch_x)
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()

        # Accumulate the scalar batch loss for the epoch average.
        running_loss += loss.item()

    # Mean of the per-batch losses over the whole epoch.
    avg_train_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch+1}, loss: {running_loss/len(train_loader)}')
    train_losses.append(avg_train_loss)

plot_loss(train_losses, num_epochs)

# Persist the trained weights (state_dict only).
torch.save(unet_model.state_dict(), 'weights/unet_model_100_1e4.pth')

#### ResUNet training

In [None]:
# Build the residual U-Net and move it to the selected device.
# The arguments (3, 1) presumably mean 3 input channels and 1 output channel - TODO confirm in ResUNet.py.
res_unet_model = Res_UNet(3,1).to(device)

In [None]:
# Optimisation setup for the residual U-Net: pixel-wise MSE against the
# normalised masks, Adam with a learning rate of 1e-4.
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(res_unet_model.parameters(), lr=1e-4)
num_epochs = 100
train_losses = []

for epoch in range(num_epochs):
    res_unet_model.train()
    running_loss = 0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        output = res_unet_model(batch_x)
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()

        # Accumulate the scalar batch loss for the epoch average.
        running_loss += loss.item()

    # Mean of the per-batch losses over the whole epoch.
    avg_train_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch+1}, loss: {running_loss/len(train_loader)}')
    train_losses.append(avg_train_loss)

plot_loss(train_losses, num_epochs)

# Persist the trained weights (state_dict only).
torch.save(res_unet_model.state_dict(), 'weights/resunet_model_100_1e4.pth')

Test

In [None]:
import cv2 as cv
# from moviepy import VideoFileClip
import numpy as np

In [None]:

class Lanes():
    """Rolling state used to smooth lane predictions across video frames."""

    def __init__(self):
        # The constructor was misspelled `__nit__`, so it never ran and
        # instances had neither attribute.
        # recent_fit: most recent per-frame predictions (road_lines keeps at most five).
        self.recent_fit = []
        # avg_fit: mean of the predictions currently held in recent_fit.
        self.avg_fit = []

def road_lines(image,model,lanes):
    """Overlay the predicted lane area (drawn in the green channel) on a frame.

    Fixes relative to the previous version:
      * OpenCV has no `imresize`; `cv.resize` is used, whose `dsize`
        argument is (width, height).
      * `model.eval()` only switches the module to evaluation mode and
        returns the module itself; the prediction now comes from an actual
        forward pass under `torch.no_grad()`.
      * The overlay is resized to the input frame's own size and cast to
        uint8 so that `cv.addWeighted` receives matching shapes and dtypes
        (assumes `image` is a uint8 3-channel frame - TODO confirm).

    Args:
        image: frame as an (H, W, 3) array.
        model: segmentation network; assumed to map a (1, 3, 80, 160) float
            tensor to a (1, 1, 80, 160) mask with values roughly in [0, 1],
            as produced by the training cells - TODO confirm.
        lanes: object with `recent_fit` (list) and `avg_fit` attributes;
            updated in place with the latest and the averaged prediction.

    Returns:
        The input frame blended with the averaged lane mask.
    """
    # Network input resolution is 80 rows x 160 columns.
    small_img = cv.resize(image, (160, 80))
    small_img = np.array(small_img)
    small_img = small_img[None,:,:,:]

    model.eval()
    # Run on whatever device the model lives on; fall back to CPU for a
    # parameter-free module.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')
    # The training cells feed unnormalised pixel values, so none is applied here.
    batch = torch.from_numpy(small_img.astype(np.float32)).permute(0, 3, 1, 2).to(model_device)
    with torch.no_grad():
        # Take the single mask channel of the single sample and rescale to 0-255.
        prediction = model(batch)[0, 0].cpu().numpy() * 255
    lanes.recent_fit.append(prediction)

    # Keep only the five most recent predictions.
    if len(lanes.recent_fit) > 5:
        lanes.recent_fit = lanes.recent_fit[1:]

    lanes.avg_fit = np.mean(np.array(lanes.recent_fit), axis=0)

    # Build a 3-channel overlay with the lane mask in the middle (green) channel.
    green = np.clip(lanes.avg_fit, 0, 255).astype(np.uint8)
    blanks = np.zeros_like(green)
    lane_drawn = np.dstack((blanks, green, blanks))

    # Scale the overlay back to the frame's own resolution.
    height, width = image.shape[:2]
    lane_image = cv.resize(lane_drawn, (width, height))
    result = cv.addWeighted(image, 1, lane_image, 1, 0)

    return result
# lanes= Lanes()
# vid_input = VideoFileClip(r'.mp4')
# vid_output = 'output_test.mp4'

# vid_clip = vid_input.fl_image(road_lines)
# vid_clip.write_videofile(vid_output)

In [None]:
# Restore the U-Net weights from disk; map_location remaps the stored tensors onto the current device.
unet_model.load_state_dict(torch.load('weights/unet_model_100_1e4.pth',map_location=device))

In [None]:
# Restore the Res-UNet weights. The training cell writes
# 'weights/resunet_model_100_1e4.pth'; the path used here previously was
# 'weights/res_unet_model_100_1e4.pth', which that cell never creates.
res_unet_model.load_state_dict(torch.load('weights/resunet_model_100_1e4.pth',map_location=device))

In [None]:
# Restore the attention U-Net weights from disk; map_location remaps the stored tensors onto the current device.
att_unet_model.load_state_dict(torch.load('weights/att_unet_model_100_1e4.pth',map_location=device))

In [None]:
import matplotlib.pyplot as plt

In [None]:
def infer(model, threshold=0.5):
    """Run `model` over the validation loader and collect binarised results.

    The ground-truth masks are normalised floats, so a plain `.int()` cast
    truncates every value below 1.0 to background. They are now binarised at
    0.5 instead, which is identical for strictly 0/1 masks.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        threshold: cut-off applied to the model output to obtain the
            predicted mask (default 0.5, the previously hard-coded value).

    Returns:
        Tuple `(outputs, y_trues, batches_x)` of NumPy integer arrays
        concatenated over all validation batches: predicted masks (0/1),
        ground-truth masks (0/1) and the input images truncated to integers.
    """
    model.eval()
    outputs=[]
    y_trues=[]
    batches_x=[]
    with torch.no_grad():
        for batch_x,batch_y in val_loader:
            # Only the input has to be on the model's device; labels and the
            # image copy are only converted to NumPy.
            prediction = model(batch_x.to(device))
            outputs.append((prediction > threshold).int().cpu().numpy())
            y_trues.append((batch_y > 0.5).int().cpu().numpy())
            batches_x.append(batch_x.int().cpu().numpy())
    outputs=np.concatenate(outputs)
    y_trues=np.concatenate(y_trues)
    batches_x=np.concatenate(batches_x)
    return outputs, y_trues, batches_x

In [None]:
def plot_lanes(outputs,y_trues,batches_x,idx):
    """Show input, label, prediction and absolute error for one sample.

    The prediction was previously multiplied by 2 before the error was
    computed, so correctly predicted lane pixels (label 1, prediction 1)
    showed an error of 1. The error is now |label - prediction|, which is 0
    wherever the two masks agree.

    Args:
        outputs: predicted masks, indexed by sample; each is squeezed to 2-D.
        y_trues: ground-truth masks, same layout as `outputs`.
        batches_x: input images in channel-first layout (C, H, W) per sample.
        idx: index of the sample to display.
    """
    fig, axes = plt.subplots(ncols=1, nrows=4, figsize=(8,16))

    # imshow expects channel-last images.
    img = np.transpose(batches_x[idx], (1,2,0))
    y_true = y_trues[idx].squeeze()
    output = outputs[idx].squeeze()
    error = np.abs(y_true - output)

    axes[0].imshow(img)
    axes[0].set_title("Input")

    axes[1].imshow(y_true, cmap='gray')
    axes[1].set_title("Label")

    axes[2].imshow(output, cmap='gray')
    axes[2].set_title("Prediction")

    # Reversed red-yellow-green map: agreement is green, disagreement is red.
    axes[3].imshow(error, cmap='RdYlGn_r')
    axes[3].set_title("Absolute Error")

    plt.subplots_adjust(hspace=0.2)

In [None]:
# Run the Res-UNet over the validation loader and plot sample 8
# (input, label, prediction, absolute error).
# y_trues and batches_x are overwritten by every inference cell.
res_unet_outputs,y_trues,batches_x=infer(res_unet_model)
plot_lanes(res_unet_outputs,y_trues,batches_x,8)

In [None]:
# Run the attention U-Net over the validation loader and plot sample 8
# (input, label, prediction, absolute error).
# y_trues and batches_x are overwritten by every inference cell.
att_unet_outputs,y_trues,batches_x=infer(att_unet_model)
plot_lanes(att_unet_outputs,y_trues,batches_x,8)

In [None]:
# Run the plain U-Net over the validation loader and plot sample 8
# (input, label, prediction, absolute error).
# y_trues and batches_x are overwritten by every inference cell.
unet_outputs,y_trues,batches_x=infer(unet_model)
plot_lanes(unet_outputs,y_trues,batches_x,8)

In [None]:
from sklearn.metrics import jaccard_score, f1_score

In [None]:
def get_metrics(model_name,y_true,y_pred):
    """Print IoU, Dice coefficient and F1 for a pair of binary mask arrays.

    Both arrays are flattened so that every pixel counts as one sample. The
    shapes are printed before and after flattening, followed by one summary
    line. Nothing is returned.

    Args:
        model_name: label used as the prefix of the summary line.
        y_true: ground-truth masks (NumPy array).
        y_pred: predicted masks (NumPy array).
    """
    print(y_true.shape,y_pred.shape)
    flat_true = y_true.flatten()
    flat_pred = y_pred.flatten()
    print(flat_true.shape,flat_pred.shape)

    # Intersection over union, computed by scikit-learn.
    iou = jaccard_score(flat_true, flat_pred)
    # Dice coefficient derived from IoU: D = 2J / (1 + J).
    dcs = (2 * iou) / (1 + iou)
    # F1 from scikit-learn, reported next to the derived Dice value.
    f1 = f1_score(flat_true, flat_pred)

    print(f'{model_name}, iou: {iou}, dcs {dcs}, f1: {f1}')

In [None]:
# U-Net: print IoU, Dice and F1 of its predictions against y_trues.
get_metrics('unet',y_trues,unet_outputs)

In [None]:
# Res-UNet: print IoU, Dice and F1 of its predictions against y_trues.
get_metrics('resunet',y_trues,res_unet_outputs)

In [None]:
# Attention U-Net: print IoU, Dice and F1 of its predictions against y_trues.
get_metrics('attunet',y_trues,att_unet_outputs)

In [None]:
# Disabled video-processing snippet: the whole cell is a bare string literal,
# so none of it is executed. Before enabling it, note that it refers to
# `UNet_model` (the model created in this notebook is named `unet_model`) and
# to `VideoFileClip`, whose import is commented out in an earlier cell.
'''UNet_model.eval()
lanes=Lanes()
clip_input=VideoFileClip('input_clip.mp4')
vid_output='output_video.mp4'
def process_frame(frame):
    frame_bgr=cv.cvtColor(frame,cv.COLOR_RGB2BGR)
    output_bgr=road_lines(frame_bgr,UNet_model,lanes)
    return cv.cvtColor(output_bgr,cv.COLOR_BGR2RGB)
vid_clip=clip_input.fl_image(process_frame)
vid_clip.write_videofile(vid_output,audio=False)'''