<div align="center">

  <a href="https://ultralytics.com/yolov8" target="_blank">
    <img width="1024" src="https://i.imgur.com/STDGXU4.jpg"></a>




Lammah (Observer) is a machine learning model for detecting and evaluating visual pollution elements. It is trained to identify and locate these elements in real time on any kind of device, allowing efficient, automated analysis of the level of visual pollution in a given area. This supports planning and carrying out clean-up efforts and monitoring their progress over time, which benefits the community's aesthetics and environment.

The model is trained using <a href="https://ultralytics.com">Ultralytics</a> YOLOv8.
</div>

Note: The model weights are not uploaded because of their file size. Please contact me if you need them.

Dataset link: https://drive.google.com/file/d/1ULqYtd9yomeGz53WBhgRdPRFB37ppeDU/view

# 0. Setup

Pip install `ultralytics` and [dependencies](https://github.com/ultralytics/ultralytics/blob/main/requirements.txt) and check PyTorch and GPU.

In [None]:
# Pip install method (recommended)
%pip install ultralytics
import ultralytics
ultralytics.checks()

In [None]:
# Git clone method (for development)
!git clone https://github.com/ultralytics/ultralytics
%pip install -qe ultralytics

In [None]:
# import the needed libraries
from imgaug import augmenters as iaa
from collections import defaultdict
import matplotlib.pyplot as plt
from ultralytics import YOLO
import albumentations as A
import pandas as pd
import numpy as np
import random
import shutil
import cv2
import csv
import os

# 1. Preprocessing
<p align=""><a><img width="1000" src="https://i.imgur.com/2Z4n4n3.jpg"/></a></p>
This section contains the experiments of data preprocessing including:

| Task | Description |
| -------- | ----------- |
| Annotations Preparation | Convert image bounding-box coordinates to the YOLOv8 annotation format |
| Class-based augmentation | A technique I propose to augment the <br> images based on class frequency in the dataset. |




## 1.1 Annotations Preparation

Convert the bounding boxes listed in `train.csv` into YOLO-format label files, one `.txt` file per image. Each line holds the class index followed by the box center, width, and height, all normalized by the image size.


In [1]:
# Load the data from the CSV file
df = pd.read_csv("train.csv")

In [None]:
# create annotations folder
if not os.path.exists("annotations"):
    os.makedirs("annotations")

In [None]:
# Group the data by the "image_path" column and count the number of rows for each value
counts = df.groupby("image_path").size()
df["num_objects"] = df["image_path"].apply(lambda x: counts[x])

In [None]:
# Create the annotation .txt files in YOLO format (class x_center y_center width height, normalized)

for index, row in df.iterrows():
    class_name = row['name']
    image_path = row['image_path']
    class_ = int(row['class'])

    # Load the image (skip rows whose image file cannot be read)
    image = cv2.imread(os.path.join("images", image_path))
    if image is None:
        continue
    height, width, _ = image.shape
    # Scale the CSV coordinates by 2 and clamp them to the image bounds
    xmin = max(0,min(int(row['xmin'])*2, width))
    ymin = max(0,min(int(row['ymin'])*2, height))
    xmax = max(0,min(int(row['xmax'])*2, width))
    ymax = max(0,min(int(row['ymax'])*2, height))
    
    
    b_width = abs(xmax-xmin)
    b_heigth = abs(ymax-ymin)
    b_center_x = xmin + (b_width)/2
    b_center_y = ymin + (b_heigth)/2
    
    # Normalize the coordinates
    center_x_norm = round((b_center_x / width), 8)
    center_y_norm = round((b_center_y / height), 8)
    b_width_norm = round((b_width / width), 8)
    b_heigth_norm = round((b_heigth / height), 8)
    
    
    # Open the text file for the image
    with open(os.path.join("annotations", image_path) + '.txt', 'a') as f:
      # Write the information to the file
      f.write(str(class_) + ' ' + str(center_x_norm) + ' ' + str(center_y_norm) + ' ' + str(b_width_norm) + ' ' + str(b_heigth_norm) + '\n')
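
The conversion above can be checked on a single box. The following is a minimal sketch using a hypothetical helper, `corners_to_yolo`, which is not part of the notebook; it assumes pixel corner coordinates that already match the image resolution.

```python
def corners_to_yolo(xmin, ymin, xmax, ymax, img_width, img_height):
    """Convert pixel corner coordinates to a normalized YOLO box.

    Hypothetical helper for illustration; returns
    (x_center, y_center, width, height), each relative to the image size.
    """
    box_width = abs(xmax - xmin)
    box_height = abs(ymax - ymin)
    x_center = xmin + box_width / 2
    y_center = ymin + box_height / 2
    return (
        x_center / img_width,
        y_center / img_height,
        box_width / img_width,
        box_height / img_height,
    )


# A 200x100 box centered in a 400x200 image
print(corners_to_yolo(100, 50, 300, 150, 400, 200))  # (0.5, 0.5, 0.5, 0.5)
```

Every value in a YOLO label line should fall between 0 and 1, which makes this a quick sanity check on the generated files.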


In [None]:
''' assign variable names for the folders where:
1- annotations: The folder where the YOLO format label files are stored
2- images: Where all of the images are stored
3- train: Where train images will be stored
4- valid: Where validation images will be stored
5- test: Where test images will be stored
'''
annotations_dir = 'annotations'
images_dir = 'images'
train_images_dir = 'train'
valid_images_dir = 'valid'
test_images_dir = 'test'

# Create the images/ and labels/ subfolders (YOLO format) for each split
for split_dir in (train_images_dir, valid_images_dir, test_images_dir):
    os.makedirs(os.path.join(split_dir, "labels"), exist_ok=True)
    os.makedirs(os.path.join(split_dir, "images"), exist_ok=True)

In [None]:
# Get the list of txt files
txt_files = [file for file in os.listdir(annotations_dir) if file.endswith('.jpg.txt')]

# Rename the txt files
for txt_file in txt_files:
    old_file_name = os.path.join(annotations_dir, txt_file)
    new_file_name = os.path.join(annotations_dir, txt_file.replace('.jpg.txt', '.txt'))
    os.rename(old_file_name, new_file_name)

# Refresh the list so it holds the renamed label files, in a stable order
txt_files = sorted(file for file in os.listdir(annotations_dir) if file.endswith('.txt'))

## 1.2 Splitting The Data

Split the data into training, validation, and testing sets using the following ratios:
- Training percentage: 80%
- Validation percentage: 19%
- Testing percentage: 1%


In [None]:
num_img = len(txt_files)
train_ratio = 0.8
valid_ratio = 0.19
test_ratio = 0.01
train_idx = int(num_img*train_ratio)
valid_idx = train_idx + int(num_img*valid_ratio)
test_idx = valid_idx + int(num_img*test_ratio)

train_images = txt_files[:train_idx]
valid_images = txt_files[train_idx:valid_idx]
test_images = txt_files[valid_idx:]
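
The cell above splits the files in listing order. If that order correlates with where or when the images were captured, a shuffled split may be preferable. The sketch below shows one way to do that; the helper `split_files`, the fixed seed, and the shuffling step are assumptions for illustration and are not part of the original notebook.

```python
import random


def split_files(files, train_ratio=0.8, valid_ratio=0.19, seed=42):
    """Shuffle a copy of `files` and split it into train/valid/test lists.

    Hypothetical helper: the seed and the shuffling step are assumptions,
    not part of the original notebook.
    """
    shuffled = sorted(files)               # stable starting order
    random.Random(seed).shuffle(shuffled)  # reproducible shuffle
    train_end = int(len(shuffled) * train_ratio)
    valid_end = train_end + int(len(shuffled) * valid_ratio)
    # Whatever remains after train and valid goes to the test split
    return shuffled[:train_end], shuffled[train_end:valid_end], shuffled[valid_end:]


files = [f"img_{i:03d}.txt" for i in range(100)]
train, valid, test = split_files(files)
print(len(train), len(valid), len(test))
```

Using a fixed seed keeps the split reproducible across runs, so validation results stay comparable.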

In [None]:
# Copy each label file and its corresponding image to the destination directory
def copy_split(split_files, split_dir):
    for txt_file in split_files:
        image_name = txt_file.replace('.txt', '')
        label_path = os.path.join(annotations_dir, image_name+".txt")
        image_path = os.path.join(images_dir, image_name+".jpg")

        if os.path.exists(label_path):
            shutil.copy(label_path, os.path.join(split_dir,"labels"))

        if os.path.exists(image_path):
            shutil.copy(image_path, os.path.join(split_dir,"images"))


# Training images
copy_split(train_images, train_images_dir)

# Validation images
copy_split(valid_images, valid_images_dir)

# Test images
copy_split(test_images, test_images_dir)


## 1.3 Augmentation

Augmentation is a crucial preprocessing step: it creates modified copies of existing images so that underrepresented classes appear more often in the training set.

In [None]:
# Define the paths for the images and annotations
images_path = "train/images"
annotations_path = "train/labels"
aug_images_path = images_path
aug_annotations_path = annotations_path

# Create a dictionary to store the class counts
class_counts = defaultdict(int)
class_names = {
    0:"GRAFFITI",
    1:"FADED_SIGNAGE",
    2:"POTHOLES",
    3:"GARBAGE",
    4:"CONSTRUCTION_ROAD",
    5:"BROKEN_SIGNAGE",
    6:"BAD_STREETLIGHT",
    7:"BAD_BILLBOARD",
    8:"SAND_ON_ROAD",
    9:"CLUTTER_SIDEWALK",
    10:"UNKEPT_FACADE"
}

category_ids = class_names.keys()

In [None]:
def convert_bbox_coco2yolo(img_width, img_height, bbox, cls_):
    """
    Convert bounding boxes from COCO format to YOLO format

    Parameters
    ----------
    img_width : int
        width of image
    img_height : int
        height of image
    bbox : list[list[float]]
        bounding boxes in COCO format, one per object:
        [top left x position, top left y position, width, height]
    cls_ : list[int]
        class index of each bounding box, in the same order as bbox

    Returns
    -------
    list[list[float]]
        bounding boxes in YOLO format, one per object:
        [class, x_center_rel, y_center_rel, width_rel, height_rel]
    """
    
    # YOLO bounding box format: [x_center, y_center, width, height]
    # (float values relative to width and height of image)
    converted_boxes = []
    for bx, c in zip(bbox, cls_):
        x_tl, y_tl, w, h = bx

        dw = 1.0 / img_width
        dh = 1.0 / img_height

        x_center = x_tl + w / 2.0
        y_center = y_tl + h / 2.0

        x = x_center * dw
        y = y_center * dh
        w = w * dw
        h = h * dh

        converted_boxes.append([c, x, y, w, h])
    return converted_boxes
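
Albumentations' `format='coco'` expects pixel boxes as `[x_min, y_min, width, height]`, while YOLO labels store normalized centers, so boxes are converted in both directions around the augmentation step. The round trip can be sketched for a single box as follows; both helper names are hypothetical, and unlike the notebook this version keeps float values rather than truncating to integers.

```python
def yolo_to_coco(x_center, y_center, width, height, img_width, img_height):
    """Convert one normalized YOLO box to a COCO box in pixels.

    Hypothetical helper; returns [x_top_left, y_top_left, width, height].
    """
    box_w = width * img_width
    box_h = height * img_height
    x_tl = (x_center - width / 2) * img_width
    y_tl = (y_center - height / 2) * img_height
    return [x_tl, y_tl, box_w, box_h]


def coco_to_yolo(x_tl, y_tl, box_w, box_h, img_width, img_height):
    """Inverse of yolo_to_coco for a single box."""
    return [
        (x_tl + box_w / 2) / img_width,
        (y_tl + box_h / 2) / img_height,
        box_w / img_width,
        box_h / img_height,
    ]


coco_box = yolo_to_coco(0.5, 0.5, 0.25, 0.5, img_width=640, img_height=480)
print(coco_box)                            # [240.0, 120.0, 160.0, 240.0]
print(coco_to_yolo(*coco_box, 640, 480))   # [0.5, 0.5, 0.25, 0.5]
```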

In [None]:
# Depending on the augmentation probability, get the number of times to augment a given image:
# the lower the probability, the more augmented images we generate
def get_num_samples(proba):
    if 0<=proba<=0.3:
        return 4
    elif 0.3<proba<=0.5:
        return 2
    else:
        return 1

In [None]:
for name in class_names.values():
    class_counts[name]=0
# Iterate over all of the annotations
for annotation_file in os.listdir(annotations_path):
    # Open the annotation file
    with open(os.path.join(annotations_path, annotation_file), "r") as f:
        lines = f.readlines()

    # Iterate over each line of the annotation file
    for line in lines:
        # Split the line into parts
        parts = line.strip().split()

        # Get the class index
        class_index = int(parts[0])

        # Get the class name
        class_name = class_names[class_index]

        # Increment the count for the class
        class_counts[class_name] += 1

# Determine the minimum number of samples in any class
# min_samples = min(class_counts.values())
data_median = np.median([val for val in class_counts.values() if val>0])
p_under_represented = lambda x: abs(min(0,(class_counts[class_names[x]]-data_median))/data_median)
p_over_represented  = 0.3


min_samples = 50
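
To make the probability rule concrete, here is a small worked example. The class counts are invented for illustration, and `aug_probability` is a hypothetical wrapper around the same expression used for `p_under_represented`, with the `p_over_represented` floor applied.

```python
from statistics import median

# Hypothetical class counts, for illustration only
class_counts = {"GRAFFITI": 100, "POTHOLES": 50, "GARBAGE": 200}
p_over_represented = 0.3  # floor applied to every class, as in the notebook

data_median = median(class_counts.values())


def aug_probability(class_name):
    """Relative shortfall below the median, floored at p_over_represented."""
    shortfall = abs(min(0, class_counts[class_name] - data_median) / data_median)
    return max(shortfall, p_over_represented)


for name in class_counts:
    print(name, aug_probability(name))
```

With a median of 100, the class with 50 samples falls 50% short and gets a probability of 0.5, while classes at or above the median fall back to the 0.3 floor.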

In [None]:
# Create a list to store the augmented annotations
augmented_annotations = [] 

no_color_aug_classes = [8, 5]  # SAND_ON_ROAD and BROKEN_SIGNAGE: use transform1 (no color changes)


# transform 1 : no color changes
transform1 = A.Compose([
                        A.HorizontalFlip(p=0.5),
                        A.ShiftScaleRotate(p=0.5),
                        A.CLAHE(p=0.5),
                        A.Emboss(p=0.2),
                        A.PiecewiseAffine(scale=(0.01, 0.03), p=0.3),
                        A.Perspective(p=0.3),
                        A.RandomBrightness(p=0.25),
                        A.Cutout(p=0.15, num_holes=15, max_h_size=30, max_w_size=20, fill_value=0),
                    ],
                    bbox_params=A.BboxParams(format='coco', label_fields=['class_annotations']),
                )
# transform 2 : with color changes
transform2 = A.Compose([
                        A.HorizontalFlip(p=0.5),
                        A.ShiftScaleRotate(p=0.5),
                        A.RandomBrightnessContrast(p=0.3),
                        A.RGBShift(r_shift_limit=30, g_shift_limit=30, b_shift_limit=30, p=0.3),
                        A.CLAHE(p=0.5),
                        A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=25, val_shift_limit=30, p=0.2),
                        A.Emboss(p=0.2),
                        A.PiecewiseAffine(scale=(0.01, 0.03), p=0.3),
                        A.Perspective(p=0.3),
                        A.RandomBrightness(p=0.25),
                        A.RandomContrast(p=0.25),
                        A.RandomFog(p=0.05, fog_coef_lower=0.01, fog_coef_upper=0.05, alpha_coef=0.08),
                        A.ToGray(p=0.1),
                        A.Cutout(p=0.15, num_holes=15, max_h_size=30, max_w_size=20, fill_value=0),
                    ],
                    bbox_params=A.BboxParams(format='coco', label_fields=['class_annotations']),
                )

          
# Iterate over all of the images
for image_file in os.listdir(images_path):
    # Load the image
    image = cv2.imread(os.path.join(images_path, image_file))
    img_h = image.shape[0]
    img_w = image.shape[1]


    # Get the corresponding annotation file
    annotation_file = image_file.replace(".jpg", ".txt")
    annotation_file = os.path.join(annotations_path, annotation_file)

    # Open the annotation file
    with open(annotation_file, "r") as f:
        lines = f.readlines()

    # Iterate over each line of the annotation file
    image_annotations = []
    class_annotations = []
    for line in lines:
        # Split the line into parts
        parts = line.strip().split()

        # Get the class index
        class_index = int(parts[0])

        # Get the class name
        class_name = class_names[class_index]

        # class x_center y_center width height

        bx_w    = int(float(parts[3])*img_w)
        bx_h    = int(float(parts[4])*img_h)
        bx_xmin = max(0,min(img_w,int((float(parts[1])- (float(parts[3])/2))*img_w)))
        bx_ymin = max(0,min(img_h,int((float(parts[2])- (float(parts[4])/2))*img_h)))
        bx_xmax = bx_xmin+bx_w
        bx_ymax = bx_ymin+bx_h

        image_annotations.append([bx_xmin, bx_ymin, bx_w, bx_h ])
        # image_annotations.append([class_index, bx_xmin, bx_ymin, bx_xmax, bx_ymax ])
        # image_annotations.append([bx_xmin, bx_ymin, bx_xmax, bx_ymax ])
        class_annotations.append(class_index)

        # If the class is underrepresented

        aug_probability = max(p_under_represented(class_index), p_over_represented)
        if random.random() < aug_probability:
            # Determine the number of samples to add
            # samples_to_add = min_samples - class_counts[class_name]
            samples_to_add = get_num_samples(aug_probability)

            #### New Aug
            # Augment the image
            try:
                for i in range(samples_to_add):
                    # Classes listed in no_color_aug_classes use the transform without color changes
                    if class_index in no_color_aug_classes:
                        transformed = transform1(image=image, bboxes=image_annotations, class_annotations=class_annotations)
                    else:
                        transformed = transform2(image=image, bboxes=image_annotations, class_annotations=class_annotations)

                    augmented_image = transformed['image']
                    # Keep the transformed boxes and labels in separate variables so the
                    # original annotations are reused unchanged on the next iteration
                    augmented_bboxes = transformed['bboxes']
                    augmented_classes = transformed['class_annotations']

                    cv2.imwrite(os.path.join(aug_images_path, f"{image_file.split('.')[0]}_{i}.jpg"), augmented_image)

                    augmented_annotations = convert_bbox_coco2yolo(img_width=img_w, img_height=img_h, bbox=augmented_bboxes, cls_=augmented_classes)

                    # Write the augmented annotations to a new file
                    with open(os.path.join(aug_annotations_path, f"{image_file.split('.')[0]}_{i}.txt"), "w") as f:
                        for annotation in augmented_annotations:
                            f.write(" ".join([str(x) for x in annotation]) + "\n")
            except Exception as e:
                # Skip this image if the transform rejects its boxes, but report why
                print(f"Skipping augmentation for {image_file}: {e}")
            #### End New Aug


# The augmented images and labels are now saved next to the originals in train/images and train/labels

In [None]:
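# Set the directories containing images and annotations
image_dir = "train/images"
annotation_dir = "train/labels"
# Save the annotated previews outside train/images so they are not picked up as training data
annotated_images = "train/annotated"
os.makedirs(annotated_images, exist_ok=True)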


# Loop through all images in the image directory
for image_file in os.listdir(image_dir):
    # Check if the file is a JPG image
    if image_file.endswith('.jpg'):
        # Load the image
        image = cv2.imread(os.path.join(image_dir, image_file))
        # Get the image width and height
        img_height, img_width, _ = image.shape
        # Load the corresponding annotation file
        annotation_file = image_file.replace('.jpg', '.txt')
        with open(os.path.join(annotation_dir, annotation_file)) as f:
            lines = f.readlines()
        
        # Loop through all lines in the annotation file
        for line in lines:
            # Extract the annotation values
            class_id, x_center, y_center, width, height = line.strip().split()
            # Convert the normalized annotation values to pixel coordinates
            x_center = int(float(x_center) *float(img_width))
            y_center = int(float(y_center) * float(img_height))
            width = int(float(width) * float(img_width))
            height = int(float(height) * float(img_height))

            # calculate the xmin, ymin, xmax, ymax
            xmin = int(x_center - (width / 2))
            ymin = int(y_center - (height / 2))
            xmax = int(x_center + (width / 2))
            ymax = int(y_center + (height / 2))
            # Draw the boundary box on the image
            cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        
        # Save the image with the boundary boxes plotted
        output_file = image_file.replace('.jpg', '_annotated.jpg')
        cv2.imwrite(os.path.join(annotated_images, output_file), image)


# 2. Train

<p align=""><a><img width="1000" src="https://i.imgur.com/PpaBsmW.jpg"/></a></p>

Train the model to detect the labeled objects using the tuned arguments below; all other arguments keep their default values:

| Argument | Value |
|----------|-------|
| `task` | detect |
| `mode` | train |
| `model` | yolov8x.pt |
| `data` | data.yaml |
| `epochs` | 60 |
| `patience` | 10 |
| `batch` | 8 |
| `imgsz` | 640 |
| `device` | 0 |
| `workers` | 8 |
| `optimizer` | SGD |
| `augment` | true |
| `iou` | 0.7 |
| `dropout` | 0.05 |

In [None]:
!yolo task=detect mode=train model=yolov8x.pt  epochs=60 data=data.yaml workers=8 imgsz=640 batch=8  device=0 augment=true patience=10 dropout=0.05
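
The training command reads the dataset definition from `data.yaml`, which is not shown in this README. As a rough sketch, a file in the Ultralytics dataset format could be generated from the class names like this; the split paths are assumptions based on the folders created in section 1, and `build_data_yaml` is a hypothetical helper.

```python
# Class names as defined in the notebook
class_names = {
    0: "GRAFFITI", 1: "FADED_SIGNAGE", 2: "POTHOLES", 3: "GARBAGE",
    4: "CONSTRUCTION_ROAD", 5: "BROKEN_SIGNAGE", 6: "BAD_STREETLIGHT",
    7: "BAD_BILLBOARD", 8: "SAND_ON_ROAD", 9: "CLUTTER_SIDEWALK",
    10: "UNKEPT_FACADE",
}


def build_data_yaml(root, names):
    """Build the text of a dataset YAML file (hypothetical helper).

    The split paths below are assumed from the folder layout in section 1.
    """
    lines = [
        f"path: {root}",
        "train: train/images",
        "val: valid/images",
        "test: test/images",
        "names:",
    ]
    lines += [f"  {idx}: {name}" for idx, name in sorted(names.items())]
    return "\n".join(lines) + "\n"


yaml_text = build_data_yaml(".", class_names)
print(yaml_text)  # save this text as data.yaml next to the dataset folders
```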

# 3. Prediction
<img width="1024" src="https://i.imgur.com/VwgnWek.jpg">

This section is for validation, testing, and postprocessing




## 3.1 Validation

Validate the model's performance on the `val` split of the 1st theme dataset (see section 1.2 for the split ratios). The best custom-trained model is used.


In [None]:
# Load best weight and validate it using the dataset  
!yolo task=detect mode=val model=best.pt data=data.yaml

## 3.2 Model Postprocessing

After training, some classes remain underrepresented. I therefore propose and use two techniques, "class-based acceptance" and "class-based bounding box", to tune the model's confidence threshold per class.

In [None]:
'''
Class-based acceptance: a method to accept a bounding box based on
the confidence of the prediction. Each class has its own confidence
level, chosen to ensure a precision of 60% or higher.
'''

def accept_bbox(bbox: list) -> bool:
    # bbox = [cls_ xmax xmin ymax ymin conf]
    # class2conf is a dictionary of acceptance confidence values for each class
    class2conf = {  0:0.2, 1:0.35, 2:0.4, 3:0.3, 4:0.2, 5:0.3, 
                    6:0.1, 7:0.25, 8:0.35, 9:0.18, 10:0.45} # 0.5515

    # class2conf = {  k:v*2 for k,v in class2conf.items()} # double harsh [score = 0.4325]
    class2conf = {  k:v/2 for k,v in class2conf.items()} # double soft [0.5518]
    # class2conf = {  k:v/4 for k,v in class2conf.items()}   # quad soft [0.0]

    # class2conf = {i:0.1 for i in range(11)} # 0.5518

    if float(bbox[-1]) >= class2conf[int(bbox[0])]:
        return True
    return False
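
The same idea can be applied to a whole list of predictions at once. The sketch below is illustrative only: the helper name, the `(class_id, confidence)` tuple layout, the default threshold, and the threshold values are assumptions, not the tuned values from the cell above.

```python
def filter_by_class_confidence(detections, class_thresholds, default_threshold=0.25):
    """Keep detections whose confidence meets their class threshold.

    Each detection is a (class_id, confidence) pair. Classes without an
    explicit threshold fall back to default_threshold.
    """
    return [
        (cls_id, conf)
        for cls_id, conf in detections
        if conf >= class_thresholds.get(cls_id, default_threshold)
    ]


thresholds = {0: 0.10, 2: 0.20}  # hypothetical values
detections = [(0, 0.12), (2, 0.15), (2, 0.40), (7, 0.30)]
print(filter_by_class_confidence(detections, thresholds))
# [(0, 0.12), (2, 0.4), (7, 0.3)]
```

A lower threshold for a rare class trades some precision for recall on that class without affecting the others.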

In [None]:
def merge_similar_boxes(bboxes: list) -> list:
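    # Merging is currently disabled: the boxes are returned unchanged,
    # and the code below is kept for reference only
    return bboxes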
    # hyperparameters:
    # boxes that intersect by intersection_thresh % of their total area are counted as similar, and are merged
    original_bboxes = bboxes[:]
    intersection_thresh = 0.7
    add_both = False
    # the custom probability of merging similar boxes for some certain classes
    class2merge_acceptance_proba = {0:0.3 , 3:0.2, 5:0.3 ,7:0.2 } 
    default_merge_acceptance_proba = 0.3 # probability of merging similar boxes
    # bboxes = [[cls_ xmax xmin ymax ymin conf], ...]
    # bboxes = [[current_cls, img, class_names[current_cls], xmax//2, xmin//2, ymax//2, ymin//2, conf], ...]
    merged_groupes = {}
    num_boxes_merged = 0
    filtered_boxes = []
    for i,first_box in enumerate(bboxes[:-1]):
        my_parent = "No body"
        has_intersection = False
        best_box = first_box[:]
        for current_box in bboxes[i+1:]:
            cls_1, _ , _ , xmax1, xmin1, ymax1, ymin1, conf1 = first_box
            cls_2, _ , _ , xmax2, xmin2, ymax2, ymin2, conf2 = current_box
            box1 = [xmax1, xmin1, ymax1, ymin1, conf1]
            box2 = [xmax2, xmin2, ymax2, ymin2, conf2]
            box_intersection = intersection_area(box1, box2)
            same_classes = (cls_1==cls_2)
            enough_intersection = False
            accept_intersection = True

            if box_intersection > intersection_thresh:
                enough_intersection = True
            
            # if all merge conditions are met:
            if same_classes and enough_intersection:
                has_intersection=True
                if first_box[-1]>current_box[-1]:
                    best_box = first_box[:]
                else:
                    best_box = current_box[:]
                
        if not has_intersection:
            if add_both:
                filtered_boxes.append(first_box[:-1])
                filtered_boxes.append(current_box[:-1])
            else:
                filtered_boxes.append(first_box[:-1])
        else:
            filtered_boxes.append(best_box[:-1])
    
    if len(filtered_boxes) > 0:
        return filtered_boxes
    
    return [box[:-1] for box in original_bboxes[:]]

In [None]:
def intersection_area(box1, box2):
    # unpack the coordinates of the rectangles
    xmax1, xmin1, ymax1, ymin1,_ = box1
    xmax2, xmin2, ymax2, ymin2,_ = box2

    # calculate the intersection area
    x_intersection = max(0, min(xmax1, xmax2) - max(xmin1, xmin2))
    y_intersection = max(0, min(ymax1, ymax2) - max(ymin1, ymin2))
    intersection = x_intersection * y_intersection

    # calculate the area of the union of the two rectangles
    total_area = (xmax1 - xmin1) * (ymax1 - ymin1) + (xmax2 - xmin2) * (ymax2 - ymin2) - intersection

    # calculate the intersection over union (IoU)
    if total_area <=0:
        normalized_intersection = 0
    else:
        normalized_intersection = intersection / total_area

    return normalized_intersection
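
The value returned above is the intersection over union (IoU) of the two boxes. A small worked example, using a simplified stand-alone version without the confidence element (the name `iou` and the 4-tuple layout are assumptions for this sketch):

```python
def iou(box1, box2):
    """Intersection over union of two boxes given as (xmax, xmin, ymax, ymin)."""
    xmax1, xmin1, ymax1, ymin1 = box1
    xmax2, xmin2, ymax2, ymin2 = box2

    # Overlap along each axis, clamped at zero for disjoint boxes
    x_overlap = max(0, min(xmax1, xmax2) - max(xmin1, xmin2))
    y_overlap = max(0, min(ymax1, ymax2) - max(ymin1, ymin2))
    intersection = x_overlap * y_overlap

    union = (xmax1 - xmin1) * (ymax1 - ymin1) + (xmax2 - xmin2) * (ymax2 - ymin2) - intersection
    return intersection / union if union > 0 else 0


# Two 10x10 boxes offset by 5 pixels horizontally:
# intersection = 5 * 10 = 50, union = 100 + 100 - 50 = 150
print(iou((10, 0, 10, 0), (15, 5, 10, 0)))
```

Here the overlap is one third of the union, which is below the 0.7 `intersection_thresh`, so these two boxes would not count as similar.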

In [None]:
# Define the paths for the images and annotations
images_path = "images"
model_paths = [ "best.pt"]
test_csv_path = "test.csv"


class_names = {
    0:"GRAFFITI",
    1:"FADED_SIGNAGE",
    2:"POTHOLES",
    3:"GARBAGE",
    4:"CONSTRUCTION_ROAD",
    5:"BROKEN_SIGNAGE",
    6:"BAD_STREETLIGHT",
    7:"BAD_BILLBOARD",
    8:"SAND_ON_ROAD",
    9:"CLUTTER_SIDEWALK",
    10:"UNKEPT_FACADE"
}

category_ids = class_names.keys()

In [None]:
# load the best trained model
models = [YOLO(p) for p in model_paths]

In [None]:
image_names = []
with open(test_csv_path, 'r') as f:
    reader = csv.reader(f)
    next(reader) # skip the header row
    for row in reader:
        image_names.append(row[0])

# 4. Results
<img width="1024" src="https://i.imgur.com/Dv4lkAg.jpg">

This section presents the results of the trained model, including evaluation metrics and example predictions.



&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img align="left" src="https://i.imgur.com/D1S3oD8.png" width="600">

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img align="left" src="https://i.imgur.com/yDAEPX6.png" width="600">

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img align="left" src="https://i.imgur.com/28OhYXb.png" width="600">

## Examples 


&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img align="left" src="https://i.imgur.com/FmHSuoa.jpg" width="600">

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
<img align="left" src="https://i.imgur.com/TCZgcec.jpg" width="600">