In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Concatenate
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from collections import Counter

In [None]:
# Root folder on the mounted Drive that contains the dataset (edit for your setup)
base_path = "/content/drive/MyDrive/my_folder"

# Build "<base_path>/Visual Pollution Dataset/<kind>/<split>" for every
# (kind, split) pair; the unpacking order is images/train, images/val,
# labels/train, labels/val.
train_images_path, val_images_path, train_labels_path, val_labels_path = (
    os.path.join(base_path, f"Visual Pollution Dataset/{kind}/{split}")
    for kind in ("images", "labels")
    for split in ("train", "val")
)

# Class names from dataset.yaml; a label's class_id is an index into this list
class_names = [
    'barriers',
    'sidewalks',
    'pothole',
]
num_classes = len(class_names)

# Function to load labels from a .txt file (YOLO format: class_id center_x center_y width height)
def load_label(label_path):
    """Read the first annotation from a YOLO-format label file.

    A valid annotation line has exactly five whitespace-separated fields:
    ``class_id center_x center_y width height``.  Only the first non-blank
    line is used (one object per image is assumed).

    Args:
        label_path: Path of the ``.txt`` label file.

    Returns:
        ``(class_id, bbox)`` where ``class_id`` is an int and ``bbox`` is
        ``[center_x, center_y, width, height]`` as floats, or ``(None, None)``
        when the file is empty, contains only blank lines, or its first
        non-blank line is not a well-formed annotation.
    """
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Leading blank lines carry no annotation; keep looking.
                continue
            # Anything other than five fields (e.g. polygon labels or an extra
            # confidence column) would produce a bbox that is not length 4 and
            # break stacking into a rectangular array, so reject it.
            if len(fields) != 5:
                return None, None
            try:
                class_id = int(fields[0])
                bbox = [float(value) for value in fields[1:]]
            except ValueError:
                # Non-numeric content: treat as "no usable label".
                return None, None
            return class_id, bbox
    return None, None

# Function to load images and labels
def load_data(images_path, labels_path, target_size=(128, 128)):
    """Load every labelled image of one dataset split into memory.

    An image is kept only if it has a supported extension, can be decoded by
    OpenCV, and has a same-named ``.txt`` file in ``labels_path`` from which
    ``load_label`` returns an annotation.

    Args:
        images_path: Directory containing the image files.
        labels_path: Directory containing the YOLO ``.txt`` label files.
        target_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        Tuple ``(images, class_labels, bboxes)`` of NumPy arrays with one entry
        per kept image.  Images are whatever ``cv2.imread`` returns (BGR
        channel order by default), resized to ``target_size``.
    """
    images = []
    class_labels = []
    bboxes = []
    # os.listdir() returns entries in arbitrary order; sorting makes the sample
    # order (and therefore any seeded split done later) reproducible.
    for img_name in sorted(os.listdir(images_path)):
        # Case-insensitive so files such as "IMG_001.JPG" are not dropped.
        if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        # Resolve the label first: it is much cheaper than decoding the image.
        label_name = os.path.splitext(img_name)[0] + '.txt'
        label_path = os.path.join(labels_path, label_name)
        if not os.path.exists(label_path):
            continue
        class_id, bbox = load_label(label_path)
        if class_id is None or bbox is None:
            continue
        img = cv2.imread(os.path.join(images_path, img_name))
        if img is None:
            # Unreadable or corrupt image file.
            continue
        # Bring every image to the same size so they stack into one array.
        images.append(cv2.resize(img, target_size))
        class_labels.append(class_id)
        bboxes.append(bbox)
    return np.array(images), np.array(class_labels), np.array(bboxes)

In [None]:
# Read both dataset splits from disk
print("Loading training data...")
train_images, train_class_labels, train_bboxes = load_data(
    train_images_path, train_labels_path
)
print("Loading validation data...")
val_images, val_class_labels, val_bboxes = load_data(
    val_images_path, val_labels_path
)

# Pool train and val (images, class ids, boxes) for exploration
all_images, all_class_labels, all_bboxes = (
    np.concatenate(pair)
    for pair in (
        (train_images, val_images),
        (train_class_labels, val_class_labels),
        (train_bboxes, val_bboxes),
    )
)

