## Step 1: Install Dependencies

In [1]:
# Install required packages (if needed)
!pip install -q torch torchvision opencv-python pillow matplotlib timm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import transforms, models as torchvision_models
from torchvision.datasets import ImageFolder
import timm  # For EfficientNet
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt
import random

# Pick the compute device: the CUDA GPU when one is available, otherwise the CPU
has_cuda = torch.cuda.is_available()
device = torch.device('cuda' if has_cuda else 'cpu')
print(f"✓ Using device: {device}")

# Report the name and total memory of GPU 0 when running on CUDA
if has_cuda:
    gpu_properties = torch.cuda.get_device_properties(0)
    print(f"✓ GPU: {torch.cuda.get_device_name(0)}")
    print(f"✓ GPU Memory: {gpu_properties.total_memory / 1e9:.2f} GB")

✓ Using device: cuda
✓ GPU: Tesla T4
✓ GPU Memory: 15.83 GB


## Step 2: Upload Dataset

Choose one option:
- **Option A**: Upload ZIP file (Colab web interface only)
- **Option B**: Mount Google Drive (fastest for large files)
- **Option C**: Direct copy from local machine (VS Code connected to Colab)

**⚠️ Important:** The upload widget only works in Colab's web interface at https://colab.research.google.com/

**For large files (1.67 GB):** Use Option B (Google Drive) - it's much faster!

1. Upload `road_dataset_20251118_201844.zip` to your Google Drive
2. Use Option B below to mount Drive and extract

In [3]:
# OPTION A: Upload ZIP file
from google.colab import files

print("Upload your road_dataset_20251118_201844.zip file:")
uploaded = files.upload()

# Get the uploaded filename
zip_filename = list(uploaded.keys())[0]

# Extract
print(f"\nExtracting {zip_filename}...")
!unzip -q "$zip_filename" -d /content/
print("✓ Dataset extracted!")

# Set paths (ZIP contains data/train and data/validation folders)
TRAIN_DIR = '/content/data/train'
VAL_DIR = '/content/data/validation'

print(f"\n✓ Train dir: {TRAIN_DIR}")
print(f"✓ Val dir: {VAL_DIR}")

Upload your road_dataset_20251118_201844.zip file:


KeyboardInterrupt: 

In [None]:
# OPTION B: Mount Google Drive (RECOMMENDED for large files)
# 1. Upload road_dataset_20251118_201844.zip to your Google Drive first
# 2. Run this cell and authorize
# 3. Extract the dataset

from google.colab import drive
drive.mount('/content/drive')

# Extract dataset from Drive (adjust path if needed)
print("\nExtracting dataset from Google Drive...")
!unzip -q /content/drive/MyDrive/road_dataset_20251118_201844.zip -d /content/
print("✓ Dataset extracted!")

# Set paths
TRAIN_DIR = '/content/data/train'
VAL_DIR = '/content/data/validation'

In [5]:
# OPTION C: Direct upload from local machine (VS Code connected to Colab)
# This works when your Colab runtime is connected to VS Code

import os
import shutil

# Path to your local ZIP file (update this path if needed)
LOCAL_ZIP_PATH = r'C:\Users\amitu\Downloads\yolo\road_dataset_20251118_201844.zip'

print("Checking for local ZIP file...")
if os.path.exists(LOCAL_ZIP_PATH):
    print(f"✓ Found: {LOCAL_ZIP_PATH}")
    
    # Copy to Colab runtime
    print("\nCopying ZIP to Colab runtime...")
    shutil.copy2(LOCAL_ZIP_PATH, '/content/road_dataset_20251118_201844.zip')
    print("✓ ZIP copied to /content/")
    
    # Extract
    print("\nExtracting dataset...")
    !unzip -q /content/road_dataset_20251118_201844.zip -d /content/
    print("✓ Dataset extracted!")
    
    # Set paths
    TRAIN_DIR = '/content/data/train'
    VAL_DIR = '/content/data/validation'
    
    print(f"\n✓ Train dir: {TRAIN_DIR}")
    print(f"✓ Val dir: {VAL_DIR}")
