In [None]:
import numpy as np 
import matplotlib.pyplot as plt 
import re
import os
import cv2
import pandas as pd
import time
import torch
import supervision as sv
from ultralytics import YOLO
# import albumentations as A
%matplotlib inline

In [None]:
!pip install -U ultralytics

In [None]:
# Remove albumentations from the kernel's environment. Its import is commented
# out in the first cell, yet apply_augment() further down still refers to the
# alias `A`.
# NOTE(review): presumably removed so that training does not pick the package
# up automatically — confirm.
%pip uninstall -y albumentations

# Functions

In [None]:
def read_image_data(path):
    """List every file found below ``path``.

    Args:
        path: Folder to scan, resolved against the current working directory.

    Returns:
        A list with the full path of each file in the folder tree, or ``None``
        (after printing a message) when the folder does not exist.
    """
    target_dir = os.path.join(os.getcwd(), path)
    if not os.path.exists(target_dir):
        print("folder does not exist")
        return None

    return [
        os.path.join(current_dir, file_name)
        for current_dir, _subdirs, file_names in os.walk(target_dir)
        for file_name in file_names
    ]

def count_classes(folder_path, con=".txt"):
    """Count annotation rows per class in the label files below ``folder_path``.

    The first token of every non-empty row is read as the class id:
    0 -> "tm", 1 -> "tnb", 2 -> "other". Rows with any other id are ignored.

    Args:
        folder_path: Label folder, resolved against the current working directory.
        con: File extension to scan. Only ".txt" is implemented; ".xml" is
            accepted but does nothing.

    Returns:
        ``{"tm": int, "tnb": int, "other": int}`` for ".txt", otherwise ``None``
        (also ``None`` when the folder does not exist).
    """
    counts = {"tm": 0, "tnb": 0, "other": 0}
    id_to_name = {0: "tm", 1: "tnb", 2: "other"}
    base_dir = os.getcwd()

    if con == ".xml":
        # XML annotations are recognised but not counted.
        return None
    if con != ".txt":
        print("No extension found")
        return None

    label_dir = os.path.join(base_dir, folder_path)
    if not os.path.exists(label_dir):
        print("Folder does not exist")
        return None

    for current_dir, _, file_names in os.walk(label_dir):
        for file_name in file_names:
            if not file_name.endswith(con):
                continue
            with open(os.path.join(current_dir, file_name), 'r') as label_file:
                for row in label_file:
                    fields = row.strip().split()
                    if not fields:
                        continue
                    class_name = id_to_name.get(int(fields[0]))
                    if class_name is not None:
                        counts[class_name] += 1
    return counts

In [None]:
# Forward slashes only: a backslash followed by "i" is an invalid escape
# sequence in a normal string literal, and a backslash separator only resolves
# on Windows.
image_paths = read_image_data("data/images/val")

In [None]:
# Per-class annotation counts for the train, test and validation label folders.
train_count, test_count, val_count = [
    count_classes(label_dir, ".txt")
    for label_dir in ("data/labels/train", "data/labels/test", "data/labels/val")
]

In [None]:
# One bar chart per dataset split showing how many annotations each class has.
# The three panels share the same formatting, so they are drawn in a loop
# instead of repeating the same five calls per panel.
_, axis = plt.subplots(1, 3, figsize=(16, 6))
split_counts = (
    ("Train set", train_count),
    ("Test set", test_count),
    ("Validation set", val_count),
)
for ax, (title, counts) in zip(axis, split_counts):
    bars = ax.bar(list(counts.keys()), list(counts.values()))
    ax.set_title(title)
    ax.set_xlabel("Classes")
    ax.set_ylabel("Total")
    ax.bar_label(bars)  # print the count on top of each bar
plt.tight_layout()
plt.show()