# Dataset summary: total size, then the number of samples per class
label_counts = Counter(all_class_labels)
print(f"Total images: {len(all_images)}")
print("Class distribution:")
for class_id, count in label_counts.items():
    print(f"{class_names[class_id]}: {count}")

Loading training data...
Loading validation data...
Total images: 31795
Class distribution:
sidewalks: 5923
pothole: 20454
barriers: 5418


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Normalize pixel values to [0, 1].
# The result goes into a new name (instead of overwriting `all_images`) so that
# re-running this cell never divides by 255 twice; the in-place division avoids
# a second full-size temporary array.
images_normalized = all_images.astype('float32')
images_normalized /= 255.0

# One-hot encode class labels (again into a new name, so the integer labels in
# `all_class_labels` stay available and the cell stays re-runnable)
labels_one_hot = to_categorical(all_class_labels, num_classes)

# Split into train and test sets (80-20 split).
# Stratifying on the integer class ids keeps the class proportions the same in
# both parts, which matters because the classes are imbalanced.
X_train, X_test, y_train_class, y_test_class, y_train_bbox, y_test_bbox = train_test_split(
    images_normalized, labels_one_hot, all_bboxes,
    test_size=0.2, random_state=42, stratify=all_class_labels
)

# The split produced its own copies; release the full-size intermediates.
del images_normalized, labels_one_hot

# Create an ImageDataGenerator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True,
    zoom_range=0.1,
    fill_mode='nearest'
)

# NOTE: datagen.fit(X_train) is intentionally not called.  fit() only computes
# statistics for featurewise_center / featurewise_std_normalization /
# zca_whitening, none of which is enabled above, and it copies the whole
# training array while doing so.

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
import numpy as np

# Force TensorFlow to use CPU to avoid GPU-related issues.
# set_visible_devices() raises RuntimeError once TensorFlow has initialised its
# devices (for example when this cell is run a second time), so report that
# instead of aborting the cell.
try:
    tf.config.set_visible_devices([], 'GPU')
except RuntimeError as err:
    print("Could not change visible devices (runtime already initialised):", err)
# Count *visible* GPUs: list_physical_devices() would still report hidden ones.
print("Num GPUs Available (should be 0):", len(tf.config.get_visible_devices('GPU')))

# Debug: Check shapes and data types of input data
print("X_train shape:", X_train.shape, "dtype:", X_train.dtype)
print("y_train_class shape:", y_train_class.shape, "dtype:", y_train_class.dtype)
print("y_train_bbox shape:", y_train_bbox.shape, "dtype:", y_train_bbox.dtype)
print("X_test shape:", X_test.shape, "dtype:", X_test.dtype)
print("y_test_class shape:", y_test_class.shape, "dtype:", y_test_class.dtype)
print("y_test_bbox shape:", y_test_bbox.shape, "dtype:", y_test_bbox.dtype)

# Ensure data types are float32.
# copy=False returns the array itself when it is already float32, so no
# needless full-size copy is made.
X_train = X_train.astype('float32', copy=False)
y_train_class = y_train_class.astype('float32', copy=False)
y_train_bbox = y_train_bbox.astype('float32', copy=False)
X_test = X_test.astype('float32', copy=False)
y_test_class = y_test_class.astype('float32', copy=False)
y_test_bbox = y_test_bbox.astype('float32', copy=False)

