<a href="https://colab.research.google.com/github/devashishbotre/Autonomous-Lane-Detector/blob/main/scripts/Model_Evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Authenticate with Kaggle first: some of the data sources used by this
# notebook are private, so this cell must run before any dataset download.
import kagglehub
kagglehub.login()


In [None]:
# Download the Kaggle data sources this notebook uses and keep the value
# returned by each call (presumably the local dataset directory -- confirm
# against the kagglehub documentation).
# NOTE: this notebook environment differs from Kaggle's Python environment,
# so libraries the notebook relies on may be missing here.

manideep1108_tusimple_path = kagglehub.dataset_download('manideep1108/tusimple')
kingster9_models_path = kagglehub.dataset_download('kingster9/models')

print('Data source import complete.')


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import os
import json
import cv2
import time
from sklearn.metrics import f1_score, accuracy_score
from model_module import RESA, Decoder, LaneNet

# Working resolution: TuSimpleDataset resizes every image to
# IMG_WIDTH x IMG_HEIGHT and LaneNet resizes its logits to the same size.
IMG_HEIGHT = 720
IMG_WIDTH = 1280
# Number of segmentation classes, i.e. output channels of LaneNet's seg head.
NUM_CLASSES = 2
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class RESA(nn.Module):
    """Aggregate a feature map with circularly shifted copies of itself.

    The input is rolled by 1-3 positions in both directions along dims 2 and 3
    (12 shifts in total). Every rolled copy goes through the same
    1x1 conv -> BN -> ReLU -> depthwise 3x3 conv -> BN -> ReLU stack and is
    weighted by a Gaussian of its shift distance. The normalised weighted sum,
    scaled by 0.5, is added back to the input.

    Args:
        in_channels: Number of channels of the input; the output has the
            same number of channels and the same spatial size.
    """

    def __init__(self, in_channels):
        super(RESA, self).__init__()
        self.conv = nn.Conv2d(in_channels, in_channels, 1, padding=0)
        self.bn = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.refine_conv = nn.Conv2d(in_channels, in_channels, 3, padding=1, groups=in_channels)
        self.refine_bn = nn.BatchNorm2d(in_channels)
        # (rows, cols) offsets passed to torch.roll in forward().
        self.shifts = [
            (1, 0), (2, 0), (3, 0),
            (-1, 0), (-2, 0), (-3, 0),
            (0, -1), (0, -2), (0, -3),
            (0, 1), (0, 2), (0, 3)
        ]

        sigma = 1.5  # width of the Gaussian that down-weights larger shifts
        distances = torch.tensor([(i**2 + j**2) for i, j in self.shifts], dtype=torch.float32)
        # Registered as a buffer so the weights follow the module through
        # .to() / .cuda() / .half() instead of being pinned to a global device.
        # persistent=False keeps them out of state_dict, so checkpoints saved
        # when `weights` was a plain attribute still load with strict=True.
        self.register_buffer("weights", torch.exp(-distances / (2 * sigma**2)), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the weighted average of its processed shifted copies."""
        contributions = []
        for (shift_h, shift_w), weight in zip(self.shifts, self.weights):
            shifted = torch.roll(x, shifts=(shift_h, shift_w), dims=(2, 3))
            contrib = self.relu(self.bn(self.conv(shifted)))
            contrib = self.relu(self.refine_bn(self.refine_conv(contrib)))
            contributions.append(contrib * weight)

        # Normalise by the total weight so the residual is a weighted mean.
        out = x + 0.5 * sum(contributions) / self.weights.sum()
        return out

class Decoder(nn.Module):
    """Two-stage upsampling decoder with sigmoid-gated refinement.

    Each stage is conv 3x3 -> BN -> ReLU -> 2x bilinear upsampling, followed
    by a depthwise 3x3 gate whose sigmoid response re-weights the features
    (``x + sigmoid(gate(x)) * x``). A depthwise 5x5 conv with BN and ReLU
    finishes the block. Spatial size grows by a factor of 4 overall.

    Args:
        in_channels: Channels of the incoming feature map; stage one reduces
            them to ``in_channels // 2``.
        out_channels: Channels produced by stage two and returned.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        mid_channels = in_channels // 2

        # Stage one: halve the channel count, double the resolution.
        self.Up1 = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
        )
        # Stage two: project to the output width, double the resolution again.
        self.Up2 = nn.Sequential(
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
        )

        # Final per-channel 5x5 smoothing.
        self.smooth_conv = nn.Conv2d(out_channels, out_channels, kernel_size=5, padding=2, groups=out_channels)
        self.smooth_bn = nn.BatchNorm2d(out_channels)

        # Per-channel gates applied after each upsampling stage.
        self.thin1 = nn.Conv2d(mid_channels, mid_channels, kernel_size=3, padding=1, groups=mid_channels)
        self.thin2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, groups=out_channels)

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Upsample ``x`` by 4x and return the smoothed, non-negative features."""
        stage_one = self.Up1(x)
        stage_one = stage_one + torch.sigmoid(self.thin1(stage_one)) * stage_one

        stage_two = self.Up2(stage_one)
        stage_two = stage_two + torch.sigmoid(self.thin2(stage_two)) * stage_two

        smoothed = self.smooth_bn(self.smooth_conv(stage_two))
        return self.relu(smoothed)

class LaneNet(nn.Module):
    """Lane segmentation network: ResNet-34 encoder, RESA blocks, Decoder, 1x1 head.

    Args:
        num_classes: Number of output channels of the segmentation head.
        k_iterations: Number of RESA blocks applied to the encoder output.
    """

    def __init__(self, num_classes=NUM_CLASSES, k_iterations=4):
        super(LaneNet, self).__init__()
        # `models` is not imported by the notebook's import cell, so on a fresh
        # kernel the lookup below would raise NameError. Import it here so the
        # class works regardless of which cells ran before it.
        from torchvision import models

        # Backbone initialised with torchvision's default pretrained weights;
        # the average-pool and fully connected layers of ResNet are dropped.
        resnet = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)
        self.encoder = nn.Sequential(
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.maxpool,
            resnet.layer1,
            resnet.layer2,
            resnet.layer3,
            resnet.layer4
        )

        # The encoder ends with 512 channels; RESA keeps that width.
        self.resa_layers = nn.ModuleList([RESA(512) for _ in range(k_iterations)])
        self.decoder = Decoder(512, 256)
        self.seg_head = nn.Conv2d(256, num_classes, 1)

    def forward(self, x):
        """Return per-pixel class logits resized to (IMG_HEIGHT, IMG_WIDTH)."""
        x = self.encoder(x)
        for resa in self.resa_layers:
            x = resa(x)
        x = self.decoder(x)
        seg_out = self.seg_head(x)
        # Bring the logits to the fixed working resolution, then smooth them
        # with a 3x3 average pool (stride 1, so the size is unchanged).
        seg_out = F.interpolate(seg_out, size=(IMG_HEIGHT, IMG_WIDTH), mode='bilinear', align_corners=True)
        seg_out = F.avg_pool2d(seg_out, kernel_size=3, padding=1, stride=1)
        return seg_out

In [None]:
class TuSimpleDataset(Dataset):
    """Dataset yielding ``(image, lane_mask, annotation)`` triples.

    Each label file holds one JSON object per line with the keys
    ``raw_file`` (image path relative to ``img_dir``), ``lanes`` (one list of
    x coordinates per lane) and ``h_samples`` (the y coordinate shared by the
    i-th entry of every lane).

    Args:
        json_files: Paths of the label files. Missing files are reported and
            skipped.
        img_dir: Directory that ``raw_file`` entries are relative to.
        transform: Stored on the instance; this class does not apply it.
    """

    def __init__(self, json_files, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.annotations = []
        total_annotations = 0
        for json_file in json_files:
            if not os.path.exists(json_file):
                print(f"Warning: JSON file not found: {json_file}")
                continue
            with open(json_file, 'r') as f:
                # Stream the file line by line instead of loading it whole.
                for line in f:
                    # Blank lines (e.g. a trailing newline) are not records
                    # and would make json.loads raise.
                    if not line.strip():
                        continue
                    total_annotations += 1
                    ann = json.loads(line)
                    img_path = os.path.join(self.img_dir, ann['raw_file'])
                    if os.path.exists(img_path):
                        self.annotations.append(ann)
                    else:
                        print(f"Warning: Image not found: {img_path}")
        print(f"Total annotations in JSON: {total_annotations}")
        print(f"Valid images found: {len(self.annotations)}")

    def __len__(self):
        """Number of annotations whose image file exists on disk."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            image: float tensor of shape (3, IMG_HEIGHT, IMG_WIDTH), RGB,
                scaled to [0, 1].
            mask: long tensor of shape (IMG_HEIGHT, IMG_WIDTH); 1 on lane
                polylines, 0 elsewhere.
            ann: the raw annotation dict for this sample.
        """
        ann = self.annotations[idx]
        img_path = os.path.join(self.img_dir, ann['raw_file'])

        # Load and resize image
        image = cv2.imread(img_path)
        if image is None:
            # Unreadable image: report it and substitute a black frame so the
            # batch can still be assembled.
            print(f"Error: Failed to load image: {img_path}")
            image = np.zeros((IMG_HEIGHT, IMG_WIDTH, 3), dtype=np.uint8)
        image = cv2.resize(image, (IMG_WIDTH, IMG_HEIGHT))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Rasterise every lane as a 1-pixel-wide open polyline with value 1.
        mask = np.zeros((IMG_HEIGHT, IMG_WIDTH), dtype=np.uint8)
        for lane in ann['lanes']:
            # x == -2 is skipped (presumably the "no lane at this row" marker
            # -- confirm against the dataset docs), as is any point outside
            # the mask.
            points = [(x, y) for x, y in zip(lane, ann['h_samples']) if x != -2 and 0 <= y < IMG_HEIGHT and 0 <= x < IMG_WIDTH]
            if len(points) > 1:
                points = np.array(points, dtype=np.int32)
                cv2.polylines(mask, [points], False, 1, 1)

        # HWC uint8 -> CHW float in [0, 1].
        image = torch.from_numpy(image.transpose(2, 0, 1)).float() / 255.0
        mask = torch.from_numpy(mask).long()

        return image, mask, ann

def collate_fn(batch):
    """Collate ``(image, mask, annotation)`` samples into one batch.

    Images and masks are stacked along a new leading dimension; annotations
    are returned untouched as a tuple, one entry per sample.
    """
    image_seq, mask_seq, ann_seq = zip(*batch)
    return torch.stack(image_seq, dim=0), torch.stack(mask_seq, dim=0), ann_seq

In [None]:
def evaluate_model(model, device, data_loader, desc="Evaluating on Training Data"):
    """Evaluate a segmentation model and time its forward pass.

    Args:
        model: Module mapping an image batch to per-class logits with the
            class dimension at index 1.
        device: Device the images are moved to before the forward pass.
        data_loader: Iterable of ``(images, masks, extra)`` batches; ``extra``
            is ignored.
        desc: Label shown on the progress bar.

    Returns:
        Tuple ``(avg_f1, avg_acc, avg_inference_time)``: per-image binary F1
        and pixel accuracy averaged over all images, and the mean forward-pass
        time per image in seconds. All three are 0.0 when the loader yields
        no samples.
    """
    model.eval()
    # CUDA kernels are launched asynchronously. Without synchronising around
    # the forward pass the timer mostly measures launch overhead, not compute.
    sync_device = torch.device(device)
    needs_sync = sync_device.type == "cuda"

    total_f1, total_acc, total_time = 0.0, 0.0, 0.0
    num_samples = 0

    with torch.no_grad():
        for images, masks, _ in tqdm(data_loader, desc=desc):
            images = images.to(device)

            if needs_sync:
                torch.cuda.synchronize(sync_device)
            start_time = time.perf_counter()
            outputs = model(images)
            if needs_sync:
                torch.cuda.synchronize(sync_device)
            # Summing the batch time and dividing by the image count at the
            # end yields the per-image average.
            total_time += time.perf_counter() - start_time

            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            # Metrics are computed on the CPU, so the masks never need to
            # visit the accelerator.
            targets = masks.cpu().numpy()

            for pred, target in zip(preds, targets):
                pred_flat = pred.flatten()
                mask_flat = target.flatten()

                total_f1 += f1_score(mask_flat, pred_flat, average='binary', zero_division=0)
                total_acc += accuracy_score(mask_flat, pred_flat)
                num_samples += 1

    if num_samples == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0, 0.0, 0.0

    avg_f1 = total_f1 / num_samples
    avg_acc = total_acc / num_samples
    avg_inference_time = total_time / num_samples

    return avg_f1, avg_acc, avg_inference_time

In [None]:
model = LaneNet(num_classes=NUM_CLASSES).to(device)
# Locate the checkpoint inside the directory returned by the kagglehub
# download instead of hard-coding the Kaggle mount point, so the notebook
# also runs where /kaggle/input does not exist (e.g. Colab).
model_path = os.path.join(kingster9_models_path, "lane_model_final (2).pth")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model weights not found at {model_path}")
# weights_only=True restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
print(f"Loaded model weights from {model_path}")

Loaded model weights from /kaggle/input/models/lane_model_final (2).pth


In [None]:
# Build the dataset root from the kagglehub download path rather than the
# hard-coded Kaggle mount point, so the notebook also runs outside Kaggle.
data_dir = os.path.join(manideep1108_tusimple_path, 'TUSimple', 'train_set')
json_files = [os.path.join(data_dir, f) for f in ['label_data_0313.json',
                                                     'label_data_0531.json',
                                                     'label_data_0601.json']]
train_dataset = TuSimpleDataset(json_files, data_dir)
# Fail here, next to the cause, instead of producing meaningless metrics later.
if len(train_dataset) == 0:
    raise RuntimeError(f"No usable annotations found under {data_dir}")
# shuffle=False: evaluation does not depend on sample order.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=False, num_workers=4, collate_fn=collate_fn)

Total annotations in JSON: 3626
Valid images found: 3626


In [None]:
# Run the evaluation over the training split and report the averaged metrics.
avg_f1, avg_acc, avg_inference_time = evaluate_model(model, device, train_loader)

# Reading the numbers: accuracy is computed over every pixel, and lane labels
# are rasterised as 1-pixel-wide polylines, so background pixels dominate it.
# F1 on the lane class is the more informative figure.
print(f"Average F1 Score (Training Data): {avg_f1:.4f}")
print(f"Average Accuracy (Training Data): {avg_acc:.4f}")
print(f"Average Inference Time per Image (Training Data): {avg_inference_time:.6f} seconds")

Evaluating on Training Data: 100%|██████████| 907/907 [43:44<00:00,  2.89s/it]

Average F1 Score (Training Data): 0.3197
Average Accuracy (Training Data): 0.9913
Average Inference Time per Image (Training Data): 0.005836 seconds



