## Setup Google Colab

First, mount Google Drive to access files:

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

# project_name = "SeamTaping"
project_name = "WRB"
print("Project:", project_name)

# Path to saved images
image_folder = f'/content/gdrive/MyDrive/CrackDetection/{project_name}_dataset/images'

# Load dataset from JSON
train_dataset_json_path = f'/content/gdrive/MyDrive/CrackDetection/{project_name}_dataset/train_data.json'
val_dataset_json_path = f'/content/gdrive/MyDrive/CrackDetection/{project_name}_dataset/val_data.json'
test_dataset_json_path = f'/content/gdrive/MyDrive/CrackDetection/{project_name}_dataset/test_data.json'


## Define Custom Dataset Class

Create a custom dataset class to load images and annotations.

In [None]:
import os
import json
import numpy as np
import torch
from PIL import Image, ImageDraw
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    def __init__(self, dataset_json_path, image_folder):
        with open(dataset_json_path, 'r') as f:
            dataset = json.load(f)

        self.dataset = dataset
        self.image_folder = image_folder
        self.mean = [0.485, 0.456, 0.406]
        self.std = [0.229, 0.224, 0.225]
        self.image_size = (800, 800)
        self.transforms = T.Compose([
            T.Resize(self.image_size),
            T.ToTensor(),
            T.Normalize(mean=self.mean, std=self.std)
        ])

        self.label_map = {
            'WRB-Bad': 0,
            # Add more labels as needed
        }

    def __len__(self):
        return len(self.dataset)

    def xywh_to_xyxy(self, xywh):
        x, y, w, h = xywh
        x2 = x + w
        y2 = y + h
        xyxy = [x, y, x2, y2]
        return xyxy

    def __getitem__(self, idx):
        image_data = self.dataset[idx]
        image_file_name = image_data['image_file_name']
        image_path = os.path.join(self.image_folder, image_file_name)

        # Load image
        image_original = Image.open(image_path).convert("RGB")

        # Get bounding boxes and labels
        boxes = []
        labels = []
        for annotation in image_data['annotations']:
            bbox = annotation['bbox']
            box = self.xywh_to_xyxy(bbox)
            boxes.append(box)
            labels.append(self.label_map[annotation['label']])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # Apply transformations
        if self.transforms is not None:
            image = self.transforms(image_original)

        # Calculate scaling factor for resizing bounding boxes AFTER transforms
        original_size = np.array(image_original.size)  # Get original size from the image file
        # print(original_size)
        resized_size = self.image_size
        scale = resized_size / original_size
        # print(scale)

        # Adjust bounding box coordinates based on resizing
        boxes[:, 0] *= scale[0]  # x_min
        boxes[:, 1] *= scale[1]  # y_min
        boxes[:, 2] *= scale[0]  # x_max
        boxes[:, 3] *= scale[1]  # y_max

        labels = torch.tensor(labels, dtype=torch.int64)

        target = {
            'boxes': boxes,
            'labels': labels
        }

        return image, target

# Create custom dataset instance with augmentation enabled
train_dataset = CustomDataset(train_dataset_json_path, image_folder)
val_dataset = CustomDataset(val_dataset_json_path, image_folder)
test_dataset = CustomDataset(test_dataset_json_path, image_folder)

train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=4)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=True, num_workers=4)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=True, num_workers=4)


## Train TorchVision FasterRCNN model

In [None]:
import os
import tqdm
import torchvision

def get_model(pretrained=False):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=pretrained)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, 2)
    return model

In [None]:
def train_epoch(model, train_dataloader, optimizer, device):
    model.train()
    train_loss = 0
    for images, targets in train_dataloader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        train_loss += losses.item()

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    return train_loss / len(train_dataloader)

def validate_epoch(model, val_dataloader, device):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for images, targets in val_dataloader:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            if isinstance(loss_dict, dict):  # Check if the loss_dict is indeed a dictionary
                losses = sum(loss for loss in loss_dict.values())
                val_loss += losses.item()

    return val_loss / len(val_dataloader)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_model(pretrained=True)
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
num_epochs = 10

train_losses = []
val_losses = []
learning_rates = []

for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_dataloader, optimizer, device)
    val_loss = validate_epoch(model, val_dataloader, device)
    learning_rate = optimizer.param_groups[0]['lr']

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    learning_rates.append(learning_rate)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Learning Rate: {learning_rate:.6f}")

print("Training complete.")


### Define Training Function

Set up the training function.

In [None]:
# Function to train the model
def train_model(model, data_loader, optimizer, device, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        for images, targets in tqdm(data_loader):
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)

            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()


### Run Training Model

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
data_loader = DataLoader(custom_dataset, batch_size=2, shuffle=True, num_workers=4)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_model(model, data_loader, optimizer, device, num_epochs=10)

### Save the Model

Save the trained model.

In [None]:
# Save model
checkpoint_dir = '/content/drive/My Drive/CrackDetection'
torch.save(model.state_dict(), os.path.join(checkpoint_dir, f'faster_rcnn_model.pth'))

## Load and Evaluate the Model


### Load the model for inference.

In [None]:
# Load model
model = get_torchvision_fastrcnn_model()
model.load_state_dict(torch.load(os.path.join(checkpoint_dir, f'faster_rcnn_model.pth')))
model.eval()

### Evaluate the trained model

using metrics like accuracy, precision, recall, and F1-score.

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np

def evaluate_model(model, data_loader, device):
    model.eval()
    true_labels = []
    pred_labels = []

    with torch.no_grad():
        for images, targets in tqdm(data_loader):
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            predictions = model(images)
            
            # Process predictions and targets to calculate metrics
            for pred, target in zip(predictions, targets):
                true_labels.extend(target['labels'].cpu().numpy())
                pred_labels.extend(pred['labels'].cpu().numpy())

    # Calculate evaluation metrics
    accuracy = accuracy_score(true_labels, pred_labels)
    precision = precision_score(true_labels, pred_labels, average='weighted')
    recall = recall_score(true_labels, pred_labels, average='weighted')
    f1 = f1_score(true_labels, pred_labels, average='weighted')

    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

# Example usage of evaluation function
data_loader_test = DataLoader(custom_dataset, batch_size=1, shuffle=False, num_workers=4)
evaluate_model(model, data_loader_test, device)