# Custom data generator for multi-output model with augmentation
class MultiOutputDataGenerator(Sequence):
    """Keras ``Sequence`` yielding augmented images with class and bbox targets.

    Each batch is ``(images, {'class_output': ..., 'bbox_output': ...})``.

    Every sample gets one random transform drawn from ``datagen``.  The same
    geometric transform is applied to a binary mask of the sample's bounding
    box, and the box target is re-derived from the transformed mask, so the
    box keeps describing the object after rotation, shift, zoom or flip.
    (Feeding the augmented image together with the *original* box would train
    the bbox head on wrong labels.)

    Args:
        x: Image array indexed by sample; values are expected in [0, 1].
        y_class: Per-sample class targets.
        y_bbox: Per-sample ``[center_x, center_y, width, height]`` targets,
            normalized to [0, 1].
        batch_size: Number of samples per batch.
        datagen: Object providing ``get_random_transform(shape)`` and
            ``apply_transform(image, params)`` (e.g. ``ImageDataGenerator``).
        shuffle: Whether to reshuffle the sample order after every epoch.
    """

    # Keys of the transform-parameter dict that move pixels around (as opposed
    # to changing intensities); only these are applied to the bbox mask.
    _GEOMETRIC_KEYS = ('theta', 'tx', 'ty', 'shear', 'zx', 'zy',
                       'flip_horizontal', 'flip_vertical')

    def __init__(self, x, y_class, y_bbox, batch_size, datagen, shuffle=True):
        # Newer Keras versions expect Sequence subclasses to initialise the
        # base class; on older versions this is a harmless no-op.
        super().__init__()
        self.x = x
        self.y_class = y_class
        self.y_bbox = y_bbox
        self.batch_size = batch_size
        self.datagen = datagen
        self.shuffle = shuffle
        self.indices = np.arange(len(self.x))
        self.on_epoch_end()

    def __len__(self):
        # Only full batches are served; a trailing partial batch is dropped.
        return int(np.floor(len(self.x) / self.batch_size))

    def __getitem__(self, index):
        # Get batch indices
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]

        # Gather the batch; the bbox targets are copied because they are
        # rewritten below and the stored labels must stay untouched.
        batch_x = self.x[batch_indices]
        batch_y_class = self.y_class[batch_indices]
        batch_y_bbox = np.array(self.y_bbox[batch_indices])

        # Apply data augmentation to image and box together
        batch_x_aug = np.zeros_like(batch_x)
        for i in range(len(batch_x)):
            params = self.datagen.get_random_transform(batch_x[i].shape)
            new_bbox = self._transform_bbox(batch_y_bbox[i], params, batch_x[i].shape)
            if new_bbox is None:
                # The box could not be tracked through this transform (it is
                # degenerate or left the frame): keep the un-augmented sample.
                batch_x_aug[i] = batch_x[i]
                continue
            # Ensure pixel values remain in [0, 1]
            batch_x_aug[i] = np.clip(self.datagen.apply_transform(batch_x[i], params), 0, 1)
            batch_y_bbox[i, :4] = new_bbox

        return batch_x_aug, {'class_output': batch_y_class, 'bbox_output': batch_y_bbox}

    def _transform_bbox(self, bbox, params, img_shape):
        """Return the bbox after the geometric part of ``params``, or None.

        The box is rasterised into a mask, the mask is transformed exactly like
        the image, and the tight box around the surviving mask pixels is
        returned as normalized ``[center_x, center_y, width, height]``.  The
        result is therefore quantised to the pixel grid.  Assumes channels-last
        images, i.e. ``img_shape`` starts with (height, width).
        """
        height, width = img_shape[0], img_shape[1]
        cx, cy, bw, bh = (float(value) for value in bbox[:4])

        # Pixel extent of the original box, clipped to the image.
        x1 = int(np.floor(np.clip(cx - bw / 2, 0.0, 1.0) * width))
        x2 = int(np.ceil(np.clip(cx + bw / 2, 0.0, 1.0) * width))
        y1 = int(np.floor(np.clip(cy - bh / 2, 0.0, 1.0) * height))
        y2 = int(np.ceil(np.clip(cy + bh / 2, 0.0, 1.0) * height))
        if x2 <= x1 or y2 <= y1:
            return None

        mask = np.zeros((height, width, 1), dtype='float32')
        mask[y1:y2, x1:x2, 0] = 1.0

        geometric = {key: params[key] for key in self._GEOMETRIC_KEYS if key in params}
        # Interpolation can produce fractional values at the edges; threshold.
        warped = self.datagen.apply_transform(mask, geometric)[..., 0] > 0.5

        rows = np.flatnonzero(warped.any(axis=1))
        cols = np.flatnonzero(warped.any(axis=0))
        if rows.size == 0 or cols.size == 0:
            return None

        new_x1, new_x2 = cols[0], cols[-1] + 1
        new_y1, new_y2 = rows[0], rows[-1] + 1
        return [
            (new_x1 + new_x2) / 2.0 / width,
            (new_y1 + new_y2) / 2.0 / height,
            (new_x2 - new_x1) / float(width),
            (new_y2 - new_y1) / float(height),
        ]

    def on_epoch_end(self):
        # Reshuffle so batches differ from epoch to epoch.
        if self.shuffle:
            np.random.shuffle(self.indices)