In [None]:
# Build a per-annotation table for one split (image path, annot path, class id).
def text_dataFrame(annots_path, images_path=None):
    """Collect every annotation row below ``annots_path`` into a DataFrame.

    Each non-empty row of each label file becomes one DataFrame row. Label and
    image files are paired by file-name stem (the part before the first dot).

    Args:
        annots_path: Folder with label files, relative to the current working
            directory. Rows are expected to hold a class id followed by four
            numeric box values.
        images_path: Folder with the matching images. When ``None`` the
            "image" column is filled with ``None``.

    Returns:
        DataFrame with columns "image", "annot" and "Class", or ``None`` (after
        printing a message) when ``annots_path`` does not exist.

    Raises:
        KeyError: ``images_path`` was given but holds no image for a label file.
        IndexError, ValueError: a non-empty row is too short or not numeric.
    """
    root_path = os.getcwd()

    def list_files(folder):
        """Full path of every file found below ``folder``."""
        return [
            os.path.join(current_dir, file_name)
            for current_dir, _, file_names in os.walk(os.path.join(root_path, folder))
            for file_name in file_names
        ]

    def stem_of(file_path):
        """File name without directories, cut at its first dot."""
        # Splitting on the separators keeps names that contain spaces or other
        # unusual characters intact.
        return re.split(r"[\\/]", file_path)[-1].split(".")[0]

    if not os.path.exists(os.path.join(root_path, annots_path)):
        print("path does not exist")
        return None

    image_by_stem = None
    if images_path is not None:
        image_by_stem = {stem_of(p): p for p in list_files(images_path)}

    rows = []
    for annot_file in list_files(annots_path):
        stem = stem_of(annot_file)
        with open(annot_file, "r") as label_file:
            for line in label_file:
                fields = line.strip().split()  # space-separated values
                if not fields:
                    continue  # tolerate blank lines in label files
                class_id = int(fields[0])
                # Parsed only to validate the row; the box itself is not stored.
                _box = [float(fields[k]) for k in range(1, 5)]
                image_file = image_by_stem[stem] if image_by_stem is not None else None
                rows.append((image_file, annot_file, class_id))

    return pd.DataFrame(rows, columns=["image", "annot", "Class"])


# Per-annotation table for the training split.
train_df = text_dataFrame(
    annots_path="data/labels/train",
    images_path="data/images/train",
)
# test_df = text_dataFrame("data/labels/test", "data/images/test")
# val_df = text_dataFrame("data/labels/validation", "data/images/validation")

In [None]:
# tm = train_df[train_df['Class'] == 0]
# tnb = train_df[train_df['Class'] == 1]
# other = train_df[train_df['Class'] == 2]

# Processes

In [None]:
def save(data, to_save, process):
    """Run ``process`` on every path in ``data`` and write the result as an image.

    Args:
        data: Iterable of image file paths.
        to_save: Existing output folder, relative to the current working
            directory. Nothing is written when it is missing.
        process: Callable that takes an image path and returns the image to
            write (as accepted by ``cv2.imwrite``).

    Each output keeps the file name of its source path.
    """
    out_dir = os.path.join(os.getcwd(), to_save)
    if not os.path.exists(out_dir):
        print(f"{to_save} folder does not exist!")
        return

    for src_path in data:
        processed = process(src_path)
        # Everything after the last separator is the file name; unlike a
        # character-class regex this keeps names with spaces etc. intact.
        file_name = re.split(r"[\\/]", src_path)[-1]
        cv2.imwrite(os.path.join(out_dir, file_name), processed)
    print("image writing done.")
    
def smoothing(img):
    """Read an image from disk and denoise it with a bilateral filter.

    Args:
        img: Path of the image file.

    Returns:
        The filtered image as returned by ``cv2.bilateralFilter`` (the colour
        order delivered by ``cv2.imread`` is kept).

    Raises:
        OSError: ``cv2.imread`` could not read the file.
    """
    image = cv2.imread(img)
    if image is None:
        # Fail here with the offending path instead of deep inside OpenCV.
        raise OSError(f"cv2.imread could not read {img!r}")
    # No colour conversion is needed: converting BGR->RGB and straight back
    # leaves the pixels unchanged.
    return cv2.bilateralFilter(image, 3, 10, 10)

def contrast_enhance(image):
    """Read the image at path ``image`` and return its contrast-enhanced version.

    The work is delegated to ``ContastEnhancement`` from the project-local
    ``CE_modified`` module, which is imported on first use. No smoothing pass
    is applied beforehand.
    """
    from CE_modified import ContastEnhancement

    enhancer = ContastEnhancement(cv2.imread(image))
    return enhancer.prcoess_image()

