In [112]:
import copy
import os
import random
import time

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision import transforms
import torch.multiprocessing
torch.multiprocessing.set_start_method('spawn', force=True)
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm
from torchmetrics.detection.mean_ap import MeanAveragePrecision

In [113]:

# Define the Concrete Crack Dataset
class ConcreteCrackDataset(Dataset):
    """Object-detection dataset of concrete images with YOLO-style crack labels.

    Every ``.jpg`` / ``.jpeg`` / ``.png`` file found (recursively, matched
    case-insensitively) under ``root_dir/split`` is one sample.  Its annotation
    is a ``.txt`` file with the same stem, looked up next to the image first and
    then in a sibling ``labels`` directory (``.../images/x.jpg`` ->
    ``.../labels/x.txt``).  Images without an annotation file yield a target
    with zero boxes.
    """

    # Extensions accepted by the directory scan (compared in lower case).
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, split='train', transforms=None):
        """
        Args:
            root_dir (string): Root directory with all the data
            split (string): Sub-path of ``root_dir`` to scan, e.g. 'train', 'val', 'test'
            transforms (callable, optional): Called as ``transforms(img, target)``
                and expected to return the transformed ``(img, target)`` pair
        """
        self.root_dir = root_dir
        self.split = split
        self.transforms = transforms

        # Directory that holds this split's images
        self.split_dir = os.path.join(root_dir, split)

        # Walk through all subdirectories (Decks, Pavements, Walls, etc.).
        found = []
        for root, _, files in os.walk(self.split_dir):
            for file in files:
                if file.lower().endswith(self.IMAGE_EXTENSIONS):
                    found.append(os.path.join(root, file))

        # os.walk order depends on the filesystem; sorting keeps idx -> image
        # stable across runs and machines.
        self.imgs = sorted(found)

        print(f"Found {len(self.imgs)} images in {split} split")

    def __len__(self):
        """Return the number of images discovered for this split."""
        return len(self.imgs)

    @staticmethod
    def _annotation_path(img_path):
        """Return the annotation path for ``img_path`` (the file may not exist).

        Preference order: ``<stem>.txt`` beside the image, then the same
        relative path with the last ``images`` directory component replaced by
        ``labels``.  When neither exists the beside-the-image path is returned
        so the caller's existence check fails and the sample is treated as
        having no boxes.
        """
        stem = os.path.splitext(img_path)[0]
        beside_image = stem + '.txt'
        if os.path.exists(beside_image):
            return beside_image

        parts = os.path.normpath(stem).split(os.sep)
        # Search directory components only (the last element is the file stem).
        for pos in range(len(parts) - 2, -1, -1):
            if parts[pos] == 'images':
                candidate = os.sep.join(parts[:pos] + ['labels'] + parts[pos + 1:]) + '.txt'
                if os.path.exists(candidate):
                    return candidate
                break
        return beside_image

    @staticmethod
    def _parse_yolo_annotations(annot_path, width, height):
        """Read a YOLO label file and return pixel ``[x1, y1, x2, y2]`` boxes.

        Each non-blank line is ``class x_center y_center width height`` with
        the four geometry fields normalised to [0, 1].  Boxes are clamped to
        the image and dropped if they end up with zero width or height.  The
        class field is not used: every box is treated as a crack.

        Raises:
            ValueError: if a non-blank line has fewer than five fields or a
                geometry field is not a number.
        """
        boxes = []
        with open(annot_path, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parts = line.split()
                if not parts:
                    continue  # blank lines (e.g. trailing newline) carry no box
                if len(parts) < 5:
                    raise ValueError(
                        f"{annot_path}:{line_no}: expected "
                        f"'class x_center y_center width height', got {line.strip()!r}"
                    )

                # Convert YOLO format to (x1, y1, x2, y2) in pixels
                x_center = float(parts[1]) * width
                y_center = float(parts[2]) * height
                box_width = float(parts[3]) * width
                box_height = float(parts[4]) * height

                x1 = max(0.0, x_center - box_width / 2)
                y1 = max(0.0, y_center - box_height / 2)
                x2 = min(float(width), x_center + box_width / 2)
                y2 = min(float(height), y_center + box_height / 2)

                # Only keep boxes with positive area
                if x2 > x1 and y2 > y1:
                    boxes.append([x1, y1, x2, y2])
        return boxes

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(img, target)``.

        ``target`` is a dict with:
            boxes    -- float32 tensor (N, 4), pixel ``x1, y1, x2, y2``
            labels   -- int64 tensor (N,), all 1 (0 is background in Faster R-CNN)
            image_id -- tensor([idx])
            area     -- float32 tensor (N,), box areas in pixels
            iscrowd  -- int64 tensor (N,), all 0
        N is 0 for images without (or with empty) annotation files.
        """
        img_path = self.imgs[idx]
        annot_path = self._annotation_path(img_path)

        img = Image.open(img_path).convert("RGB")
        width, height = img.size

        box_list = []
        if os.path.exists(annot_path):
            box_list = self._parse_yolo_annotations(annot_path, width, height)

        # reshape(-1, 4) gives the (0, 4) shape detection models expect when
        # there are no boxes.
        boxes = torch.as_tensor(box_list, dtype=torch.float32).reshape(-1, 4)
        num_boxes = boxes.shape[0]

        target = {
            "boxes": boxes,
            "labels": torch.ones((num_boxes,), dtype=torch.int64),
            "image_id": torch.tensor([idx]),
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            # Suppose all instances are not crowd
            "iscrowd": torch.zeros((num_boxes,), dtype=torch.int64),
        }

        if self.transforms:
            img, target = self.transforms(img, target)

        return img, target


In [114]:
# Paired (image, target) transform helpers for data augmentation and preprocessing
class Compose:
    """Chain several paired transforms into a single callable.

    Each transform is called as ``step(image, target)`` and must return the
    updated ``(image, target)`` pair, which is fed to the next transform.
    """

    def __init__(self, transforms):
        # Transforms are stored as given and applied in iteration order.
        self.transforms = transforms

    def __call__(self, image, target):
        """Run every transform in order and return the final ``(image, target)``."""
        for step in self.transforms:
            image, target = step(image, target)
        return image, target

class ToTensor:
    """Paired transform that converts the image with ``transforms.ToTensor``.

    The target is passed through unchanged.
    """

    def __call__(self, image, target):
        to_tensor = transforms.ToTensor()
        return to_tensor(image), target

class RandomHorizontalFlip:
    """Mirror the image left-right with probability ``prob`` and flip its boxes.

    Accepts either a tensor image whose last dimension is the width (e.g. the
    output of ``ToTensor``) or a PIL image, so it can be placed before or after
    the tensor conversion in a pipeline.
    """

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, image, target):
        """Return ``(image, target)``, flipped together or left untouched.

        ``target["boxes"]`` is expected to be an (N, 4) tensor of
        ``x1, y1, x2, y2`` pixel coordinates; when a flip happens it is
        replaced by a new tensor (the input tensor is not modified).
        """
        if random.random() < self.prob:
            if torch.is_tensor(image):
                width = image.shape[-1]
                image = image.flip(-1)
            else:
                # PIL reports size as (width, height), in that order.
                width, _ = image.size
                image = image.transpose(Image.FLIP_LEFT_RIGHT)

            boxes = target["boxes"]
            if boxes.shape[0] > 0:
                flipped = boxes.clone()
                # The old right edge becomes the new left edge and vice versa,
                # which keeps x1 <= x2.
                flipped[:, 0] = width - boxes[:, 2]
                flipped[:, 2] = width - boxes[:, 0]
                target["boxes"] = flipped

        return image, target

class Normalize:
    """Paired transform that applies ``transforms.Normalize`` to the image.

    The target is passed through unchanged.
    """

    def __init__(self, mean, std):
        # Forwarded as the ``mean`` / ``std`` keyword arguments on every call.
        self.mean, self.std = mean, std

    def __call__(self, image, target):
        normalizer = transforms.Normalize(mean=self.mean, std=self.std)
        return normalizer(image), target

In [115]:
# Function to get the Faster R-CNN model
def get_faster_rcnn_model(num_classes=2):  # 2 classes: background and crack
    """Build a Faster R-CNN (ResNet-50 FPN) whose box head predicts ``num_classes``.

    The detector is created with ``weights='DEFAULT'`` and its box predictor is
    swapped for a fresh ``FastRCNNPredictor`` sized for ``num_classes``
    (background included).
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')

    # The new head must accept the same feature width as the one it replaces.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

In [116]:
# Function to train one epoch
def train_one_epoch(model, data_loader, optimizer, device):
    """Run one optimisation pass over ``data_loader`` and return the mean loss.

    Args:
        model: detection model that, in train mode, returns a dict of losses
            when called as ``model(images, targets)``.
        data_loader: yields ``(images, targets)`` batches; each target is a
            dict of tensors that includes ``"boxes"``.
        optimizer: optimiser stepping the model's parameters.
        device: device the batch tensors are moved to.

    Returns:
        float: summed loss averaged over the batches that were actually used;
        ``0.0`` when every batch was skipped.

    Samples without any box are dropped from their batch, and a batch with no
    remaining samples is skipped entirely.
    """
    model.train()

    total_loss = 0.0
    num_batches = 0  # Only count batches that were actually used

    for images, targets in tqdm(data_loader, desc="Training"):
        # Filter first so samples that are about to be discarded are never
        # copied to the device.
        annotated = [
            (img, tgt) for img, tgt in zip(images, targets)
            if tgt["boxes"].shape[0] > 0
        ]
        if not annotated:
            continue  # Skip if no valid targets

        valid_images = [img.to(device) for img, _ in annotated]
        valid_targets = [
            {k: v.to(device) for k, v in tgt.items()} for _, tgt in annotated
        ]

        optimizer.zero_grad()

        # Forward pass: the model returns one loss tensor per component.
        loss_dict = model(valid_images, valid_targets)
        losses = sum(loss_dict.values())

        # Backward pass
        losses.backward()
        optimizer.step()

        total_loss += losses.item()
        num_batches += 1

    # Prevent division by zero
    if num_batches == 0:
        return 0.0

    return total_loss / num_batches


In [117]:

# Function to evaluate the model on validation set
def evaluate(model, data_loader, device):
    """Score ``model`` on ``data_loader`` and return the computed mAP metrics.

    The model is switched to eval mode, run without gradients on every batch,
    and each prediction / ground-truth pair is fed to a
    ``MeanAveragePrecision`` metric on the CPU.  The return value is whatever
    ``metric.compute()`` produces.
    """
    model.eval()
    metric = MeanAveragePrecision()

    pred_keys = ('boxes', 'scores', 'labels')
    gt_keys = ('boxes', 'labels')

    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Evaluating"):
            batch = [image.to(device) for image in images]
            outputs = model(batch)

            # Feed the metric one image at a time, with everything on the CPU.
            for output, target in zip(outputs, targets):
                pred = {key: output[key].cpu() for key in pred_keys}
                gt = {key: target[key].cpu() for key in gt_keys}
                metric.update([pred], [gt])

    return metric.compute()


In [118]:

# Function to train the model
def train_model(model, train_loader, val_loader, optimizer, scheduler, device, num_epochs=10):
    """Train for ``num_epochs`` and return the model restored to its best epoch.

    After every epoch the scheduler is stepped and the model is evaluated on
    ``val_loader``.  Whenever the validation mAP beats the best value so far, a
    snapshot of the weights is kept in memory and written to
    ``best_model_epoch_<n>.pth``.

    Returns:
        tuple: ``(model, best_map)``.  ``model`` carries the best snapshot's
        weights; if no epoch scored above the initial ``0.0`` it keeps the
        weights from the last epoch and ``best_map`` is ``0.0``.
    """
    best_map = 0.0
    best_model_wts = None

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        print("-" * 10)

        # Train for one epoch
        epoch_loss = train_one_epoch(model, train_loader, optimizer, device)
        print(f"Training Loss: {epoch_loss:.4f}")

        # Update learning rate
        scheduler.step()

        # Evaluate on validation set
        val_metrics = evaluate(model, val_loader, device)
        val_map = val_metrics['map'].item()
        print(f"Validation mAP: {val_map:.4f}")

        # Save best model
        if val_map > best_map:
            best_map = val_map
            # state_dict() hands back references to the live parameters, so a
            # shallow .copy() would be overwritten by later epochs; deep-copy
            # to freeze this epoch's weights.
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(best_model_wts, f'best_model_epoch_{epoch+1}.pth')
            print(f"Saved best model with mAP: {best_map:.4f}")

        print()

    # Restore the best snapshot, if any epoch produced one.
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)
    else:
        print("No epoch improved on the initial mAP; keeping the final weights")
    return model, best_map