# Define focal loss for class imbalance
def focal_loss(gamma=2.0, alpha=0.25):
    """Build a categorical focal loss for one-hot targets and softmax outputs.

    Per sample the loss is ``sum_c alpha * (1 - p_c) ** gamma * (-y_c * log(p_c))``,
    which down-weights well-classified examples so that hard (often
    minority-class) examples dominate the gradient.

    Args:
        gamma: Focusing exponent; 0 reduces the loss to alpha-scaled
            cross-entropy.
        alpha: Weighting factor multiplied into every term.

    Returns:
        A function ``(y_true, y_pred) -> tensor`` holding one loss value per
        sample, suitable for ``model.compile(loss=...)``.
    """
    # The inner name is kept as `focal_loss_fixed` so anything that refers to
    # the loss by that function name keeps working.
    def focal_loss_fixed(y_true, y_pred):
        # Clip predictions to avoid log(0)
        epsilon = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, epsilon, 1.0 - epsilon)
        y_true = tf.cast(y_true, y_pred.dtype)

        # Cross-entropy term; non-zero only where y_true is non-zero
        cross_entropy = -y_true * tf.math.log(y_pred)

        # Focal modulation: small when the prediction is already confident
        modulating_factor = alpha * tf.math.pow(1.0 - y_pred, gamma)

        # Sum over the class axis to get one value per sample.  Averaging over
        # every element instead would additionally divide by the number of
        # classes and shrink this loss relative to the other output's loss.
        return tf.reduce_sum(modulating_factor * cross_entropy, axis=-1)
    return focal_loss_fixed

# Build a simplified YOLO-like model
def build_yolo_model(input_shape=(128, 128, 3), num_classes=3):
    """Create the two-headed CNN used for classification plus box regression.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of units in the softmax classification output.

    Returns:
        A Keras ``Model`` mapping an image to ``[class_output, bbox_output]``:
        a ``num_classes``-way softmax and a 4-unit sigmoid interpreted as
        [center_x, center_y, width, height].
    """
    inputs = Input(shape=input_shape)

    # Shared backbone: three conv + max-pool stages with growing filter counts
    features = inputs
    for n_filters in (32, 64, 128):
        features = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(features)
        features = MaxPooling2D((2, 2))(features)
    features = Flatten()(features)

    # Classification head
    class_hidden = Dense(128, activation='relu')(features)
    class_output = Dense(num_classes, activation='softmax', name='class_output')(class_hidden)

    # Box-regression head; sigmoid keeps every coordinate inside (0, 1)
    bbox_hidden = Dense(128, activation='relu')(features)
    bbox_output = Dense(4, activation='sigmoid', name='bbox_output')(bbox_hidden)

    return Model(inputs, [class_output, bbox_output])

# Augmentation settings consumed by the training generator
datagen = ImageDataGenerator(
    fill_mode='nearest',
    horizontal_flip=True,
    rotation_range=10,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
)

# Model: focal loss on the class head, MSE on the box head, equally weighted
model = build_yolo_model(input_shape=(128, 128, 3), num_classes=num_classes)
output_losses = {
    'class_output': focal_loss(gamma=2.0, alpha=0.25),
    'bbox_output': 'mean_squared_error',
}
output_loss_weights = {'class_output': 1.0, 'bbox_output': 1.0}
model.compile(
    optimizer=Adam(learning_rate=0.0001),
    loss=output_losses,
    loss_weights=output_loss_weights,
    metrics={'class_output': 'accuracy'},
)

# Stop once the validation classification loss has not improved for 5 epochs
# and roll back to the best weights seen
early_stopping = EarlyStopping(
    monitor='val_class_output_loss',
    mode='min',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

# Training batches come from the augmenting multi-output generator
batch_size = 8
train_generator = MultiOutputDataGenerator(
    X_train, y_train_class, y_train_bbox,
    batch_size=batch_size, datagen=datagen, shuffle=True,
)

# Fit, validating on the held-out arrays (no augmentation there)
validation_targets = {'class_output': y_test_class, 'bbox_output': y_test_bbox}
history = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // batch_size,
    epochs=100,
    validation_data=(X_test, validation_targets),
    callbacks=[early_stopping],
    verbose=1,
)