In [None]:
def display_image(image, annot):
    """Show an image with the boxes of its YOLO label file drawn in green.

    Args:
        image: Path of the image file.
        annot: Path of the label file; every non-empty row holds
            ``class center_x center_y width height`` with the box values
            relative to the image size.
    """
    canvas = cv2.cvtColor(cv2.imread(image), cv2.COLOR_BGR2RGB)
    img_h, img_w = canvas.shape[:2]

    with open(annot, 'r') as label_file:
        for row in label_file:
            fields = row.split()
            if not fields:
                continue  # blank lines would otherwise break the unpacking below
            # A comprehension is used instead of the builtin map(), because a
            # later cell of this notebook rebinds the name `map`.
            _, center_x, center_y, width, height = [float(v) for v in fields]

            # Relative centre/size -> absolute top-left corner and size in pixels.
            left = int((center_x - width / 2) * img_w)
            top = int((center_y - height / 2) * img_h)
            box_w = int(width * img_w)
            box_h = int(height * img_h)
            cv2.rectangle(canvas, (left, top), (left + box_w, top + box_h), (0, 255, 0), 2)

    plt.imshow(canvas)


def apply_augment(image_path, annot_path, exclude=None):
    """Horizontally flip one image together with its YOLO boxes.

    Args:
        image_path: Image file, relative to the current working directory.
        annot_path: Label file with ``class cx cy w h`` rows, relative to the
            current working directory.
        exclude: Accepted but not used by this function.

    Returns:
        ``(transformed_image, transformed_bboxes)`` as produced by the
        transform, or ``None`` (after printing a message) when either file is
        missing.
    """
    # NOTE(review): `A` is presumably the albumentations alias; its import is
    # commented out in the first cell, so it has to be in scope before calling.
    pipeline = A.Compose(
        [A.HorizontalFlip(p=1)],
        bbox_params=A.BboxParams(format='yolo'),
    )

    cwd = os.getcwd()
    image_file = os.path.join(cwd, image_path)
    annot_file = os.path.join(cwd, annot_path)
    if not (os.path.exists(image_file) and os.path.exists(annot_file)):
        print("not found!")
        return None

    image = cv2.imread(image_file)
    with open(annot_file, "r") as label_file:
        rows = label_file.readlines()

    bboxes = []
    for row in rows:
        fields = row.strip().split()
        class_id = int(fields[0])
        cx, cy, box_w, box_h = (float(fields[k]) for k in range(1, 5))
        # The class id travels with the box as its trailing label element.
        bboxes.append([cx, cy, box_w, box_h, str(class_id)])

    result = pipeline(image=image, bboxes=bboxes)
    return result['image'], result['bboxes']


def write_annotated_image(name, path, tm):
    """Write image ``tm`` to ``<cwd>/<path>/<name>`` with ``cv2.imwrite``."""
    cv2.imwrite(os.path.join(os.getcwd(), path, name), tm)


def write_yolo_annot(name, path, transformed_bboxes):
    """Write boxes to ``<cwd>/<path>/<name>``, one row per box.

    Args:
        name: File name of the label file to create.
        path: Target folder, relative to the current working directory.
        transformed_bboxes: Iterable of sequences whose last element is the
            class label and whose other elements are the box values.

    Each row is written as the label followed by the box values, separated by
    single spaces.
    """
    lines = []
    for bbox in transformed_bboxes:
        values = list(bbox)
        # pop() removes the last element by position. Removing it by value
        # would delete the first equal element instead, e.g. a box value of
        # 1.0 when the label is 1.
        label = values.pop()
        lines.append(' '.join(str(item) for item in [label, *values]) + '\n')

    to_save = os.path.join(os.getcwd(), path, name)
    with open(to_save, 'w') as file:
        file.writelines(lines)

def apply1(dataset, ex, tp, loop):
    """Augment every image of ``dataset`` ``loop`` times and store the results.

    Args:
        dataset: Table with an "image" and an "annot" column holding the image
            and label file paths.
        ex: Forwarded to ``apply_augment`` as ``exclude``.
        tp: Tag inserted into the generated file names.
        loop: Number of augmented copies to create per image.

    Output files are named ``<stem>AUG<tp><k>.png`` / ``.txt`` (``k`` counting
    from 0) and written to "data/images/train" and "data/labels/train". Rows
    for which ``apply_augment`` returns nothing are skipped.
    """
    for image_file, annot_file in zip(dataset['image'], dataset['annot'].values):
        # The stem does not change between copies, so it is derived once per
        # image. Splitting on the separators keeps names with spaces intact.
        stem = re.split(r"[\\/]", image_file)[-1].split(".")[0]
        for t in range(loop):
            augmented = apply_augment(image_file, annot_file, exclude=ex)
            if augmented is None:
                # apply_augment has already reported the missing file.
                continue
            transformed_image, transformed_bboxes = augmented
            write_annotated_image(f"{stem}AUG{tp}{t}.png", "data/images/train", transformed_image)
            write_yolo_annot(f"{stem}AUG{tp}{t}.txt", "data/labels/train", transformed_bboxes)