else:
    print(f"✗ ZIP file not found at: {LOCAL_ZIP_PATH}")
    print("Please use Option A or B instead")

Checking for local ZIP file...
✗ ZIP file not found at: C:\Users\amitu\Downloads\yolo\road_dataset_20251118_201844.zip
Please use Option A or B instead


In [None]:
# Verify dataset structure
def _class_dirs(root):
    """Return the sorted names of the sub-directories of `root` (one per class).

    Plain files such as `.DS_Store` are skipped so they are not mistaken for classes.
    """
    return sorted(
        name for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name))
    )


train_classes = _class_dirs(TRAIN_DIR)
val_classes = _class_dirs(VAL_DIR)

print("\n=== Dataset Structure ===")
print(f"Training classes: {train_classes}")
print(f"Validation classes: {val_classes}")

print("\nImages per class:")
for class_name in train_classes:
    train_count = len(os.listdir(os.path.join(TRAIN_DIR, class_name)))
    # A class may be absent from the validation split; report 0 instead of crashing
    val_class_dir = os.path.join(VAL_DIR, class_name)
    val_count = len(os.listdir(val_class_dir)) if os.path.isdir(val_class_dir) else 0
    print(f"  {class_name}: {train_count} train, {val_count} val")

if train_classes != val_classes:
    print("\n⚠️ Train and validation class folders differ - check the dataset")

## Step 3: Build EfficientNet-B0 Model