# Persist the trained model as a single .h5 file
model.save('visual_pollution_yolo_focal.h5')
print("Model saved as 'visual_pollution_yolo_focal.h5'")

Num GPUs Available (should be 0): 0
X_train shape: (25436, 128, 128, 3) dtype: float32
y_train_class shape: (25436, 3) dtype: float32
y_train_bbox shape: (25436, 4) dtype: float32
X_test shape: (6359, 128, 128, 3) dtype: float32
y_test_class shape: (6359, 3) dtype: float32
y_test_bbox shape: (6359, 4) dtype: float32
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 22: early stopping
Model saved as 'visual_pollution_yolo_focal.h5'


In [None]:
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# Function to compute IoU (Intersection over Union) for bounding boxes
def compute_iou(box1, box2):
    """Intersection over Union of two boxes.

    Both boxes are given as [center_x, center_y, width, height].  A small
    constant is added to the denominator so that two zero-area boxes do not
    cause a division by zero.
    """
    def corners(box):
        # [cx, cy, w, h] -> (left, top, right, bottom)
        return (
            box[0] - box[2] / 2,
            box[1] - box[3] / 2,
            box[0] + box[2] / 2,
            box[1] + box[3] / 2,
        )

    left_a, top_a, right_a, bottom_a = corners(box1)
    left_b, top_b, right_b, bottom_b = corners(box2)

    # Overlap rectangle; a negative extent means the boxes do not overlap
    overlap_w = max(0, min(right_a, right_b) - max(left_a, left_b))
    overlap_h = max(0, min(bottom_a, bottom_b) - max(top_a, top_b))
    intersection = overlap_w * overlap_h

    area_a = (right_a - left_a) * (bottom_a - top_a)
    area_b = (right_b - left_b) * (bottom_b - top_b)
    union = area_a + area_b - intersection

    return intersection / (union + 1e-6)

# Evaluate the model on the test set
class_output, bbox_output = model.predict(X_test, batch_size=8, verbose=1)

# Classification: most probable class vs. the index of the one-hot target
pred_classes = np.argmax(class_output, axis=1)
true_classes = np.argmax(y_test_class, axis=1)

# Bounding boxes: both are [center_x, center_y, width, height] per sample
pred_bboxes = bbox_output
true_bboxes = y_test_bbox

# Arrays under the names the later reporting code reads
all_true_classes = np.asarray(true_classes)
all_pred_classes = np.asarray(pred_classes)
all_true_bboxes = np.asarray(true_bboxes)
all_pred_bboxes = np.asarray(pred_bboxes)

# Per-sample IoU, and per-sample MAE over the four box coordinates
ious = np.array([compute_iou(t_bbox, p_bbox) for t_bbox, p_bbox in zip(true_bboxes, pred_bboxes)])
maes = np.mean(np.abs(all_true_bboxes - all_pred_bboxes), axis=1)

# Compute classification metrics.
# The label list is derived from num_classes (not hard-coded) so it always
# matches the indices used to build class_metrics below; zero_division=0 gives
# a class that is never predicted a score of 0 without emitting a warning.
class_ids = list(range(num_classes))
accuracy = accuracy_score(all_true_classes, all_pred_classes)
precision, recall, f1, _ = precision_recall_fscore_support(
    all_true_classes, all_pred_classes, average=None, labels=class_ids, zero_division=0
)
class_metrics = {
    class_names[i]: {'precision': precision[i], 'recall': recall[i], 'f1': f1[i]}
    for i in class_ids
}

# Compute average IoU and MAE for bounding boxes
mean_iou = np.mean(ious)
mean_mae = np.mean(maes)

# Print evaluation report
print("=== Model Evaluation on Test Set ===")
print(f"Classification Accuracy: {accuracy:.4f}")
print("\nClass-wise Metrics:")
for class_name, metrics in class_metrics.items():
    print(f"{class_name}:")
    print(f"  Precision: {metrics['precision']:.4f}")
    print(f"  Recall: {metrics['recall']:.4f}")
    print(f"  F1-Score: {metrics['f1']:.4f}")