def delete_aug(path):
    """Delete every entry directly inside ``path`` whose name contains "AUG"."""
    for entry in os.listdir(path):
        if "AUG" not in entry:
            continue
        os.remove(os.path.join(path, entry))


# Model

In [None]:
# Fine-tune the pretrained YOLOv5s checkpoint on the dataset described by assets.yaml.
# A model built from 'yolov5s.yaml' is deliberately not created first: it would
# be replaced by the checkpoint right away and never used.
# NOTE(review): the names say "yolo8" although this cell loads YOLOv5s, and the
# following training cell assigns the same two names again.
yolo8_model = YOLO("yolov5s.pt")
yolo8_model.to('cuda')
yolo8_results = yolo8_model.train(
    data='assets.yaml',
    imgsz=896,
    epochs=100,
    batch=32,
    device=0,
    verbose=False,
    val=True,
    optimizer="AdamW",
    lr0=0.001,   # initial learning rate
    lrf=0.01,    # final learning rate, as a fraction of lr0
    patience=50  # epochs without improvement before stopping early
)

In [None]:
# Fine-tune the pretrained YOLOv8s checkpoint on the dataset described by assets.yaml.
# A model built from 'yolov8s.yaml' is deliberately not created first: it would
# be replaced by the checkpoint right away and never used.
yolo8_model = YOLO("yolov8s.pt")
yolo8_model.to('cuda')
yolo8_results = yolo8_model.train(
    data='assets.yaml',
    imgsz=896,
    epochs=100,
    batch=32,
    device=0,
    verbose=False,
    val=True,
    optimizer="AdamW",
    lr0=0.001,   # initial learning rate
    lrf=0.01,    # final learning rate, as a fraction of lr0
    patience=50  # epochs without improvement before stopping early
)

# Model Evaluation

In [None]:
def read_test_data(path):
    """Return the sorted full paths of all files below ``path``.

    ``path`` is resolved against the current working directory. The result is
    sorted because ``os.walk`` makes no promise about file order; a folder that
    does not exist yields an empty list.
    """
    found = []
    for root, _, files in os.walk(os.path.join(os.getcwd(), path)):
        found.extend(os.path.join(root, file_name) for file_name in files)
    return sorted(found)


# os.path.join keeps the folders portable; hard-coded backslashes only resolve
# on Windows.
test_images = read_test_data(os.path.join("data", "images", "test"))
test_annot = read_test_data(os.path.join("data", "labels", "test"))

In [None]:
def plot_confusion_matrix(image_dir, annot_dir, classes, model, conf_threshold=0.5, iou_threshold=0.5):
    """Benchmark ``model`` on a YOLO dataset and plot the confusion matrix.

    Args:
        image_dir: Folder with the images.
        annot_dir: Folder with the YOLO label files.
        classes: Path handed to supervision as ``data_yaml_path``.
        model: Callable; ``model(image)[0]`` must be a result that
            ``sv.Detections.from_ultralytics`` accepts.
        conf_threshold: Forwarded to ``sv.ConfusionMatrix.benchmark``.
        iou_threshold: Forwarded to ``sv.ConfusionMatrix.benchmark``.

    Prints a message and does nothing else when either folder (resolved
    against the current working directory) is missing.
    """
    cwd = os.getcwd()
    folders_present = (
        os.path.exists(os.path.join(cwd, image_dir))
        and os.path.exists(os.path.join(cwd, annot_dir))
    )
    if not folders_present:
        print("path does not exists")
        return None

    dataset = sv.DetectionDataset.from_yolo(
        images_directory_path=image_dir,
        annotations_directory_path=annot_dir,
        data_yaml_path=classes,
    )

    def detect(image: np.ndarray) -> sv.Detections:
        # Only the first result of the model call is converted.
        return sv.Detections.from_ultralytics(model(image)[0])

    sv.ConfusionMatrix.benchmark(
        dataset=dataset,
        callback=detect,
        conf_threshold=conf_threshold,
        iou_threshold=iou_threshold,
    ).plot(fig_size=(12, 6))


