# Imports

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
print(torch.cuda.is_available())
import pandas as pd

import os
from PIL import Image
import torchvision.transforms as transforms

True


# CNN Model

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PotholeAsphaltCNN(nn.Module):
    def __init__(self, num_additional_features):
        super(PotholeAsphaltCNN, self).__init__()
        
        # First Convolutional Block
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout(0.25)
        
        # Second Convolutional Block
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout(0.25)
        
        # Third Convolutional Block
        self.conv5 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv6 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn6 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout3 = nn.Dropout(0.25)
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(128 * 28 * 28, 400)
        self.bn_fc1 = nn.BatchNorm1d(400)
        self.dropout_fc1 = nn.Dropout(0.5)

        self.fc_additional = nn.Sequential(
            nn.Linear(num_additional_features, 100),
            nn.ReLU(),
            nn.BatchNorm1d(100),
            nn.Dropout(0.5)
        )
        
        self.fc2 = nn.Linear(400 + 100, 128)
        self.bn_fc2 = nn.BatchNorm1d(128)
        self.dropout_fc2 = nn.Dropout(0.5)
        
        self.fc3 = nn.Linear(128, 1)  # Output single regression value

    def forward(self, x, additional_features):
        # First Conv Block
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)
        x = self.dropout1(x)
        
        # Second Conv Block
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool2(x)
        x = self.dropout2(x)
        
        # Third Conv Block
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = self.pool3(x)
        x = self.dropout3(x)
        
        # Flatten for fully connected layers
        x = x.view(x.size(0), -1)
        
        # Fully connected layers
        x = F.relu(self.bn_fc1(self.fc1(x)))
        x = self.dropout_fc1(x)

        # Process additional features separately
        additional_features = self.fc_additional(additional_features)

        # Concatenate CNN features with additional features
        x = torch.cat((x, additional_features), dim=1)
          
        x = F.relu(self.bn_fc2(self.fc2(x)))
        x = self.dropout_fc2(x)
        
        x = self.fc3(x)  # Output layer
        
        return x


# Setup

In [14]:
label_df = pd.read_csv('updated_csv_file.csv')
labels_dict = {'p'+str(int(row['Pothole number']))+'.jpg': row['Bags used '] for _, row in label_df.iterrows()}

class PotholeDataset(Dataset):
    def __init__(self, image_dir, labels_dict, transform=None):
        self.image_dir = image_dir
        self.labels_dict = labels_dict
        self.transform = transform
        self.image_filenames = os.listdir(image_dir)
        self.data = pd.read_csv('updated_csv_file.csv')
    
    def __len__(self):
        return len(self.image_filenames)
    
    def __getitem__(self, idx):
        img_name = self.image_filenames[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        # Extract label and additional features
        label = torch.tensor(self.data.iloc[idx, 1], dtype=torch.float32)
        additional_features = torch.tensor(self.data.iloc[idx, 2:].values, dtype=torch.float32)

        return image, label, additional_features
    
# Define transforms
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to match model input size
    transforms.RandomHorizontalFlip(),  # Data augmentation
    transforms.ToTensor(),  # Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with ImageNet means and stds
])

# Paths to image directories
train_dir = 'train/cropped_images/'

# Create datasets
train_dataset = PotholeDataset(train_dir, labels_dict, transform=train_transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)


# Train

In [15]:
num_additional_features = 5

# Model instantiation
model = PotholeAsphaltCNN(num_additional_features)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 15  # Adjust based on your needs

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for images, labels, additional_features in train_loader:
        # Move data to the appropriate device
        images, labels, additional_features = images.to(device), labels.to(device), additional_features.to(device)
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(images, additional_features)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        # Accumulate the loss
        running_loss += loss.item() * images.size(0)
    
    # Calculate and print the average loss for this epoch
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

print('Training complete')


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/15, Loss: 2.0813
Epoch 2/15, Loss: 1.8227
Epoch 3/15, Loss: 1.7691
Epoch 4/15, Loss: 1.7505
Epoch 5/15, Loss: 1.7245
Epoch 6/15, Loss: 1.6981
Epoch 7/15, Loss: 1.6880
Epoch 8/15, Loss: 1.6826
Epoch 9/15, Loss: 1.6696
Epoch 10/15, Loss: 1.6680
Epoch 11/15, Loss: 1.6601
Epoch 12/15, Loss: 1.6569
Epoch 13/15, Loss: 1.6568
Epoch 14/15, Loss: 1.6449
Epoch 15/15, Loss: 1.6395
Training complete


# R2D2

In [16]:
from sklearn.metrics import r2_score
import numpy as np

# Paths to image directories
train_dir = 'train/cropped_images/'

# Create datasets
train_dataset = PotholeDataset(train_dir, labels_dict, transform=train_transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

def compute_r2_score(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for images, labels, additional_features in dataloader:
            images, labels, additional_features = images.to(device), labels.to(device), additional_features.to(device)
            outputs = model(images, additional_features)
            
            all_preds.append(outputs.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
    
    # Flatten the lists of predictions and labels
    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)
    
    # Compute R² score
    r2 = r2_score(all_labels, all_preds)
    return r2

# Example usage after training
train_r2 = compute_r2_score(model, train_loader, device)
print(f"Training R² Score: {train_r2:.4f}")

Training R² Score: 0.0032