print(f"\nBounding Box Metrics:")
print(f"  Mean IoU: {mean_iou:.4f}")
print(f"  Mean Absolute Error (MAE): {mean_mae:.4f}")

# Fraction of test samples whose IoU reaches each threshold
iou_thresholds = [0.5, 0.75, 0.9]
for thresh in iou_thresholds:
    iou_above_threshold = np.mean(ious >= thresh)
    print(f"  IoU >= {thresh}: {iou_above_threshold:.4f}")

# Visualize detection results on a few test images
num_samples = min(5, len(X_test))
sample_images = X_test[:num_samples]
true_class_labels = y_test_class[:num_samples]
# Sample-specific names: reusing `true_bboxes` / `pred_bboxes` here would
# silently replace the full test-set arrays with these short slices.
sample_true_bboxes = y_test_bbox[:num_samples]

# Predict class probabilities and bounding boxes
pred_class_probs, sample_pred_bboxes = model.predict(sample_images, batch_size=8, verbose=1)


def _bbox_to_pixel_corners(bbox, img_w, img_h):
    """Convert normalized [center_x, center_y, width, height] to pixel (x1, y1, x2, y2)."""
    center_x, center_y = bbox[0] * img_w, bbox[1] * img_h
    half_w, half_h = bbox[2] * img_w / 2, bbox[3] * img_h / 2
    return (int(center_x - half_w), int(center_y - half_h),
            int(center_x + half_w), int(center_y + half_h))