In [None]:
class EnhancedRoadModel(nn.Module):
    """EfficientNet-B0 feature extractor followed by an MLP classification head.

    Args:
        num_classes: size of the final linear layer's output (default 3).
    """

    def __init__(self, num_classes=3):
        super().__init__()

        # Pretrained EfficientNet-B0 from timm; num_classes=0 is passed so the
        # backbone is created without its own classification layer.
        self.backbone = timm.create_model('efficientnet_b0', pretrained=True, num_classes=0)
        feature_dim = self.backbone.num_features  # 1280 for EfficientNet-B0

        # Freeze every backbone parameter tensor except the last 60,
        # leaving only the tail of the backbone trainable.
        backbone_params = list(self.backbone.parameters())
        for frozen_param in backbone_params[:-60]:
            frozen_param.requires_grad = False

        # Classification head (no dropout): three hidden Linear+ReLU stages,
        # the first two followed by BatchNorm, then the output layer.
        head_layers = [
            nn.Linear(feature_dim, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run `x` through the backbone and return the classifier's output."""
        return self.classifier(self.backbone(x))

# Instantiate the 3-class model and move it to the selected device
model = EnhancedRoadModel(num_classes=3).to(device)

# Tally all parameters and the trainable subset in a single pass
total_params = 0
trainable_params = 0
for parameter in model.parameters():
    element_count = parameter.numel()
    total_params += element_count
    if parameter.requires_grad:
        trainable_params += element_count

print(f"\n✓ Enhanced model built with EfficientNet-B0!")
print(f"  Total parameters: {total_params:,}")
print(f"  Trainable parameters: {trainable_params:,}")
print(f"  Classes: Crack, Pothole, Severe_Damage")
print(f"  Device: {device}")

## Step 4: Train Model

In [None]:
# Enhanced training configuration
EPOCHS = 100
BATCH_SIZE = 32
ACCUMULATION_STEPS = 2  # Effective batch size = BATCH_SIZE * ACCUMULATION_STEPS = 64
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 1e-4
WARMUP_EPOCHS = 5  # NOTE(review): not referenced by the training cells in this notebook
SEED = 42

# Seed the Python, NumPy and PyTorch RNGs so sampling and augmentation are
# repeatable between runs (GPU kernels may still be non-deterministic).
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

print(f"\n=== Enhanced Training Configuration ===")
print(f"Epochs: {EPOCHS}")
print(f"Batch Size: {BATCH_SIZE} (effective: {BATCH_SIZE * ACCUMULATION_STEPS})")
print(f"Learning Rate: {LEARNING_RATE}")
print(f"Weight Decay: {WEIGHT_DECAY}")
print(f"Seed: {SEED}")
print(f"Device: {device}")

In [None]:
# Preprocessing shared by both pipelines: 224x224 input and per-channel
# normalization with the standard ImageNet mean / std.
IMAGE_SIZE = (224, 224)
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# Training: light augmentation (flip, small rotation, mild color jitter)
train_steps = [
    transforms.Resize(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
]
train_transforms = transforms.Compose(train_steps)

# Validation: deterministic resize + normalize only
val_steps = [
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
]
val_transforms = transforms.Compose(val_steps)

print("✓ Simplified augmentation pipeline created (no dropout, lighter transforms)")

# Load datasets
train_dataset = ImageFolder(TRAIN_DIR, transform=train_transforms)
val_dataset = ImageFolder(VAL_DIR, transform=val_transforms)

# Class weights for imbalanced data (Crack: 0.839, Pothole: 0.832, Severe_Damage: 1.651).
# They are indexed by the integer labels that ImageFolder assigns.
CLASS_WEIGHT_VALUES = [0.839, 0.832, 1.651]
if len(CLASS_WEIGHT_VALUES) != len(train_dataset.classes):
    raise ValueError(
        f"Expected {len(CLASS_WEIGHT_VALUES)} classes but found "
        f"{len(train_dataset.classes)}: {train_dataset.classes}"
    )
class_weights = torch.tensor(CLASS_WEIGHT_VALUES).to(device)

# Weighted sampler for balanced batches. Per-sample weights are built from the
# plain Python floats: indexing the device tensor would produce one 0-d
# (possibly CUDA) tensor per sample, which the sampler then has to convert back.
# NOTE(review): the same weights are also passed to the loss in the training
# setup cell, so class imbalance is compensated twice - confirm this is intended.
sample_weights = [CLASS_WEIGHT_VALUES[label] for _, label in train_dataset.samples]
sampler = WeightedRandomSampler(sample_weights, len(sample_weights))

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"\n✓ Training samples: {len(train_dataset)}")
print(f"✓ Validation samples: {len(val_dataset)}")
print(f"✓ Classes: {train_dataset.classes}")
print(f"✓ Class weights applied: {class_weights.tolist()}")

In [None]:
# Loss, optimizer and gradient scaler. No scheduler is created, so both
# learning rates stay constant for the whole run.
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)

# Two parameter groups: the backbone trains at a tenth of the classifier's rate
backbone_lr = LEARNING_RATE * 0.1
classifier_lr = LEARNING_RATE
param_groups = [
    {'params': model.backbone.parameters(), 'lr': backbone_lr},
    {'params': model.classifier.parameters(), 'lr': classifier_lr},
]
optimizer = optim.AdamW(param_groups, weight_decay=WEIGHT_DECAY)

# Gradient scaler used by the mixed-precision training loop
scaler = torch.cuda.amp.GradScaler()

print(f"\n✓ Loss: CrossEntropyLoss with class weights + label smoothing")
print(f"✓ Optimizer: AdamW with FIXED learning rate")
print(f"✓ Backbone LR: {backbone_lr} (constant)")
print(f"✓ Classifier LR: {classifier_lr} (constant)")
print(f"✓ Mixed Precision: Enabled")

# Training loop: mixed precision, gradient accumulation, gradient clipping,
# best-checkpoint saving and early stopping on validation accuracy.
best_val_acc = 0.0
patience_counter = 0
patience = 20  # epochs without a validation-accuracy improvement before stopping
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'lr': []}

print("\n" + "="*70)
print("STARTING ENHANCED TRAINING")
print("="*70)
print(f"Target: 85-90% validation accuracy")
print(f"Improvements: EfficientNet + Class weights + FIXED LR + No dropout")
print("="*70 + "\n")

num_train_batches = len(train_loader)