def get_confusion_matrix(image_dir, annot_dir, classes, model,conf_threshold=0.5, iou_threshold=0.5):
    """Benchmark ``model`` on a YOLO dataset and return the confusion matrix.

    Args:
        image_dir: Folder with the images.
        annot_dir: Folder with the YOLO label files.
        classes: Path handed to supervision as ``data_yaml_path``.
        model: Callable; ``model(image)[0]`` must be a result that
            ``sv.Detections.from_ultralytics`` accepts.
        conf_threshold: Forwarded to ``sv.ConfusionMatrix.benchmark``.
        iou_threshold: Forwarded to ``sv.ConfusionMatrix.benchmark``.

    Returns:
        ``(matrix, classes)`` taken from the benchmark result, or ``None``
        (after printing a message) when either folder, resolved against the
        current working directory, is missing.
    """
    cwd = os.getcwd()
    folders_present = (
        os.path.exists(os.path.join(cwd, image_dir))
        and os.path.exists(os.path.join(cwd, annot_dir))
    )
    if not folders_present:
        print("path does not exists")
        return None

    dataset = sv.DetectionDataset.from_yolo(
        images_directory_path=image_dir,
        annotations_directory_path=annot_dir,
        data_yaml_path=classes,
    )

    def detect(image: np.ndarray) -> sv.Detections:
        # Only the first result of the model call is converted.
        return sv.Detections.from_ultralytics(model(image)[0])

    benchmark_result = sv.ConfusionMatrix.benchmark(
        dataset=dataset,
        callback=detect,
        conf_threshold=conf_threshold,
        iou_threshold=iou_threshold,
    )
    return benchmark_result.matrix, benchmark_result.classes


def calculate_meanAveragePrecision(image_dir, annot_dir, classes, model):
    """Benchmark ``model`` on a YOLO dataset and report its mAP values.

    Args:
        image_dir: Folder with the images.
        annot_dir: Folder with the YOLO label files.
        classes: Path handed to supervision as ``data_yaml_path``.
        model: Callable; ``model(image)[0]`` must be a result that
            ``sv.Detections.from_ultralytics`` accepts.

    Returns:
        Dict with the keys 'mAP50' and 'mAP50_95', filled from the
        ``map50`` and ``map50_95`` attributes of the benchmark result.
    """
    def detect(image: np.ndarray) -> sv.Detections:
        # Only the first result of the model call is converted.
        return sv.Detections.from_ultralytics(model(image)[0])

    benchmark_result = sv.MeanAveragePrecision.benchmark(
        dataset=sv.DetectionDataset.from_yolo(
            images_directory_path=image_dir,
            annotations_directory_path=annot_dir,
            data_yaml_path=classes,
        ),
        callback=detect,
    )

    scores = {}
    scores['mAP50'] = benchmark_result.map50
    scores['mAP50_95'] = benchmark_result.map50_95
    return scores


def calculate_precision_multi_class(confusion_matrix):
    """Per-class precision from a square confusion matrix.

    For class ``i`` the diagonal entry is divided by the sum of column ``i``;
    a column that sums to zero yields 0.0.

    Args:
        confusion_matrix: 2-D numpy array indexed as ``[i, j]``.

    Returns:
        List with one precision value per row of the matrix.
    """
    def precision_of(idx):
        hits = confusion_matrix[idx, idx]
        misses = np.sum(confusion_matrix[:, idx]) - hits
        total = hits + misses
        return 0.0 if total == 0 else hits / total

    return [precision_of(idx) for idx in range(confusion_matrix.shape[0])]


def calculate_recall_multi_class(confusion_matrix):
    """Per-class recall from a square confusion matrix.

    For class ``i`` the diagonal entry is divided by the sum of row ``i``;
    a row that sums to zero yields 0.0.

    Args:
        confusion_matrix: 2-D numpy array indexed as ``[i, j]``.

    Returns:
        List with one recall value per row of the matrix.
    """
    def recall_of(idx):
        hits = confusion_matrix[idx, idx]
        misses = np.sum(confusion_matrix[idx, :]) - hits
        total = hits + misses
        return 0.0 if total == 0 else hits / total

    return [recall_of(idx) for idx in range(confusion_matrix.shape[0])]