# squeeze=False keeps `axes` two-dimensional even for a single sample
fig, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
for i, ax in enumerate(axes[0]):
    # Denormalize for visualization; round before casting so values such as
    # 254.99998 do not truncate to 254.
    img = np.clip(np.rint(sample_images[i] * 255.0), 0, 255).astype(np.uint8)
    # The images were read with cv2.imread (BGR) while imshow expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_h, img_w = img.shape[:2]

    true_x1, true_y1, true_x2, true_y2 = _bbox_to_pixel_corners(sample_true_bboxes[i], img_w, img_h)
    pred_x1, pred_y1, pred_x2, pred_y2 = _bbox_to_pixel_corners(sample_pred_bboxes[i], img_w, img_h)
    true_class = class_names[np.argmax(true_class_labels[i])]
    pred_class = class_names[np.argmax(pred_class_probs[i])]

    # Draw true bounding box (green); the caption sits above the box but is
    # clamped so it never leaves the top of the image.
    cv2.rectangle(img, (true_x1, true_y1), (true_x2, true_y2), (0, 255, 0), 2)
    cv2.putText(img, f"True: {true_class}", (max(true_x1, 0), max(true_y1 - 10, 12)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Draw predicted bounding box (red); the caption sits below the box,
    # clamped to the bottom edge, to stay clear of the "True" caption.
    cv2.rectangle(img, (pred_x1, pred_y1), (pred_x2, pred_y2), (255, 0, 0), 2)
    cv2.putText(img, f"Pred: {pred_class}", (max(pred_x1, 0), min(pred_y2 + 15, img_h - 4)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    # Plot the image
    ax.imshow(img)
    ax.axis('off')

fig.savefig('detection_results.png')
plt.close(fig)

print("Detection results saved as 'detection_results.png'.")

=== Model Evaluation on Test Set ===
Classification Accuracy: 0.7198

Class-wise Metrics:
barriers:
  Precision: 0.4670
  Recall: 0.3791
  F1-Score: 0.4185
sidewalks:
  Precision: 0.5888
  Recall: 0.3835
  F1-Score: 0.4645
pothole:
  Precision: 0.7860
  Recall: 0.9032
  F1-Score: 0.8405

Bounding Box Metrics:
  Mean IoU: 0.1660
  Mean Absolute Error (MAE): 0.1239
  IoU >= 0.5: 0.0808
  IoU >= 0.75: 0.0060
  IoU >= 0.9: 0.0000
Detection results saved as 'detection_results.png'.


In [None]:
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# Function to compute IoU (Intersection over Union) for bounding boxes
def compute_iou(box1, box2):
    """Intersection over Union for boxes in [center_x, center_y, width, height] format.

    Accepts either two single boxes (any length-4 sequence) or arrays of boxes
    whose last axis has length 4; leading axes broadcast against each other, so
    ``compute_iou(true_boxes, pred_boxes)`` with two (N, 4) arrays returns N
    IoU values in one call.

    Args:
        box1: One box or an array of boxes with shape (..., 4).
        box2: One box or an array of boxes with shape (..., 4).

    Returns:
        IoU as a NumPy float64 scalar for single boxes, or an array with the
        broadcast leading shape.  A small constant in the denominator avoids a
        division by zero for two zero-area boxes.
    """
    box1 = np.asarray(box1, dtype=np.float64)
    box2 = np.asarray(box2, dtype=np.float64)

    # Convert centre/size to (x1, y1) and (x2, y2) corner pairs
    half1 = box1[..., 2:4] / 2
    half2 = box2[..., 2:4] / 2
    lower1, upper1 = box1[..., 0:2] - half1, box1[..., 0:2] + half1
    lower2, upper2 = box2[..., 0:2] - half2, box2[..., 0:2] + half2

    # Overlap extent along x and y; negative means "no overlap", clamp to 0
    overlap = np.clip(np.minimum(upper1, upper2) - np.maximum(lower1, lower2), 0, None)
    intersection = overlap[..., 0] * overlap[..., 1]

    # Union = both areas minus the part counted twice
    area1 = np.prod(upper1 - lower1, axis=-1)
    area2 = np.prod(upper2 - lower2, axis=-1)
    union = area1 + area2 - intersection

    return intersection / (union + 1e-6)  # Avoid division by zero

# Run the model over the whole test set
class_output, bbox_output = model.predict(X_test, batch_size=8, verbose=1)

# Class decisions: arg-max of the predicted probabilities / one-hot targets
pred_classes = np.argmax(class_output, axis=1)
true_classes = np.argmax(y_test_class, axis=1)

# Boxes are compared as-is in [center_x, center_y, width, height] form
pred_bboxes = bbox_output
true_bboxes = y_test_bbox

# Materialise everything as NumPy arrays under the names later cells read
all_true_classes = np.array(list(true_classes))
all_pred_classes = np.array(list(pred_classes))
all_true_bboxes = np.array(list(true_bboxes))
all_pred_bboxes = np.array(list(pred_bboxes))

# Per-sample IoU and mean absolute coordinate error
box_pairs = list(zip(true_bboxes, pred_bboxes))
ious = np.array([compute_iou(t_bbox, p_bbox) for t_bbox, p_bbox in box_pairs])
maes = np.array([np.mean(np.abs(t_bbox - p_bbox)) for t_bbox, p_bbox in box_pairs])

# Overall accuracy plus per-class precision / recall / F1
accuracy = accuracy_score(all_true_classes, all_pred_classes)
precision, recall, f1, _ = precision_recall_fscore_support(
    all_true_classes,
    all_pred_classes,
    average=None,
    labels=[0, 1, 2],
)
class_metrics = {}
for i in range(num_classes):
    class_metrics[class_names[i]] = {
        'precision': precision[i],
        'recall': recall[i],
        'f1': f1[i],
    }

# Dataset-level averages of the box metrics
mean_iou = np.mean(ious)
mean_mae = np.mean(maes)

print("Computed predictions and metrics successfully.")

Computed predictions and metrics successfully.


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Rows are true classes, columns are predicted classes
class_cm = confusion_matrix(all_true_classes, all_pred_classes, labels=[0, 1, 2])

# Draw the matrix as an annotated heatmap on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    class_cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_title('Classification Confusion Matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
fig.savefig('classification_confusion_matrix.png')
plt.close(fig)

# Text version of the same matrix, followed by the label order
print("Classification Confusion Matrix:")
print(class_cm)
print("\nLabels:", class_names)
print("Classification confusion matrix saved as 'classification_confusion_matrix.png'.")

Classification Confusion Matrix:
[[ 389  146  491]
 [ 223  464  523]
 [ 221  178 3724]]

Labels: ['barriers', 'sidewalks', 'pothole']
Classification confusion matrix saved as 'classification_confusion_matrix.png'.