In [119]:

# Function to visualize predictions
def visualize_prediction(model, img_path, device, threshold=0.5):
    """Run the detector on one image file, draw its confident boxes and show it.

    Args:
        model: detection model; it is switched to eval mode.
        img_path: path of the image to load (converted to RGB).
        device: device the input tensor is moved to.
        threshold: only predictions with a score strictly above this are drawn.

    Returns:
        numpy.ndarray: the RGB image with boxes and score labels drawn on it
        (the same array that is displayed with matplotlib).
    """
    model.eval()

    # Load and transform the image
    img = Image.open(img_path).convert("RGB")
    img_tensor = transforms.ToTensor()(img).unsqueeze(0).to(device)

    with torch.no_grad():
        prediction = model(img_tensor)[0]

    # Pull boxes and scores to the host once instead of once per box.
    boxes = prediction['boxes'].cpu().tolist()
    scores = prediction['scores'].cpu().tolist()

    # Convert image back to numpy for visualization
    img_np = np.array(img)

    for box, score in zip(boxes, scores):
        if score <= threshold:
            continue
        # Plain Python ints are accepted as point coordinates by OpenCV.
        x1, y1, x2, y2 = (int(v) for v in box)
        cv2.rectangle(img_np, (x1, y1), (x2, y2), (0, 255, 0), 2)
        label = f"Crack: {score:.2f}"
        # Put the label above the box, or just inside it when the box is too
        # close to the top edge for the text to be visible.
        text_y = y1 - 10 if y1 - 10 >= 15 else y1 + 20
        cv2.putText(img_np, label, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the image
    plt.figure(figsize=(10, 10))
    plt.imshow(img_np)
    plt.axis('off')
    plt.show()

    return img_np

In [120]:

# Function to evaluate on test set
def evaluate_test_set(model, test_loader, device, output_dir='test_results'):
    """Evaluate on the test loader, save a few visualisations and the metrics.

    Args:
        model: detection model; it is switched to eval mode.
        test_loader: yields ``(images, targets)`` batches.
        device: device the images are moved to.
        output_dir: directory (created if missing) that receives
            ``test_result_<i>.jpg`` for the first 10 batches and
            ``test_metrics.txt``.

    Returns:
        The result of ``MeanAveragePrecision.compute()`` over the whole loader.

    For each of the first 10 batches only the first image is visualised:
    ground-truth crack boxes in blue, predictions with score > 0.5 in green.
    """
    model.eval()

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    metric = MeanAveragePrecision()

    with torch.no_grad():
        for i, (images, targets) in enumerate(tqdm(test_loader, desc="Testing")):
            images = [image.to(device) for image in images]
            outputs = model(images)

            # Feed the metric one image at a time, with everything on the CPU.
            for output, target in zip(outputs, targets):
                pred = {
                    'boxes': output['boxes'].cpu(),
                    'scores': output['scores'].cpu(),
                    'labels': output['labels'].cpu()
                }
                gt = {
                    'boxes': target['boxes'].cpu(),
                    'labels': target['labels'].cpu()
                }
                metric.update([pred], [gt])

            # Save visualizations for the first few samples
            if i < 10:  # Save first 10 test examples
                chw = images[0].detach().cpu().numpy()
                # Clip before scaling so out-of-range values saturate instead
                # of wrapping around in the uint8 cast.
                hwc = (np.clip(chw.transpose(1, 2, 0), 0.0, 1.0) * 255).astype(np.uint8)
                # The CHW -> HWC transpose leaves a strided layout; OpenCV
                # drawing functions need a C-contiguous array to draw into.
                img_np = np.ascontiguousarray(hwc)

                # Draw ground truth boxes in blue.  The canvas is RGB here
                # (it is converted to BGR only when written), so blue is (0, 0, 255).
                gt_boxes = targets[0]['boxes'].cpu().tolist()
                gt_labels = targets[0]['labels'].cpu().tolist()
                for box, label in zip(gt_boxes, gt_labels):
                    if label == 1:  # Only draw crack boxes
                        x1, y1, x2, y2 = (int(v) for v in box)
                        cv2.rectangle(img_np, (x1, y1), (x2, y2), (0, 0, 255), 2)

                # Draw predicted boxes in green
                pred_boxes = outputs[0]['boxes'].cpu().tolist()
                pred_scores = outputs[0]['scores'].cpu().tolist()
                pred_labels = outputs[0]['labels'].cpu().tolist()
                for box, score, label in zip(pred_boxes, pred_scores, pred_labels):
                    if score > 0.5 and label == 1:  # Only draw high confidence crack predictions
                        x1, y1, x2, y2 = (int(v) for v in box)
                        cv2.rectangle(img_np, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(img_np, f"{score:.2f}", (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

                # Save image (OpenCV writes BGR)
                cv2.imwrite(os.path.join(output_dir, f"test_result_{i}.jpg"), cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR))

    # Compute final metrics
    result = metric.compute()
    print("Test Results:")
    for k, v in result.items():
        print(f"{k}: {v}")

    # Save metrics to file
    with open(os.path.join(output_dir, "test_metrics.txt"), "w") as f:
        for k, v in result.items():
            f.write(f"{k}: {v}\n")

    return result


In [121]:
# Define the collate function outside of main()
def collate_fn(batch):
    """Transpose a list of samples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``;
    nothing is stacked, so the items may differ in size.
    """
    columns = zip(*batch)
    return tuple(columns)

In [None]:
# Main function to run the training and evaluation
def main():
    """Train, validate and test the crack detector end to end.

    Seeds the RNGs, builds the train/val/test datasets and loaders under
    ``../artifact_folder``, fine-tunes the Faster R-CNN for 15 epochs with SGD
    and a step LR schedule, saves the resulting weights and evaluates on the
    test split.
    """
    # Set random seeds for reproducibility
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)

    # Set device
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print(f"Using device: {device}")

    # Data transforms.
    # No Normalize step: torchvision detection models expect images in the
    # 0-1 range and apply the ImageNet mean/std normalisation themselves, so
    # normalising here would do it twice (and would not match the plain
    # ToTensor input used by visualize_prediction).
    # The flip runs before ToTensor so it operates on the PIL image.
    train_transform = Compose([
        RandomHorizontalFlip(0.5),
        ToTensor(),
    ])

    val_test_transform = Compose([
        ToTensor(),
    ])

    root_dir = '../artifact_folder'

    # Create datasets (each split keeps its images under <split>/images)
    train_dataset = ConcreteCrackDataset(
        root_dir=root_dir,
        split='train/images',
        transforms=train_transform
    )

    val_dataset = ConcreteCrackDataset(
        root_dir=root_dir,
        split='val/images',
        transforms=val_test_transform
    )

    test_dataset = ConcreteCrackDataset(
        root_dir=root_dir,
        split='test/images',
        transforms=val_test_transform
    )

    # Create data loaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=4,  # Adjust based on your GPU memory
        shuffle=True,
        num_workers=0,
        collate_fn=collate_fn  # Required for variable size inputs
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=2,
        shuffle=False,
        num_workers=0,
        collate_fn=collate_fn
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        collate_fn=collate_fn
    )

    # Get model
    model = get_faster_rcnn_model(num_classes=2)
    model.to(device)

    # Define optimizer and learning rate scheduler
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

    # Use learning rate scheduler to reduce lr by 0.1 every 3 epochs
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    # Train model
    print("Starting training...")
    start_time = time.time()

    model, best_map = train_model(
        model,
        train_loader,
        val_loader,
        optimizer,
        scheduler,
        device,
        num_epochs=15
    )

    end_time = time.time()
    print(f"Training completed in {(end_time - start_time) / 60:.2f} minutes")
    print(f"Best validation mAP: {best_map:.4f}")

    # Save final model
    torch.save(model.state_dict(), 'final_faster_rcnn_concrete_crack_detector.pth')

    # Evaluate on test set (metrics are printed and written to disk by the callee)
    print("Evaluating on test set...")
    evaluate_test_set(model, test_loader, device)

    print("Training and evaluation completed!")

In [None]:
# Entry point: run the whole train / validate / test pipeline when this code is
# executed directly (as opposed to being imported as a module).
if __name__ == "__main__":
    main()