def calculate_f1_score_multi_class(confusion_matrix):
    """Per-class F1 score from a square confusion matrix.

    Precision uses the column sum and recall the row sum of each class. A class
    with an empty column or row gets precision or recall 0.0 instead of
    dividing by zero, and an F1 of 0.0 when both are zero.

    Args:
        confusion_matrix: 2-D numpy array indexed as ``[i, j]``.

    Returns:
        List with one F1 value per row of the matrix.
    """
    num_classes = confusion_matrix.shape[0]
    f1_score_per_class = []

    for i in range(num_classes):
        true_positives = confusion_matrix[i, i]
        false_positives = np.sum(confusion_matrix[:, i]) - true_positives
        false_negatives = np.sum(confusion_matrix[i, :]) - true_positives

        predicted_total = true_positives + false_positives
        actual_total = true_positives + false_negatives
        # Guard both ratios: 0/0 would yield NaN (numpy) or raise (plain ints).
        precision = true_positives / predicted_total if predicted_total != 0 else 0.0
        recall = true_positives / actual_total if actual_total != 0 else 0.0

        if precision + recall == 0:
            f1_score_per_class.append(0.0)
        else:
            f1_score_per_class.append(
                2 * (precision * recall) / (precision + recall))

    return f1_score_per_class


def plot(data, c1,c2):
    """Line-plot two columns of a CSV file.

    Args:
        data: Path (or buffer) passed to ``pd.read_csv``.
        c1: Name of the first column to plot.
        c2: Name of the second column to plot.

    Surrounding whitespace is stripped from the column names before the two
    columns are looked up.
    """
    frame = pd.read_csv(data)
    frame.columns = frame.columns.str.strip()
    selected = frame[[c1, c2]]
    selected.plot(kind="line")

def read_annotations_from_file(file_path):
    """Parse a whitespace-separated label file into rows of floats.

    Args:
        file_path: Path of the label file.

    Returns:
        One list of floats per non-empty line. Blank lines are skipped so that
        callers unpacking a fixed number of values do not receive empty rows.
    """
    with open(file_path, 'r') as file:
        # A comprehension is used instead of the builtin map(), because another
        # cell of this notebook rebinds the name `map`.
        return [[float(value) for value in line.split()] for line in file if line.strip()]



def plot_test(original: list, model):
    """Show the labelled ground truth next to the model's prediction.

    Args:
        original: Two-item sequence ``[image, annotation_path]``. The image is
            an array in the colour order ``cv2`` uses; the annotation file is
            read with ``read_annotations_from_file`` and must hold
            ``class cx cy w h`` rows with relative box values.
        model: Object with a ``predict`` method whose first result offers
            ``plot()``.

    The left panel shows the image with its annotated boxes and class names,
    the right panel the rendered prediction for a 640x640 resized copy
    (confidence threshold 0.4).
    """
    label_names = {'0':'tm','1':'tnb','2':'other'}
    _figure, (truth_ax, pred_ax) = plt.subplots(1, 2, figsize=(12, 6))
    picture = original[0]
    label_file = original[1]
    ground_truth = read_annotations_from_file(label_file)
    predictions = model.predict(cv2.resize(picture, (640,640)), conf=0.4)
    truth_ax.imshow(cv2.cvtColor(picture, cv2.COLOR_BGR2RGB))

    # Ground-truth boxes: relative centre/size -> pixel top-left corner and size.
    for class_id, center_x, center_y, width, height in ground_truth:
        pic_h, pic_w = picture.shape[0], picture.shape[1]
        left = int((center_x - width / 2) * pic_w)
        top = int((center_y - height / 2) * pic_h)
        box_w = int(width * pic_w)
        box_h = int(height * pic_h)
        class_name = label_names[str(int(class_id))]
        outline = plt.Rectangle((left, top), box_w, box_h, linewidth=1, edgecolor='r', facecolor='none')
        truth_ax.add_patch(outline)
        truth_ax.text(left, top+30, class_name, color='r', verticalalignment='bottom', bbox={'facecolor': 'white', 'alpha': 0.7, 'pad': 2})

    rendered = predictions[0].plot()
    truth_ax.set_title('Original with YOLO Annotation')
    pred_ax.imshow(cv2.cvtColor(rendered, cv2.COLOR_BGR2RGB))
    pred_ax.set_title('YOLO Prediction')
    plt.show()