for epoch in range(EPOCHS):
    # ---- Training phase ----
    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0
    optimizer.zero_grad()

    for batch_idx, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        # Mixed precision forward pass
        with torch.cuda.amp.autocast():
            outputs = model(images)
            # Divide so gradients summed over ACCUMULATION_STEPS batches average out
            loss = criterion(outputs, labels) / ACCUMULATION_STEPS

        # Mixed precision backward pass (gradients accumulate across batches)
        scaler.scale(loss).backward()

        # Update weights every ACCUMULATION_STEPS batches, and also on the last
        # batch of the epoch: otherwise a trailing partial group would be thrown
        # away by the zero_grad() at the start of the next epoch.
        batches_seen = batch_idx + 1
        if batches_seen % ACCUMULATION_STEPS == 0 or batches_seen == num_train_batches:
            scaler.unscale_(optimizer)  # unscale first so clipping sees real gradient norms
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        # Undo the accumulation scaling so the reported loss is per batch
        batch_loss = loss.item() * ACCUMULATION_STEPS
        predicted = outputs.argmax(dim=1)
        batch_correct = (predicted == labels).sum().item()

        train_loss += batch_loss
        train_total += labels.size(0)
        train_correct += batch_correct

        # Show batch accuracy every 20 batches
        if batches_seen % 20 == 0:
            batch_acc = batch_correct / labels.size(0)
            print(f"  Batch {batches_seen}/{num_train_batches} | Loss: {batch_loss:.4f} | Acc: {batch_acc*100:.2f}%")

    train_loss = train_loss / num_train_batches
    train_acc = train_correct / train_total

    # ---- Validation phase ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(val_loader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            predicted = outputs.argmax(dim=1)
            batch_correct = (predicted == labels).sum().item()

            val_loss += loss.item()
            val_total += labels.size(0)
            val_correct += batch_correct

            # Show batch accuracy every 10 batches during validation
            if (batch_idx + 1) % 10 == 0:
                batch_acc = batch_correct / labels.size(0)
                print(f"  Val Batch {batch_idx+1}/{len(val_loader)} | Loss: {loss.item():.4f} | Acc: {batch_acc*100:.2f}%")

    val_loss = val_loss / len(val_loader)
    val_acc = val_correct / val_total

    # Learning rate of the first optimizer parameter group
    current_lr = optimizer.param_groups[0]['lr']

    # Save history
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    history['lr'].append(current_lr)

    # Print epoch summary
    print(f"\n{'='*70}")
    print(f"Epoch {epoch+1}/{EPOCHS}")
    print(f"{'='*70}")
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%")
    print(f"Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc*100:.2f}%")
    print(f"LR: {current_lr:.7f} (FIXED)")

    # Save best model (strict improvement in validation accuracy resets patience)
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': val_acc,
            'class_names': train_dataset.classes
        }, '/content/road_enhanced_best.pth')
        patience_counter = 0
        print(f"✓ Best model saved! (Val Acc: {val_acc*100:.2f}%)")
    else:
        patience_counter += 1

    # Early stopping
    if patience_counter >= patience:
        print(f"\n✓ Early stopping triggered after {epoch+1} epochs")
        break

print(f"\n{'='*70}")
print("✓ ENHANCED TRAINING COMPLETED!")
print(f"{'='*70}")
print(f"Best Validation Accuracy: {best_val_acc*100:.2f}%")
print(f"Expected: 85-90%")
print(f"{'='*70}")

## Step 5: Visualize Results

In [None]:
# Plot training curves (loss, accuracy) and the recorded learning rate
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Loss plot
axes[0].plot(history['train_loss'], label='Train Loss')
axes[0].plot(history['val_loss'], label='Val Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training and Validation Loss')
axes[0].legend()
axes[0].grid(True)

# Accuracy plot (history stores fractions; convert to percent)
axes[1].plot([acc*100 for acc in history['train_acc']], label='Train Acc')
axes[1].plot([acc*100 for acc in history['val_acc']], label='Val Acc')
axes[1].axhline(y=81.53, color='r', linestyle='--', label='Baseline (81.53%)', alpha=0.7)
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy (%)')
axes[1].set_title('Training and Validation Accuracy')
axes[1].legend()
axes[1].grid(True)

