<a href="https://colab.research.google.com/github/beruck/Zamnuel/blob/main/RoadConditiontype.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# %% [Initial Setup]
# Run this cell first to set up the environment

!pip install -q bing-image-downloader
!sudo apt-get install tree

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import numpy as np
from collections import Counter
from bing_image_downloader import downloader
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# %% [Dataset Preparation]
# Custom dataset class with automatic balancing
class RoadDamageDataset(Dataset):
    def __init__(self, root_dir, target_size=2500, transform=None):
        self.classes = ['Normal', 'Pothole', 'Crack', 'Erosion',
                       'Rutting', 'Edge Cracking', 'Surface Deterioration', 'Washboarding']
        self.target_size = target_size
        self.transform = transform
        self.samples = []

        # Collect and balance images
        for class_idx, class_name in enumerate(self.classes):
            class_path = os.path.join(root_dir, class_name)
            if not os.path.exists(class_path):
                continue

            images = [os.path.join(class_path, f)
                     for f in os.listdir(class_path)
                     if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

            # Repeat images to reach target size
            repeat_factor = (target_size // len(images)) + 1 if len(images) > 0 else 0
            selected_images = (images * repeat_factor)[:target_size]

            self.samples.extend([(img, class_idx) for img in selected_images])

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

# %% [Data Augmentation]
# Define transforms
train_transform = transforms.Compose([
    transforms.Resize((300, 300)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(25),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomPerspective(distortion_scale=0.3, p=0.5),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((300, 300)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# %% [Data Collection]
# Configuration
DATASET_PATH = '/content/dataset'
DRIVE_PATH = '/content/drive/MyDrive/RoadDamageDataset'

# Create directories
!mkdir -p "{DATASET_PATH}"
!mkdir -p "{DRIVE_PATH}"

# Download sample images (Bing Image Downloader)
classes = ['Normal road', 'Pothole road damage', 'Crack road damage',
           'Erosion road damage', 'Rutting road damage', 'Edge Cracking road damage',
           'Surface Deterioration road damage', 'Washboarding road damage']

for query in classes:
    downloader.download(
        query,
        limit=300,  # Reduce for testing, increase for actual use
        output_dir=DATASET_PATH,
        adult_filter_off=True,
        force_replace=False,
        timeout=60
    )

# Check directory structure
!tree -L 2 "{DATASET_PATH}"

# %% [Dataset Initialization]
# Create dataset instance
full_dataset = RoadDamageDataset(
    root_dir=DATASET_PATH,
    target_size=2500,
    transform=train_transform
)

# Split dataset
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

# Apply validation transform
val_dataset.dataset.transform = val_transform

# Create data loaders
BATCH_SIZE = 32
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

# %% [Check Data Distribution]
print(f"\nTotal training samples: {len(train_dataset)}")
print(f"Total validation samples: {len(val_dataset)}")
print("\nClass distribution in training set:")
print(Counter([label for _, label in train_dataset]))

# %% [Model Setup]
# Initialize pretrained model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = models.resnet50(pretrained=True)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, len(classes))
model = model.to(device)

# Training configuration
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# %% [Training Loop]
# Simple training loop
def train_model(model, num_epochs=10):
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_dataset)
        epoch_acc = running_corrects.double() / len(train_dataset)

        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_corrects = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                val_corrects += torch.sum(preds == labels.data)

        val_loss = val_loss / len(val_dataset)
        val_acc = val_corrects.double() / len(val_dataset)

        print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Update scheduler
        scheduler.step()

    return model

# Start training
model = train_model(model, num_epochs=10)

# Save model
torch.save(model.state_dict(), os.path.join(DRIVE_PATH, 'road_damage_model.pth'))
print("Model saved to Google Drive!")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[!!]Indexing page: 3858

[%] Indexed 25 Images on Page 3858.




[!!]Indexing page: 3859

[%] Indexed 25 Images on Page 3859.




[!!]Indexing page: 3860

[%] Indexed 25 Images on Page 3860.




[!!]Indexing page: 3861

[%] Indexed 25 Images on Page 3861.




[!!]Indexing page: 3862

[%] Indexed 25 Images on Page 3862.




[!!]Indexing page: 3863

[%] Indexed 25 Images on Page 3863.




[!!]Indexing page: 3864

[%] Indexed 25 Images on Page 3864.




[!!]Indexing page: 3865

[%] Indexed 25 Images on Page 3865.




[!!]Indexing page: 3866

[%] Indexed 25 Images on Page 3866.




[!!]Indexing page: 3867

[%] Indexed 25 Images on Page 3867.




[!!]Indexing page: 3868

[%] Indexed 25 Images on Page 3868.




[!!]Indexing page: 3869

[%] Indexed 25 Images on Page 3869.




[!!]Indexing page: 3870

[%] Indexed 25 Images on Page 3870.




[!!]Indexing page: 3871

[%] Indexed 25 Images on Page 3871.




[!!]Indexing page: 3872



ValueError: num_samples should be a positive integer value, but got num_samples=0