In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file in it.
for folder, _subfolders, names in os.walk('/kaggle/input'):
    for name in names:
        full_path = os.path.join(folder, name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# ==============================================================================
#                      FULL FINE-TUNING & INFERENCE SCRIPT (Corrected)
# ==============================================================================
# This script is self-contained. It loads a pre-trained model, fine-tunes it
# on a higher resolution, and then runs inference on a folder of custom images.
# ==============================================================================

# --- Imports ---
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2
import os
import json
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import glob

# ==============================================================================
# ### STEP 1: CONFIGURATION & SETUP ###
# ==============================================================================
print("--- STEP 1: CONFIGURATION & SETUP ---")

# --- Paths ---
# Checkpoint that fine-tuning starts from.
PRE_TRAINED_MODEL_PATH = "/kaggle/input/unet-lane/pytorch/default/1/unet_lane_detection_final.pth"
# Root of the TuSimple training split; both label files sit directly inside it.
TUSIMPLE_DATA_PATH = "/kaggle/input/tusimple/TUSimple/train_set/"
TRAIN_JSON, VAL_JSON = (
    os.path.join(TUSIMPLE_DATA_PATH, label_file)
    for label_file in ('label_data_0313.json', 'label_data_0531.json')
)
# Custom images used for the final inference demo.
INFERENCE_IMAGE_FOLDER = "/kaggle/input/traffic-dataset/traffic_wala_dataset/valid/images"
# Where the best fine-tuned weights are written.
FT_MODEL_SAVE_PATH = "/kaggle/working/unet_lane_FINETUNED.pth"

# --- Fine-Tuning Hyperparameters ---
# Use the GPU whenever PyTorch can see one.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
FT_IMAGE_HEIGHT, FT_IMAGE_WIDTH = 640, 640
FT_LEARNING_RATE = 1e-5
FT_NUM_EPOCHS = 10
FT_BATCH_SIZE = 1

print(f"Using device: {DEVICE}")
print(f"Loading pre-trained model from: {PRE_TRAINED_MODEL_PATH}")

# ==============================================================================
# ### STEP 2: HELPER CLASSES & FUNCTIONS (ALL DEFINITIONS HERE) ###
# ==============================================================================
print("\n--- STEP 2: DEFINING HELPER CLASSES & FUNCTIONS ---")

# --- Model Definition: U-NET ---
class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    The first stage maps ``in_c`` channels to ``out_c``; the second keeps
    ``out_c``. Stride 1 with padding 1 leaves the spatial size unchanged, and
    the convolutions carry no bias term.

    The layers live in ``self.conv`` (an ``nn.Sequential``) in a fixed order,
    so state-dict keys are ``conv.0.*``, ``conv.1.*``, ``conv.3.*``, ``conv.4.*``.
    """

    def __init__(self, in_c, out_c):
        super().__init__()
        stages = []
        for stage_in in (in_c, out_c):
            stages += [
                nn.Conv2d(stage_in, out_c, kernel_size=3, stride=1, padding=1, bias=False),
                nn.BatchNorm2d(out_c),
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        return self.conv(x)

class UNET(nn.Module):
    """U-Net encoder/decoder that returns one logit map per output channel.

    Args:
        in_c: number of input image channels.
        out_c: number of output channels (logits, no activation applied).
        fts: channel width of each encoder level, shallowest first. Any
            sequence is accepted; the default is an immutable tuple so the
            default value cannot be shared and mutated between instances.

    Attribute names (``downs``, ``ups``, ``bottleneck``, ``final_conv``) are
    part of the checkpoint format and are kept as-is so existing state dicts
    still load.
    """

    def __init__(self, in_c=3, out_c=1, fts=(64, 128, 256, 512)):
        super().__init__()
        fts = list(fts)
        self.ups, self.downs = nn.ModuleList(), nn.ModuleList()
        self.pool = nn.MaxPool2d(2, 2)

        # Encoder: one DoubleConv per level, each followed by pooling in forward().
        for ft in fts:
            self.downs.append(DoubleConv(in_c, ft))
            in_c = ft

        # Decoder: (transposed-conv upsample, DoubleConv) pairs, deepest level first.
        for ft in reversed(fts):
            self.ups.append(nn.ConvTranspose2d(ft * 2, ft, 2, 2))
            self.ups.append(DoubleConv(ft * 2, ft))

        self.bottleneck = DoubleConv(fts[-1], fts[-1] * 2)
        self.final_conv = nn.Conv2d(fts[0], out_c, 1)

    def forward(self, x):
        """Return logits with the same spatial size as ``x``.

        Inputs whose height or width is not a multiple of ``2 ** len(fts)``
        are supported: pooling floors odd sizes, so the upsampled map can be
        smaller than its skip connection. In that case it is resized
        (nearest-neighbour) to the skip's size before concatenation.
        """
        skips = []
        for down in self.downs:
            x = down(x)
            skips.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skips.reverse()  # deepest skip first, matching the decoder order

        for i in range(0, len(self.ups), 2):
            x = self.ups[i](x)
            skip = skips[i // 2]
            if x.shape[2:] != skip.shape[2:]:
                x = nn.functional.interpolate(x, size=skip.shape[2:])
            x = torch.cat((skip, x), dim=1)
            x = self.ups[i + 1](x)
        return self.final_conv(x)

# --- Dataset Class: TuSimpleLaneDataset ---
class TuSimpleLaneDataset(Dataset):
    """TuSimple frames paired with binary lane masks.

    ``json_path`` is read as JSON-lines: each non-blank line is an object with
    a ``raw_file`` key holding an image path relative to ``data_path``. The
    mask path is derived from the image path by replacing ``/clips/`` with
    ``/seg_label/`` and ``.jpg`` with ``.png``. Entries whose image or mask
    file does not exist are dropped at construction time.

    Args:
        data_path: root directory the ``raw_file`` entries are relative to.
        json_path: path of the JSON-lines label file.
        transform: optional callable invoked as ``transform(image=..., mask=...)``
            and returning a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, data_path, json_path, transform=None):
        self.data_path = data_path
        self.transform = transform
        self.samples = []
        with open(json_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines are not valid JSON and would abort loading.
                    continue
                sample_info = json.loads(line)
                image_path, mask_path = self._paths(sample_info)
                # Pre-filter so __getitem__ only ever sees files that exist.
                if os.path.exists(image_path) and os.path.exists(mask_path):
                    self.samples.append(sample_info)

    def _paths(self, sample_info):
        """Return ``(image_path, mask_path)`` for one label-file entry."""
        image_path = os.path.join(self.data_path, sample_info['raw_file'])
        mask_path = image_path.replace('/clips/', '/seg_label/').replace('.jpg', '.png')
        return image_path, mask_path

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` for sample ``idx``.

        Without a transform: an RGB image array and a float32 0/1 mask array.
        With a transform: its ``'image'`` output and its ``'mask'`` output with
        a leading channel dimension added via ``unsqueeze(0)``.

        Raises:
            RuntimeError: if OpenCV cannot decode the image or the mask.
        """
        image_path, mask_path = self._paths(self.samples[idx])
        image = cv2.imread(image_path)
        if image is None:
            raise RuntimeError(f"Could not read image: {image_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise RuntimeError(f"Could not read mask: {mask_path}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Any non-zero label value counts as "lane".
        mask = (mask > 0).astype("float32")

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask'].unsqueeze(0)
        return image, mask

# --- Loss Function: DiceLoss ---
class DiceLoss(nn.Module):
    """Soft Dice loss computed on sigmoid probabilities.

    ``forward`` applies a sigmoid to the raw logits, flattens predictions and
    targets across the whole batch, and returns
    ``1 - (2 * intersection + smooth) / (sum(pred) + sum(target) + smooth)``.
    """

    def forward(self, inputs, targets, smooth=1):
        """Return the scalar Dice loss.

        Args:
            inputs: raw logits of any shape.
            targets: tensor with the same number of elements as ``inputs``.
            smooth: additive constant that keeps the ratio defined when both
                sums are zero.
        """
        # reshape (not view) so non-contiguous tensors, e.g. transposed or
        # sliced ones, are accepted instead of raising a RuntimeError.
        probs = torch.sigmoid(inputs).reshape(-1)
        targets = targets.reshape(-1)
        intersection = (probs * targets).sum()
        total_sum = probs.sum() + targets.sum()
        dice_coeff = (2. * intersection + smooth) / (total_sum + smooth)
        return 1 - dice_coeff

# --- Training & Validation Functions ---
def train_fn(loader, model, optimizer, loss_fn):
    """Run one optimisation pass over every batch in ``loader``.

    Each batch is moved to the module-level ``DEVICE`` (targets are cast to
    float first), pushed through ``model``, scored with ``loss_fn``, and used
    for one ``optimizer`` step. The per-batch loss is shown on the progress
    bar. Returns ``None``.
    """
    progress = tqdm(loader, desc="Fine-Tuning")
    for batch_images, batch_masks in progress:
        batch_images = batch_images.to(device=DEVICE)
        batch_masks = batch_masks.float().to(device=DEVICE)

        # Forward pass and loss.
        batch_loss = loss_fn(model(batch_images), batch_masks)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        progress.set_postfix(dice_loss=batch_loss.item())

def check_dice_score(loader, model, device="cuda"):
    """Return the Dice score of ``model`` over every batch in ``loader``.

    Predictions are ``sigmoid(model(x)) > 0.5``. Intersection and mask sums
    are accumulated across all batches before the ratio is taken, so the
    result is a dataset-level score, not a mean of per-batch scores. An empty
    loader yields ``0.0``.

    The model is put in eval mode for the duration of the call. Afterwards it
    is returned to whichever mode it was in on entry, even if evaluation
    raises; previously it was always forced back into training mode.

    Args:
        loader: iterable of ``(inputs, targets)`` tensor pairs.
        model: network that returns logits.
        device: device both tensors are moved to before the forward pass.
    """
    was_training = model.training
    model.eval()
    total_intersection, total_sum_of_masks = 0.0, 0.0
    try:
        with torch.no_grad():
            for x, y in loader:
                x, y = x.to(device), y.to(device)
                preds = (torch.sigmoid(model(x)) > 0.5).float()
                total_intersection += (preds * y).sum().item()
                total_sum_of_masks += (preds.sum() + y.sum()).item()
    finally:
        model.train(was_training)
    # The epsilon keeps the division defined when both masks are empty.
    return (2. * total_intersection) / (total_sum_of_masks + 1e-8)

# ==============================================================================
# ### STEP 3: FINE-TUNING STAGE ###
# ==============================================================================
print("\n--- STEP 3: FINE-TUNING THE MODEL ---")


def _ft_pipeline(*augmentations):
    """Build the fine-tuning preprocessing pipeline.

    Every pipeline scales the longest side to the fine-tuning resolution, pads
    up to FT_IMAGE_HEIGHT x FT_IMAGE_WIDTH with a constant border, applies the
    optional ``augmentations``, rescales pixel values to [0, 1], and converts
    to a tensor.
    """
    return A.Compose([
        A.LongestMaxSize(max_size=max(FT_IMAGE_HEIGHT, FT_IMAGE_WIDTH)),
        A.PadIfNeeded(min_height=FT_IMAGE_HEIGHT, min_width=FT_IMAGE_WIDTH, border_mode=cv2.BORDER_CONSTANT),
        *augmentations,
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ])


# Training adds random flips and brightness/contrast jitter; validation is deterministic.
fine_tune_transform = _ft_pipeline(
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
)
ft_val_transform = _ft_pipeline()

# Create DataLoaders
# Datasets: the two label files with the augmented / plain pipeline respectively.
ft_train_ds = TuSimpleLaneDataset(
    TUSIMPLE_DATA_PATH, TRAIN_JSON, transform=fine_tune_transform
)
ft_val_ds = TuSimpleLaneDataset(
    TUSIMPLE_DATA_PATH, VAL_JSON, transform=ft_val_transform
)

# Loaders differ only in shuffling; only the training loader is shuffled.
ft_train_loader = DataLoader(
    ft_train_ds,
    batch_size=FT_BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)
ft_val_loader = DataLoader(
    ft_val_ds,
    batch_size=FT_BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

# Start from the pre-trained weights.
model = UNET(in_c=3, out_c=1).to(DEVICE)
model.load_state_dict(
    torch.load(PRE_TRAINED_MODEL_PATH, map_location=DEVICE)
)

# Optimiser, loss, and the best validation score seen so far
# (-1.0 so the first epoch always produces a checkpoint).
optimizer = optim.Adam(model.parameters(), lr=FT_LEARNING_RATE)
loss_fn = DiceLoss()
best_ft_dice_score = -1.0
    
# Run the fine-tuning loop
print("Starting fine-tuning...")
for epoch in range(FT_NUM_EPOCHS):
    print(f"--- Fine-Tuning Epoch {epoch+1}/{FT_NUM_EPOCHS} ---")

    # One training pass, then score the model on the validation split.
    train_fn(ft_train_loader, model, optimizer, loss_fn)
    current_dice_score = check_dice_score(ft_val_loader, model, device=DEVICE)
    print(f"Validation Dice Score: {current_dice_score:.4f}")

    # Only a strict improvement replaces the checkpoint on disk.
    if not current_dice_score > best_ft_dice_score:
        continue
    best_ft_dice_score = current_dice_score
    torch.save(model.state_dict(), FT_MODEL_SAVE_PATH)
    print(f"--> New best fine-tuned model saved with Dice Score: {best_ft_dice_score:.4f}")

print(f"Fine-tuning finished. Best model saved to {FT_MODEL_SAVE_PATH}")

# ==============================================================================
# ### STEP 4: INFERENCE ON YOUR CUSTOM IMAGES ###
# ==============================================================================
print("\n--- STEP 4: RUNNING INFERENCE ON CUSTOM IMAGE FOLDER ---")

def _letterbox_content_box(orig_h, orig_w, padded_h, padded_w, longest_side):
    """Locate the un-padded image inside a letterboxed frame.

    Mirrors ``ft_val_transform``: the longest image side is scaled to
    ``longest_side`` and the result is padded up to ``padded_h x padded_w``.
    Assumes the padding is centred (albumentations' default for PadIfNeeded).

    Returns ``(top, left, height, width)`` of the image content in the frame.
    """
    scale = longest_side / max(orig_h, orig_w)
    height = min(padded_h, max(1, int(round(orig_h * scale))))
    width = min(padded_w, max(1, int(round(orig_w * scale))))
    return (padded_h - height) // 2, (padded_w - width) // 2, height, width


def predict_and_process_lanes_advanced(model, image_path, device):
    """Segment lanes in one image and draw a fitted curve over each lane blob.

    The image goes through ``ft_val_transform`` and ``model``; the thresholded
    mask is mapped back onto the original frame, cleaned with a morphological
    opening, and split into connected components. Each component of at least
    300 pixels gets a 2nd-degree polynomial ``x = f(y)`` fitted to its pixels,
    drawn in a random colour on a copy of the input image. Components are
    fitted independently; no merging of separate segments is performed.

    Args:
        model: network expected to return a single-channel logit map for a
            ``(1, 3, H, W)`` batch; it is switched to eval mode.
        image_path: path of the image to read with OpenCV.
        device: torch device the input tensor is moved to.

    Returns:
        ``(annotated_bgr_image, lane_polynomials)`` where the second item is a
        list of ``np.polyfit`` coefficient arrays, or ``(None, None)`` when
        the image cannot be read.
    """
    original_image = cv2.imread(image_path)
    if original_image is None:
        print(f"Could not read image: {image_path}")
        return None, None

    model.eval()  # Set model to evaluation mode for inference
    orig_h, orig_w = original_image.shape[:2]
    image_rgb = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)

    # Use the same transform as validation during fine-tuning
    input_tensor = ft_val_transform(image=image_rgb)['image'].unsqueeze(0).to(device)

    with torch.no_grad():
        pred = torch.sigmoid(model(input_tensor)) > 0.5
    padded_mask = pred.cpu().numpy().squeeze().astype(np.uint8)

    # The mask lives in the letterboxed (padded) frame. Cut the padding away
    # before resizing; stretching the whole padded mask to the original size
    # would shift and squash the lanes on any non-square image.
    top, left, height, width = _letterbox_content_box(
        orig_h, orig_w, padded_mask.shape[0], padded_mask.shape[1],
        max(FT_IMAGE_HEIGHT, FT_IMAGE_WIDTH),
    )
    content_mask = np.ascontiguousarray(padded_mask[top:top + height, left:left + width])
    raw_mask_resized = cv2.resize(content_mask, (orig_w, orig_h), interpolation=cv2.INTER_NEAREST)

    # Post-processing to find and fit curves to lanes
    kernel = np.ones((5, 5), np.uint8)
    cleaned_mask = cv2.morphologyEx(raw_mask_resized, cv2.MORPH_OPEN, kernel, iterations=2)
    num_labels, labels_mask, stats, _ = cv2.connectedComponentsWithStats(cleaned_mask, 4, cv2.CV_32S)

    final_lanes = []
    output_image = original_image.copy()
    for i in range(1, num_labels):  # label 0 is skipped
        if stats[i, cv2.CC_STAT_AREA] < 300:
            continue
        coords = np.argwhere(labels_mask == i)
        y_coords, x_coords = coords[:, 0], coords[:, 1]
        if len(y_coords) < 10:
            continue
        # Fit x as a function of y.
        poly_coeffs = np.polyfit(y_coords, x_coords, 2)
        final_lanes.append(poly_coeffs)
        plot_y = np.linspace(np.min(y_coords), np.max(y_coords), 100)
        fit_x = np.polyval(poly_coeffs, plot_y)
        points = np.asarray([fit_x, plot_y]).T.astype(np.int32)
        color = np.random.randint(50, 255, size=3).tolist()
        cv2.polylines(output_image, [points], isClosed=False, color=color, thickness=5)

    return output_image, final_lanes

# Load the BEST fine-tuned model for inference
# A fresh network instance receives the best checkpoint written during fine-tuning.
inference_model = UNET(in_c=3, out_c=1).to(DEVICE)
print(f"\nLoading BEST fine-tuned model for final inference: {FT_MODEL_SAVE_PATH}")
inference_model.load_state_dict(torch.load(FT_MODEL_SAVE_PATH, map_location=DEVICE))

# Get list of your images. glob returns matches in arbitrary order, so sort
# them to make the "first 5" preview below the same on every run.
image_paths = sorted(glob.glob(os.path.join(INFERENCE_IMAGE_FOLDER, "*.jpg")))
print(f"Found {len(image_paths)} images in your folder.")

# Loop and process each image
for image_path in image_paths[:5]:  # Process first 5 images as an example
    print(f"\n--- Processing: {os.path.basename(image_path)} ---")

    final_image, lane_data = predict_and_process_lanes_advanced(inference_model, image_path, DEVICE)

    # (None, None) means the image could not be read; the function already reported it.
    if final_image is not None:
        print(f"Successfully identified {len(lane_data)} distinct lanes.")
        plt.figure(figsize=(15, 10))
        plt.imshow(cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB))
        plt.title(f"Result for {os.path.basename(image_path)}")
        plt.axis("off")
        plt.show()

print("\n--- Script Finished ---")

In [None]:
# ==============================================================================
# ### STEP 4: INFERENCE ON YOUR CUSTOM IMAGES ###
# ==============================================================================
# NOTE(review): this cell repeats the Step 4 code of the previous cell and relies
# on names defined there (UNET, ft_val_transform, DEVICE, FT_MODEL_SAVE_PATH, ...);
# presumably kept so inference can be re-run without fine-tuning again - confirm.
print("\n--- STEP 4: RUNNING INFERENCE ON CUSTOM IMAGE FOLDER ---")

def _letterbox_content_box(orig_h, orig_w, padded_h, padded_w, longest_side):
    """Locate the un-padded image inside a letterboxed frame.

    Mirrors ``ft_val_transform``: the longest image side is scaled to
    ``longest_side`` and the result is padded up to ``padded_h x padded_w``.
    Assumes the padding is centred (albumentations' default for PadIfNeeded).

    Returns ``(top, left, height, width)`` of the image content in the frame.
    """
    scale = longest_side / max(orig_h, orig_w)
    height = min(padded_h, max(1, int(round(orig_h * scale))))
    width = min(padded_w, max(1, int(round(orig_w * scale))))
    return (padded_h - height) // 2, (padded_w - width) // 2, height, width


def predict_and_process_lanes_advanced(model, image_path, device):
    """Segment lanes in one image and draw a fitted curve over each lane blob.

    The image goes through ``ft_val_transform`` and ``model``; the thresholded
    mask is mapped back onto the original frame, cleaned with a morphological
    opening, and split into connected components. Each component of at least
    300 pixels gets a 2nd-degree polynomial ``x = f(y)`` fitted to its pixels,
    drawn in a random colour on a copy of the input image. Components are
    fitted independently; no merging of separate segments is performed.

    Args:
        model: network expected to return a single-channel logit map for a
            ``(1, 3, H, W)`` batch; it is switched to eval mode.
        image_path: path of the image to read with OpenCV.
        device: torch device the input tensor is moved to.

    Returns:
        ``(annotated_bgr_image, lane_polynomials)`` where the second item is a
        list of ``np.polyfit`` coefficient arrays, or ``(None, None)`` when
        the image cannot be read.
    """
    original_image = cv2.imread(image_path)
    if original_image is None:
        print(f"Could not read image: {image_path}")
        return None, None

    model.eval()  # Set model to evaluation mode for inference
    orig_h, orig_w = original_image.shape[:2]
    image_rgb = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)

    # Use the same transform as validation during fine-tuning
    input_tensor = ft_val_transform(image=image_rgb)['image'].unsqueeze(0).to(device)

    with torch.no_grad():
        pred = torch.sigmoid(model(input_tensor)) > 0.5
    padded_mask = pred.cpu().numpy().squeeze().astype(np.uint8)

    # The mask lives in the letterboxed (padded) frame. Cut the padding away
    # before resizing; stretching the whole padded mask to the original size
    # would shift and squash the lanes on any non-square image.
    top, left, height, width = _letterbox_content_box(
        orig_h, orig_w, padded_mask.shape[0], padded_mask.shape[1],
        max(FT_IMAGE_HEIGHT, FT_IMAGE_WIDTH),
    )
    content_mask = np.ascontiguousarray(padded_mask[top:top + height, left:left + width])
    raw_mask_resized = cv2.resize(content_mask, (orig_w, orig_h), interpolation=cv2.INTER_NEAREST)

    # Post-processing to find and fit curves to lanes
    kernel = np.ones((5, 5), np.uint8)
    cleaned_mask = cv2.morphologyEx(raw_mask_resized, cv2.MORPH_OPEN, kernel, iterations=2)
    num_labels, labels_mask, stats, _ = cv2.connectedComponentsWithStats(cleaned_mask, 4, cv2.CV_32S)

    final_lanes = []
    output_image = original_image.copy()
    for i in range(1, num_labels):  # label 0 is skipped
        if stats[i, cv2.CC_STAT_AREA] < 300:
            continue
        coords = np.argwhere(labels_mask == i)
        y_coords, x_coords = coords[:, 0], coords[:, 1]
        if len(y_coords) < 10:
            continue
        # Fit x as a function of y.
        poly_coeffs = np.polyfit(y_coords, x_coords, 2)
        final_lanes.append(poly_coeffs)
        plot_y = np.linspace(np.min(y_coords), np.max(y_coords), 100)
        fit_x = np.polyval(poly_coeffs, plot_y)
        points = np.asarray([fit_x, plot_y]).T.astype(np.int32)
        color = np.random.randint(50, 255, size=3).tolist()
        cv2.polylines(output_image, [points], isClosed=False, color=color, thickness=5)

    return output_image, final_lanes

# Load the BEST fine-tuned model for inference
# A fresh network instance receives the best checkpoint written during fine-tuning.
inference_model = UNET(in_c=3, out_c=1).to(DEVICE)
print(f"\nLoading BEST fine-tuned model for final inference: {FT_MODEL_SAVE_PATH}")
inference_model.load_state_dict(torch.load(FT_MODEL_SAVE_PATH, map_location=DEVICE))

# Get list of your images. glob returns matches in arbitrary order, so sort
# them to make the "first 5" preview below the same on every run.
image_paths = sorted(glob.glob(os.path.join(INFERENCE_IMAGE_FOLDER, "*.jpg")))
print(f"Found {len(image_paths)} images in your folder.")

# Loop and process each image
for image_path in image_paths[:5]:  # Process first 5 images as an example
    print(f"\n--- Processing: {os.path.basename(image_path)} ---")

    final_image, lane_data = predict_and_process_lanes_advanced(inference_model, image_path, DEVICE)

    # (None, None) means the image could not be read; the function already reported it.
    if final_image is not None:
        print(f"Successfully identified {len(lane_data)} distinct lanes.")
        plt.figure(figsize=(15, 10))
        plt.imshow(cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB))
        plt.title(f"Result for {os.path.basename(image_path)}")
        plt.axis("off")
        plt.show()

print("\n--- Script Finished ---")

In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Define your UNET model here (or import it if defined elsewhere)
# from model import UNET  # Uncomment if your UNET class is in a separate file

# --- Paths ---
# Folder of custom images to run through the network.
INFERENCE_IMAGE_FOLDER = "/kaggle/input/traffic-dataset/traffic_wala_dataset/valid/images"
# Fine-tuned checkpoint, read from a Kaggle input dataset.
MODEL_SAVE_PATH = "/kaggle/input/unet-lane-finetuned-1/pytorch/default/1/unet_lane_FINETUNED.pth"

# Use the GPU whenever PyTorch can see one.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Resolution every image is resized to before it is fed to the network.
IMAGE_HEIGHT, IMAGE_WIDTH = 640, 640

# Networks already loaded by predict_lanes, keyed by (checkpoint path, device).
_LOADED_MODELS = {}


def _load_model(model_path, device):
    """Return a UNET with ``model_path`` weights on ``device``, loading it at most once."""
    key = (model_path, str(device))
    model = _LOADED_MODELS.get(key)
    if model is None:
        model = UNET(in_c=3, out_c=1)
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.to(device)
        _LOADED_MODELS[key] = model
    return model


def predict_lanes(model_path, image_path, device):
    """Overlay the predicted lane mask on one image.

    The image is resized to IMAGE_HEIGHT x IMAGE_WIDTH, scaled to [0, 1], and
    passed through the network; ``sigmoid > 0.5`` gives a binary mask that is
    resized back to the original size and blended in green over the image.

    The network for a given ``(model_path, device)`` is built and loaded on
    the first call and reused afterwards, so calling this in a loop no longer
    re-reads the checkpoint for every image. A checkpoint that changes on
    disk during the session is therefore not picked up.

    Args:
        model_path: path of the UNET state-dict checkpoint.
        image_path: path of the image to read with OpenCV.
        device: torch device used for the model and the input tensor.

    Returns:
        ``(blended_bgr_image, mask)`` with ``mask`` a uint8 array of 0/255 at
        the original image size, or ``(None, None)`` if the image cannot be
        read.
    """
    # Check the image first so an unreadable file never costs a model load.
    image = cv2.imread(image_path)
    if image is None:
        print(f"Failed to load image: {image_path}")
        return None, None

    model = _load_model(model_path, device)
    model.eval()

    original_dims = (image.shape[1], image.shape[0])  # (width, height), as cv2.resize expects
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # NOTE(review): this stretches the image to a square, whereas the
    # fine-tuning transforms earlier in this file letterbox it
    # (LongestMaxSize + PadIfNeeded) - confirm the mismatch is intended.
    transform = A.Compose([
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ])

    augmented = transform(image=image_rgb)
    input_tensor = augmented['image'].unsqueeze(0).to(device)

    with torch.no_grad():
        pred = torch.sigmoid(model(input_tensor)) > 0.5
        pred = pred.cpu().squeeze(0).squeeze(0).numpy()

    mask = (pred * 255).astype(np.uint8)
    mask_resized = cv2.resize(mask, original_dims, interpolation=cv2.INTER_NEAREST)

    # Paint lane pixels green on a copy, then blend 40/60 with the original.
    overlay = image.copy()
    overlay[mask_resized != 0] = [0, 255, 0]
    final_image = cv2.addWeighted(overlay, 0.4, image, 0.6, 0)

    return final_image, mask_resized

# --- Run inference on all images in the folder ---
if not os.path.exists(MODEL_SAVE_PATH):
    # Without a checkpoint there is nothing to run.
    print("Model file not found. Skipping inference.")
else:
    image_paths = glob(os.path.join(INFERENCE_IMAGE_FOLDER, "*.jpg"))

    for img_path in image_paths:
        print(f"\nRunning inference on: {img_path}")
        predicted_image, _ = predict_lanes(MODEL_SAVE_PATH, img_path, DEVICE)

        if predicted_image is None:
            print("Prediction failed.")
            continue

        # Show the blended result; matplotlib expects RGB, OpenCV gives BGR.
        plt.figure(figsize=(10, 6))
        plt.imshow(cv2.cvtColor(predicted_image, cv2.COLOR_BGR2RGB))
        plt.title(f"Result: {os.path.basename(img_path)}")
        plt.axis("off")
        plt.show()
