## 1. Install Dependencies

In [None]:
# Install required packages
!pip install torch torchvision numpy opencv-python matplotlib pillow pyyaml tqdm scipy

## 2. Setup Path and Imports

In [None]:
import sys
import os

# The project root is taken to be the parent of the current working
# directory; put it at the front of sys.path (once) so that project
# packages can be imported from this notebook.
project_root = os.path.split(os.getcwd())[0]
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)

print(f"Project root: {project_root}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import cv2
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

# Seed the global RNGs so that synthetic images, weight initialisation and
# DataLoader shuffling are repeatable across "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Check device: use the GPU when one is available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 3. Create Synthetic Lane Dataset

In [None]:
def create_synthetic_lane_image(width=640, height=640, num_lanes=2):
    """
    Create a synthetic road image with up to two white lane markings.

    The lane centre offset and lane width are sampled in pixels for a
    640-pixel-wide reference image and scaled by ``width / 640``, so the
    lane base positions stay inside the frame at other resolutions (at
    ``width=640`` the result is identical to the unscaled geometry).

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        num_lanes: How many lanes to draw. Only a left and a right lane are
            generated, so values above 2 behave like 2; values <= 0 draw none.

    Returns:
        image: uint8 RGB image of shape [height, width, 3].
        lane_coeffs: float32 array [4, 4]; rows of lanes that were not
            generated stay zero. A row (c0, c1, c2, c3) describes the curve
            drawn in pixel space as
            ``x = width/2 + c0*width + (c2/1000) * (y - height/2)**2``;
            c1 and c3 are always 0.
        lane_exists: float32 array [4], 1.0 for every generated lane.
    """
    max_lanes = 4            # fixed size of the label arrays
    reference_width = 640.0  # resolution the pixel ranges below were chosen for
    scale = width / reference_width

    # Gray road background with a little per-pixel noise as texture.
    # The noise is added in int16 so negative values do not wrap around.
    image = np.full((height, width, 3), 80, dtype=np.uint8)
    noise = np.random.randint(-10, 10, (height, width, 3), dtype=np.int16)
    image = np.clip(image.astype(np.int16) + noise, 0, 255).astype(np.uint8)

    # Random lane geometry (sampling order kept stable for seeded runs)
    center_offset = np.random.uniform(-50, 50) * scale   # lane centre vs. image centre
    lane_width = np.random.uniform(200, 300) * scale     # distance between the two lines
    curvature = np.random.uniform(-0.0005, 0.0005)       # pixels per pixel^2

    left_x0 = width / 2 + center_offset - lane_width / 2
    right_x0 = width / 2 + center_offset + lane_width / 2

    lane_coeffs = np.zeros((max_lanes, 4), dtype=np.float32)
    lane_exists = np.zeros(max_lanes, dtype=np.float32)

    center_y = height / 2
    base_positions = [left_x0, right_x0][:max(num_lanes, 0)]

    for lane_idx, base_x in enumerate(base_positions):
        # Labels: c0 is the lateral offset as a fraction of the width,
        # c2 is the pixel curvature scaled by 1000 to bring it to O(1).
        lane_coeffs[lane_idx] = [
            (base_x - width / 2) / width,  # c0: lateral offset
            0.0,                           # c1: linear term (unused)
            curvature * 1000,              # c2: quadratic term
            0.0,                           # c3: cubic term (unused)
        ]
        lane_exists[lane_idx] = 1.0

        # Sample the parabola every 5 rows over the lower two thirds of the
        # image and keep only the samples that fall inside the frame.
        points = []
        for y in range(height // 3, height, 5):
            x = base_x + curvature * (y - center_y) ** 2
            if 0 < x < width:
                points.append((int(x), y))

        # Connect consecutive samples with white segments
        for start, end in zip(points, points[1:]):
            cv2.line(image, start, end, (255, 255, 255), 3)

    return image, lane_coeffs, lane_exists


# Smoke-test the generator: render one sample and report its label arrays
test_img, test_coeffs, test_exists = create_synthetic_lane_image()

fig, ax = plt.subplots(figsize=(8, 8))
ax.imshow(test_img)
ax.set_title('Synthetic Lane Image')
ax.axis('off')
plt.show()

print(f"Lane coefficients shape: {test_coeffs.shape}")
print(f"Lane exists: {test_exists}")

In [None]:
class SyntheticLaneDataset(torch.utils.data.Dataset):
    """
    Synthetic lane dataset for testing.

    Samples are generated on the fly by ``create_synthetic_lane_image``; the
    index only bounds the dataset and does not select a particular image, so
    the same index yields a new random sample on every access.

    Args:
        num_samples: Nominal dataset length reported by ``len()``.
        image_size: ``(height, width)`` of the generated images.
    """

    def __init__(self, num_samples=1000, image_size=(640, 640)):
        self.num_samples = num_samples
        self.image_size = image_size

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        """
        Generate one sample.

        Returns:
            dict with
              'image': float tensor [3, H, W] scaled to [0, 1],
              'polynomials': tensor of lane coefficients,
              'lane_exists': tensor of per-lane existence flags.

        Raises:
            IndexError: if ``idx`` is outside ``[-len, len)``. Without this,
                plain iteration (``for s in dataset``) would never stop,
                because Python ends index-based iteration on IndexError.
        """
        if not -self.num_samples <= idx < self.num_samples:
            raise IndexError(
                f"index {idx} out of range for dataset of size {self.num_samples}"
            )

        height, width = self.image_size[0], self.image_size[1]
        image, lane_coeffs, lane_exists = create_synthetic_lane_image(
            width=width,
            height=height,
            num_lanes=2,
        )

        # HWC uint8 -> CHW float in [0, 1]
        image = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        lane_coeffs = torch.from_numpy(lane_coeffs)
        lane_exists = torch.from_numpy(lane_exists)

        return {
            'image': image,
            'polynomials': lane_coeffs,
            'lane_exists': lane_exists,
        }


# Build the train/validation datasets (320x320 images) and their loaders;
# only the training loader shuffles.
train_dataset, val_dataset = (
    SyntheticLaneDataset(num_samples=count, image_size=(320, 320))
    for count in (500, 100)
)

train_loader, val_loader = (
    torch.utils.data.DataLoader(dataset, batch_size=8, shuffle=shuffle)
    for dataset, shuffle in ((train_dataset, True), (val_dataset, False))
)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")

## 4. Create a Simple Lane Detection Model

In [None]:
class SimpleLaneModel(nn.Module):
    """
    Small convolutional network for lane detection, meant for quick tests.
    For production, use the full LaneYOLO model.

    Five stride-2 conv stages feed a global average pool; two MLP heads then
    predict per-lane polynomial coefficients and per-lane existence
    probabilities.
    """

    # Channel widths of the encoder; every stage halves the resolution
    # (e.g. 320 -> 160 -> 80 -> 40 -> 20 -> 10 for a 320x320 input).
    _ENCODER_CHANNELS = (3, 32, 64, 128, 256, 512)

    def __init__(self, num_lanes=4, num_coeffs=4):
        super().__init__()
        self.num_lanes = num_lanes
        self.num_coeffs = num_coeffs

        # Encoder: [Conv -> BatchNorm -> ReLU] x 5, then global average pooling
        stages = []
        widths = self._ENCODER_CHANNELS
        for in_ch, out_ch in zip(widths, widths[1:]):
            stages += [
                nn.Conv2d(in_ch, out_ch, 3, stride=2, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]
        stages.append(nn.AdaptiveAvgPool2d(1))
        self.encoder = nn.Sequential(*stages)

        feature_dim = widths[-1]

        # Polynomial coefficient head (raw regression outputs)
        self.poly_head = nn.Sequential(
            *self._head_layers(feature_dim, 256, num_lanes * num_coeffs)
        )

        # Lane existence head (sigmoid -> probability per lane)
        self.exist_head = nn.Sequential(
            *self._head_layers(feature_dim, 128, num_lanes),
            nn.Sigmoid(),
        )

    @staticmethod
    def _head_layers(in_features, hidden, out_features):
        """Layers of a two-layer MLP head: Flatten-Linear-ReLU-Dropout-Linear."""
        return [
            nn.Flatten(),
            nn.Linear(in_features, hidden),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(hidden, out_features),
        ]

    def forward(self, x):
        """
        Args:
            x: image batch [B, 3, H, W].

        Returns:
            dict with 'polynomials' [B, num_lanes, num_coeffs] and
            'confidences' [B, num_lanes].
        """
        pooled = self.encoder(x)

        coeffs = self.poly_head(pooled).view(-1, self.num_lanes, self.num_coeffs)
        lane_prob = self.exist_head(pooled)

        return {
            'polynomials': coeffs,
            'confidences': lane_prob,
        }


# Create model
model = SimpleLaneModel(num_lanes=4, num_coeffs=4).to(device)

# Count parameters
num_params = sum(p.numel() for p in model.parameters())
print(f"Model parameters: {num_params / 1e6:.2f}M")

# Smoke-test the forward pass. Run it in eval mode under no_grad so that the
# random input neither updates the BatchNorm running statistics nor builds an
# unused autograd graph; then restore the default training mode.
test_input = torch.randn(2, 3, 320, 320).to(device)
model.eval()
with torch.no_grad():
    test_output = model(test_input)
model.train()
print(f"Polynomial output shape: {test_output['polynomials'].shape}")
print(f"Confidence output shape: {test_output['confidences'].shape}")

## 5. Training Loop

In [None]:
def _lane_loss(outputs, gt_poly, gt_exists):
    """
    Combined lane loss shared by training and validation.

    Polynomial term: per-coefficient squared error, averaged over the
    coefficients of each lane, masked by ``gt_exists`` and normalised by the
    number of existing lanes (epsilon guards a batch without any lane).
    Existence term: binary cross-entropy on the predicted probabilities.
    """
    squared_err = nn.functional.mse_loss(
        outputs['polynomials'], gt_poly, reduction='none'
    )
    poly_loss = (squared_err.mean(dim=-1) * gt_exists).sum() / (gt_exists.sum() + 1e-6)
    exist_loss = nn.functional.binary_cross_entropy(outputs['confidences'], gt_exists)
    return poly_loss + exist_loss


def _unpack_batch(batch, device):
    """Move the image, polynomial and existence tensors of a batch to ``device``."""
    return (
        batch['image'].to(device),
        batch['polynomials'].to(device),
        batch['lane_exists'].to(device),
    )


def train_one_epoch(model, dataloader, optimizer, device):
    """
    Train ``model`` for one pass over ``dataloader``.

    Args:
        model: module returning a dict with 'polynomials' and 'confidences'.
        dataloader: iterable of dict batches with keys 'image',
            'polynomials' and 'lane_exists'.
        optimizer: optimizer over the model parameters.
        device: device the batches are moved to.

    Returns:
        Mean per-batch loss as a float; 0.0 if the dataloader yields no batch.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in tqdm(dataloader, desc="Training"):
        images, gt_poly, gt_exists = _unpack_batch(batch, device)

        loss = _lane_loss(model(images), gt_poly, gt_exists)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Count the batches actually seen instead of len(dataloader): this avoids
    # a ZeroDivisionError on an empty loader and works for iterables
    # that do not define __len__.
    return total_loss / max(num_batches, 1)


def validate(model, dataloader, device):
    """
    Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Takes the same kind of batches as ``train_one_epoch`` and returns the
    mean per-batch loss as a float (0.0 if the dataloader yields no batch).
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in dataloader:
            images, gt_poly, gt_exists = _unpack_batch(batch, device)
            loss = _lane_loss(model(images), gt_poly, gt_exists)
            total_loss += loss.item()
            num_batches += 1

    return total_loss / max(num_batches, 1)

In [None]:
# Training setup
num_epochs = 20
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
# Tie the cosine schedule length to num_epochs so the learning rate reaches
# its minimum exactly at the end of training, even if num_epochs is changed.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

# Per-epoch loss history (reset on every run of this cell)
train_losses = []
val_losses = []

print("Starting training...")
for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, train_loader, optimizer, device)
    val_loss = validate(model, val_loader, device)
    scheduler.step()  # one scheduler step per epoch

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

print("\nTraining complete!")

In [None]:
# Loss curves: one point per epoch for the training and validation loss
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(train_losses, label='Train Loss')
ax.plot(val_losses, label='Val Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Training Progress')
ax.legend()
ax.grid(True)
plt.show()

## 6. Test Inference and Visualization

In [None]:
def visualize_prediction(model, device, image_size=(320, 320), conf_threshold=0.5):
    """
    Visualize model prediction on a freshly generated synthetic image.

    Args:
        model: module returning a dict with 'polynomials'
            [B, num_lanes, num_coeffs] and 'confidences' [B, num_lanes].
        device: device the input tensor is moved to.
        image_size: ``(height, width)`` of the generated test image.
        conf_threshold: lanes are drawn only when their confidence exceeds
            this value.

    Returns:
        (pred_conf, pred_poly): numpy arrays with the per-lane confidences
        and polynomial coefficients predicted for the image.
    """
    model.eval()
    height, width = image_size

    # Generate test image (its labels are not drawn, only the image is shown)
    image, _, _ = create_synthetic_lane_image(width=width, height=height)

    # HWC uint8 -> 1xCxHxW float in [0, 1]
    input_tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
    input_tensor = input_tensor.unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(input_tensor)

    pred_poly = output['polynomials'][0].cpu().numpy()
    pred_conf = output['confidences'][0].cpu().numpy()

    fig, (ax_gt, ax_pred) = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: the generated image, whose white markings are the ground truth
    ax_gt.imshow(image)
    ax_gt.set_title('Ground Truth')
    ax_gt.axis('off')

    # Right panel: predicted lanes drawn on a copy of the image
    pred_image = image.copy()
    colors = [(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0)]
    center_y = height / 2

    # Iterate over however many lanes the model predicts instead of assuming 4
    for lane_idx, (confidence, coeffs) in enumerate(zip(pred_conf, pred_poly)):
        if not confidence > conf_threshold:
            continue

        # Only the offset (c0) and quadratic (c2) terms are used; this mirrors
        # the encoding written by create_synthetic_lane_image.
        c0, c2 = coeffs[0], coeffs[2]
        color = colors[lane_idx % len(colors)]  # cycle if more lanes than colors

        points = []
        for y in range(height // 3, height, 5):
            x = width / 2 + c0 * width + c2 * (y - center_y) ** 2 / 1000
            if 0 < x < width:
                points.append((int(x), y))

        for start, end in zip(points, points[1:]):
            cv2.line(pred_image, start, end, color, 2)

    ax_pred.imshow(pred_image)
    ax_pred.set_title(f'Predictions (conf: {pred_conf[:2].round(2)})')
    ax_pred.axis('off')

    plt.tight_layout()
    plt.show()

    return pred_conf, pred_poly


# Show predictions for three independently generated images,
# printing the confidences followed by a blank line after each figure
for _ in range(3):
    conf, poly = visualize_prediction(model, device)
    print(f"Predicted confidences: {conf}", end="\n\n")

## 7. Save Model

In [None]:
# Checkpoint contents: model weights, optimizer state, the number of epochs
# trained and the losses of the final epoch
checkpoint = {
    'model': model.state_dict(),
    'optimizer': optimizer.state_dict(),
    'epoch': num_epochs,
    'train_loss': train_losses[-1],
    'val_loss': val_losses[-1],
}

# Write it next to the project sources, creating the directory if needed
save_path = '../outputs/simple_lane_model.pt'
os.makedirs('../outputs', exist_ok=True)
torch.save(checkpoint, save_path)

print(f"Model saved to {save_path}")

## 8. Measure Inference Speed

In [None]:
import time

model.eval()
dummy_input = torch.randn(1, 3, 320, 320).to(device)
on_cuda = device.type == 'cuda'

# Warmup: a few untimed passes so one-time setup costs are not measured
with torch.no_grad():
    for _ in range(10):
        _ = model(dummy_input)

if on_cuda:
    torch.cuda.synchronize()

# Benchmark: time single-image forward passes, in milliseconds.
# CUDA kernels run asynchronously, so synchronize before starting and
# before stopping the clock.
num_iterations = 100
latencies = []

for _ in range(num_iterations):
    if on_cuda:
        torch.cuda.synchronize()

    start = time.perf_counter()
    with torch.no_grad():
        _ = model(dummy_input)

    if on_cuda:
        torch.cuda.synchronize()

    elapsed_ms = (time.perf_counter() - start) * 1000
    latencies.append(elapsed_ms)

latencies = np.array(latencies)

print(f"\n=== Inference Speed (input: 320x320) ===")
print(f"Mean latency: {np.mean(latencies):.2f} ms")
print(f"Std latency: {np.std(latencies):.2f} ms")
print(f"Min latency: {np.min(latencies):.2f} ms")
print(f"Max latency: {np.max(latencies):.2f} ms")
print(f"FPS: {1000 / np.mean(latencies):.1f}")

## Next Steps

This notebook demonstrates a simple training pipeline. For production use:

1. **Use the full LaneYOLO model** from `lane_keeping.models.lane_yolo`
2. **Use real datasets** (TuSimple, CULane) from `lane_keeping.data.datasets`
3. **Apply augmentations** from `lane_keeping.processing.augmentation`
4. **Use the full training script** at `scripts/train.py`
5. **Export to TensorRT** for edge deployment