In [None]:
# Model object used by the evaluation cells below.
# NOTE(review): YOLO() is called without a weights path, so this presumably
# falls back to the library's default model rather than a checkpoint produced
# by the training cells above — confirm and pass the trained weights' path.
model = YOLO()

In [None]:
# Confusion matrix of `model` on the test split. The function has four required
# parameters, so it cannot be called with an empty argument list.
# NOTE(review): the folders and the data YAML are the ones used elsewhere in
# this notebook (test folders, 'assets.yaml' from training) — confirm.
plot_confusion_matrix(
    image_dir="data/images/test",
    annot_dir="data/labels/test",
    classes="assets.yaml",
    model=model,
)

In [None]:
# Per-class precision / recall / F1 of `model` on the test split. The function
# has four required parameters, so they are passed explicitly here.
# NOTE(review): the folders and the data YAML are the ones used elsewhere in
# this notebook (test folders, 'assets.yaml' from training) — confirm.
con_met, cls = get_confusion_matrix(
    image_dir="data/images/test",
    annot_dir="data/labels/test",
    classes="assets.yaml",
    model=model,
)
precision_per_class = calculate_precision_multi_class(con_met)
recall_per_class = calculate_recall_multi_class(con_met)
f1_score_per_class = calculate_f1_score_multi_class(con_met)

print(f'Precision per class: {precision_per_class}')
print(f'Recall per class: {recall_per_class}')
print(f'F1 score per class: {f1_score_per_class}')

In [None]:
# mAP 50 and 50_95
# The result is not stored under the name `map`: that would hide the builtin
# map() for the rest of the session, and other functions in this notebook use it.
# NOTE(review): the folders and the data YAML are the ones used elsewhere in
# this notebook (test folders, 'assets.yaml' from training) — confirm.
map_scores = calculate_meanAveragePrecision(
    image_dir="data/images/test",
    annot_dir="data/labels/test",
    classes="assets.yaml",
    model=model,
)
print(map_scores)

In [None]:
# plot predicted image
# For every test image: run the model and draw each predicted box with a
# filled label tag ("<class name> <confidence>") onto the resized image.
# NOTE(review): within the lines visible here the annotated `img` is neither
# displayed nor saved, and `name` is not used — check the rest of the cell.

# Class id -> display name.
# NOTE(review): id 2 is called "light pole" here, while class_map in plot_test
# calls it "other" — confirm which label is intended.
classes = {
    0: "tm",
    1: "tnb",
    2: "light pole"
}

# Display name -> colour tuple passed to the cv2 drawing calls.
colors = {
    "tm": (0,255,128),
    "tnb": (0, 255, 255),
    "light pole": (204,255,153)
}
for i in range(len(test_images)):
    # Second-to-last run of letters/digits/-/_ in the path; for a path ending
    # in "<stem>.<ext>" that is the stem.
    name = re.findall(re.compile(r"[a-zA-Z-0-9\_]+"), test_images[i])[-2]
    img = cv2.imread(test_images[i])
    img = cv2.resize(img, (416, 640))
    # Only predictions with a confidence of at least 0.8 are requested.
    pred = model.predict(img, conf=0.8)
    for result in pred:
        xywh = result.boxes.xywh
        cls = result.boxes.cls
        conf_ = result.boxes.conf
        for j in range(len(xywh)):
            x, y, w, h = xywh[j].cpu().numpy()
            c_name = classes[int(cls[j])]
            single_conf = round(float(conf_.cpu().numpy()[j]),2)
            # Convert from xywh to x1, y1, x2, y2
            x1 = int(x - w / 2)
            y1 = int(y - h / 2)
            x2 = int(x + w / 2)
            y2 = int(y + h / 2)

            # Draw the bounding box
            cv2.rectangle(img, (x1, y1), (x2, y2), colors[c_name], 1)

            # Measure the label text so the background tag can be sized to fit.
            (text_width, text_height), baseline = cv2.getTextSize(
                f"{c_name} {single_conf}", cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)

            # Draw a filled rectangle for the text background
            cv2.rectangle(img, (x1, y1 - text_height - baseline - 5),
                          (x1 + text_width, y1), colors[c_name], thickness=cv2.FILLED)

            # Black label text on top of the filled tag.
            cv2.putText(img, f"{c_name} {single_conf}", (x1, y1 - baseline - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