# Learning rate per epoch. Training uses no scheduler, so this is a flat line;
# the previous "Cosine Annealing" title did not match the training setup.
axes[2].plot(history['lr'])
axes[2].set_xlabel('Epoch')
axes[2].set_ylabel('Learning Rate')
axes[2].set_title('Learning Rate (Fixed)')
axes[2].set_yscale('log')
axes[2].grid(True)

plt.tight_layout()
plt.savefig('/content/enhanced_training_results.png', dpi=150, bbox_inches='tight')
plt.show()

print("✓ Enhanced training plots saved!")

## Step 6: Test Model

In [None]:
# Test on a sample validation image
def test_prediction(image_path, class_names):
    """Classify one image with the global `model`, then plot and print the result.

    Args:
        image_path: path of the image file to open.
        class_names: sequence of class labels indexed by the model's output position.

    Returns:
        None. The image is displayed with the predicted label and confidence,
        and the probability of every class is printed.
    """
    image = Image.open(image_path).convert('RGB')

    # Same deterministic preprocessing as validation: resize, tensor, normalize
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Add a leading batch dimension and move to the model's device
    batch = preprocess(image).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        logits = model(batch)
        probabilities = logits.softmax(dim=1)
        confidence, predicted_class = probabilities.max(dim=1)

    predicted_name = class_names[predicted_class.item()]
    confidence_pct = confidence.item() * 100

    plt.figure(figsize=(8, 6))
    plt.imshow(image)
    plt.axis('off')
    plt.title(f"Prediction: {predicted_name}\n"
              f"Confidence: {confidence_pct:.2f}%", fontsize=14)
    plt.tight_layout()
    plt.show()

    print(f"\nPrediction: {predicted_name}")
    print(f"Confidence: {confidence_pct:.2f}%")
    print("\nAll probabilities:")
    for class_idx, label in enumerate(class_names):
        print(f"  {label}: {probabilities[0][class_idx].item()*100:.2f}%")

# Test on the first validation image indexed by ImageFolder.
# val_dataset.samples holds (path, class_index) pairs for the images the
# dataset actually loaded, so this never picks a stray file or directory the
# way an unsorted os.listdir(...)[0] could.
sample_image, sample_label = val_dataset.samples[0]
sample_class = val_dataset.classes[sample_label]

print(f"Testing on: {sample_image}")
print(f"True class: {sample_class}")
test_prediction(sample_image, train_dataset.classes)

## Step 7: Download Trained Model

In [None]:
# Download the best checkpoint and the training plots, then print a summary
from google.colab import files

banner = "=" * 70

# (progress message, file to download) pairs, fetched in order
artifacts = [
    ("Downloading enhanced trained model...", '/content/road_enhanced_best.pth'),
    ("\nDownloading training plots...", '/content/enhanced_training_results.png'),
]
for message, artifact_path in artifacts:
    print(message)
    files.download(artifact_path)

print("\n" + banner)
print("✓ ENHANCED TRAINING COMPLETE!")
print(banner)
print(f"\nFinal Accuracy: {best_val_acc*100:.2f}%")
print(f"Target Range: 85-90%")

# Follow-up instructions, numbered from 1 when printed
next_steps = [
    "Copy road_enhanced_best.pth to your local project",
    "Rename to: models/road_condition_model.pth",
    "Update config.py NUM_CLASSES = 3",
    "Run locally: python deploy_model.py",
    "Start detection: python main.py",
]
print("\nNext steps:")
for step_number, step_text in enumerate(next_steps, start=1):
    print(f"{step_number}. {step_text}")
print(banner)

In [None]:
# Optional: Save to Google Drive as backup
# Uncomment to use

# from google.colab import drive
# drive.mount('/content/drive')
# !cp /content/road_enhanced_best.pth /content/drive/MyDrive/
# !cp /content/enhanced_training_results.png /content/drive/MyDrive/
# print("✓ Enhanced model backup saved to Google Drive")