In [None]:
!pip install ultralytics
!pip install albumentations opencv-python

In [None]:
import os
import shutil
from random import shuffle
from shutil import copyfile
import albumentations as A
import cv2

def augment_image(image_path):
    """Load an image from disk and return a randomly augmented copy.

    Six transforms (brightness/contrast, Gaussian noise, Gaussian blur,
    sharpen, gamma, CLAHE) are composed, each applied independently with
    probability 0.2, so the result may also come back unchanged.

    Args:
        image_path: Path of the image file to read with ``cv2.imread``.

    Returns:
        The augmented image, as returned under the ``'image'`` key of the
        albumentations pipeline.

    Raises:
        OSError: If ``cv2.imread`` cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    # cv2.imread signals failure by returning None rather than raising;
    # fail here with the offending path instead of deep inside the pipeline.
    if image is None:
        raise OSError(f"cv2.imread could not read image: {image_path}")

    transform = A.Compose([
        A.RandomBrightnessContrast(p=0.2),
        A.GaussNoise(p=0.2),
        A.GaussianBlur(p=0.2),
        A.Sharpen(p=0.2),
        A.RandomGamma(p=0.2),
        A.CLAHE(p=0.2)
    ])
    augmented = transform(image=image)
    return augmented['image']

def save_augmented_image(image, dest_path):
    """Write ``image`` to ``dest_path`` with ``cv2.imwrite``.

    Args:
        image: Image array to encode.
        dest_path: Output file path; the extension selects the encoder.

    Raises:
        OSError: If ``cv2.imwrite`` reports that nothing was written.
    """
    # cv2.imwrite returns False on failure; ignoring it would leave a label
    # file on disk with no matching image.
    if not cv2.imwrite(dest_path, image):
        raise OSError(f"cv2.imwrite could not write image: {dest_path}")

def augment_and_save(image_path, label_path, image_dest, label_dest):
    """Write an augmented copy of an image and duplicate its label file.

    The label file is copied unchanged to ``label_dest``.
    NOTE(review): this is only valid while the augmentation pipeline leaves
    object positions untouched - confirm if geometric transforms are added.

    Args:
        image_path: Source image to augment.
        label_path: Label file belonging to ``image_path``; may be missing.
        image_dest: Where the augmented image is written.
        label_dest: Where the copy of the label file is written.
    """
    # augment_image reads the file itself, so no separate cv2.imread is needed
    # here (the previous extra read decoded every image twice for nothing).
    augmented_image = augment_image(image_path)
    save_augmented_image(augmented_image, image_dest)

    # Images without a label file are tolerated: skip the copy instead of
    # crashing with FileNotFoundError.
    if os.path.exists(label_path):
        shutil.copyfile(label_path, label_dest)

def split_data(train_ratio, test_ratio, directory, root_directory, seed=None):
    """Split a flat image/label folder into train/valid/test subfolders.

    Images found in ``root_directory`` are shuffled and copied to
    ``<directory>/datasets/images/<split>``; a same-named ``.txt`` label file,
    when present, is copied to ``<directory>/datasets/labels/<split>``.
    Every training image additionally gets one augmented ``*_aug.jpg`` copy.

    Args:
        train_ratio: Fraction of images for the training split.
        test_ratio: Fraction of images for the test split. The validation
            split gets ``1 - train_ratio - test_ratio``; any rounding
            remainder goes to the test split.
        directory: Output root under which ``datasets/`` is created.
        root_directory: Folder holding the source images and label files.
        seed: Optional seed for a reproducible shuffle. ``None`` (default)
            keeps the previous behaviour of using the global random state.

    Raises:
        ValueError: If a ratio is outside [0, 1] or the two sum to more than 1.
    """
    if not (0 <= train_ratio <= 1 and 0 <= test_ratio <= 1):
        raise ValueError("train_ratio and test_ratio must each be within [0, 1]")
    if train_ratio + test_ratio > 1 + 1e-9:
        raise ValueError("train_ratio + test_ratio must not exceed 1")

    # Compared against the lower-cased suffix *including* the dot, so
    # 'x.JPEG' is accepted and a name that merely ends in 'jpg' is not.
    image_suffixes = {'.jpg', '.jpeg', '.webp'}

    # Create subdirectories
    for kind in ('images', 'labels'):
        for split_name in ('train', 'valid', 'test'):
            os.makedirs(os.path.join(directory, f"datasets/{kind}", split_name), exist_ok=True)

    # Sort first: os.listdir order is arbitrary, which would defeat the seed.
    image_files = sorted(
        f for f in os.listdir(root_directory)
        if os.path.splitext(f)[1].lower() in image_suffixes
    )

    if seed is None:
        shuffle(image_files)
    else:
        import random
        random.Random(seed).shuffle(image_files)

    total_samples = len(image_files)

    def split_size(ratio):
        # Round away float noise before truncating: 1 - (0.8 + 0.1) evaluates
        # to 0.0999..., so a plain int() would silently drop one sample.
        return max(0, int(round(ratio * total_samples, 6)))

    train_split = split_size(train_ratio)
    valid_split = split_size(1 - (train_ratio + test_ratio))

    train_files = image_files[:train_split]
    valid_files = image_files[train_split:train_split + valid_split]
    test_files = image_files[train_split + valid_split:]

    def copy_files(file_list, split, augment=False):
        """Copy images (and labels, if any) of one split; optionally augment."""
        image_dir = os.path.join(directory, f"datasets/images/{split}")
        label_dir = os.path.join(directory, f"datasets/labels/{split}")

        for file in file_list:
            base_file = os.path.splitext(file)[0]
            image_src = os.path.join(root_directory, file)
            label_src = os.path.join(root_directory, base_file + '.txt')
            has_label = os.path.exists(label_src)

            copyfile(image_src, os.path.join(image_dir, file))
            if has_label:
                copyfile(label_src, os.path.join(label_dir, base_file + '.txt'))

            if augment:
                augmented_image_dest = os.path.join(image_dir, base_file + '_aug.jpg')
                if has_label:
                    augmented_label_dest = os.path.join(label_dir, base_file + '_aug.txt')
                    augment_and_save(image_src, label_src, augmented_image_dest, augmented_label_dest)
                else:
                    # No label to duplicate: write only the augmented image
                    # instead of failing on the missing label file.
                    save_augmented_image(augment_image(image_src), augmented_image_dest)

    copy_files(train_files, 'train', augment=True)
    copy_files(valid_files, 'valid', augment=False)
    copy_files(test_files, 'test', augment=False)

In [None]:
# Build the datasets/ folder tree under the Kaggle working directory from the
# raw dataset folder: 80% train, 10% test, the rest validation.
split_data(
    train_ratio=0.8,
    test_ratio=0.1,
    directory='/kaggle/working/',
    root_directory="/kaggle/input/traffic-signs-and-lights-in-yolo-format/Zavrsni",
)

In [None]:
from ultralytics import YOLO

# Set before the model is built; presumably this keeps Weights & Biases
# logging switched off during training - confirm against the installed version.
os.environ.update({'WANDB_DISABLED': 'true'})

# Start from the "yolov8x.pt" checkpoint.
model = YOLO("yolov8x.pt")

In [None]:
# Dataset description for training: dataset root, the image folder of each
# split (relative to that root) and the mapping from class id to class name.
config_content = (
    "path: /kaggle/working/datasets\n"
    "train: images/train\n"
    "val: images/valid\n"
    "test: images/test\n"
    "\n"
    "# Classes\n"
    "names:\n"
    "  0: traffic_sign\n"
    "  1: traffic_light\n"
)

with open("/kaggle/working/data_config.yaml", "w") as config_file:
    config_file.write(config_content)

In [None]:
# Print the string representation of the loaded model for inspection.
print(model)

In [None]:
# Train on the dataset described by the YAML file, validate the result, then
# export the model to ONNX; `path` holds whatever export() returns.
train_settings = {
    "data": "/kaggle/working/data_config.yaml",
    "epochs": 200,
}
model.train(**train_settings)

metrics = model.val()

path = model.export(format="onnx")

In [None]:
# Validate with default settings and print two box metrics, one per line:
# `map` first, then `map50`.
metrics = model.val()
for score in (metrics.box.map, metrics.box.map50):
    print(score)

In [None]:
# Same two box metrics as for validation, but evaluated on the 'test' split.
metrics = model.val(split='test')
for score in (metrics.box.map, metrics.box.map50):
    print(score)

In [None]:
# Run inference on the test image folder with a 0.5 confidence threshold and
# save=True so outputs are written to disk.
results = model.predict(
    source="/kaggle/working/datasets/images/test/",
    save=True,
    conf=0.5,
)


In [None]:
# Show the save directory recorded on the first prediction result.
# NOTE(review): results[0] raises IndexError if the prediction produced no
# results (e.g. an empty test folder).
print("folder being saved: ",results[0].save_dir)


In [None]:
# Save directory of the first prediction result; relies on `results` still
# holding the output of the test-folder prediction cell.
detect_folder = results[0].save_dir
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

def show_images_from_folder(folder_path,detect_folder):
    """Display up to eight prediction images in a 2x4 grid.

    The image names are taken from ``folder_path`` (the prediction inputs) and
    looked up in ``detect_folder`` (the prediction outputs).

    Args:
        folder_path: Folder containing the images that were predicted on.
        detect_folder: Folder into which the annotated images were saved.
    """
    extensions = ('.jpg', '.jpeg', '.webp')
    n_rows, n_cols = 2, 4
    capacity = n_rows * n_cols

    selected_images = []
    # Sorted so the same images are shown on every run (os.listdir order is
    # arbitrary).
    for name in sorted(os.listdir(folder_path)):
        stem, suffix = os.path.splitext(name)
        if suffix.lower() not in extensions:
            continue
        # NOTE(review): some Ultralytics versions appear to save annotated
        # images as '<stem>.jpg' regardless of the input format - confirm for
        # the installed version. Try the original name first, then that
        # variant, and skip inputs that have no output at all.
        for candidate in (name, stem + '.jpg'):
            candidate_path = os.path.join(detect_folder, candidate)
            if os.path.exists(candidate_path):
                selected_images.append(candidate_path)
                break
        if len(selected_images) == capacity:
            break

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 6))

    for i in range(capacity):
        row, col = divmod(i, n_cols)
        ax = axes[row, col]
        # Hide the axes on every cell, including ones left without an image.
        ax.axis('off')
        if i < len(selected_images):
            img_path = selected_images[i]
            ax.imshow(mpimg.imread(img_path))
            # File name only: full paths overlap between neighbouring cells.
            ax.set_title(os.path.basename(img_path), fontsize=8)

    plt.tight_layout()
    plt.show()

In [None]:
# Preview the annotated outputs that correspond to the test images.
show_images_from_folder(
    folder_path="/kaggle/working/datasets/images/test/",
    detect_folder=detect_folder,
)


In [None]:
import cv2
import matplotlib.pyplot as plt

image_path = '/kaggle/input/showcase-pics/0007.jpg'

image = cv2.imread(image_path)
# cv2.imread returns None instead of raising when the file cannot be read,
# and predict(source=None) falls back to Ultralytics' bundled sample images,
# so a bad path would silently show detections for an unrelated picture.
if image is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")

results = model.predict(source=image, show=False)

plt.figure(figsize=(10, 10))
# plot() returns the annotated frame in BGR channel order; matplotlib expects RGB.
plt.imshow(cv2.cvtColor(results[0].plot(), cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.show()


In [None]:
import cv2
from IPython.display import Video


video_path = '/kaggle/input/showcase-pics/0011.webm'
output_path = '/kaggle/working/output_video.mp4'

video = cv2.VideoCapture(video_path)

if not video.isOpened():
    print("Error: Could not open video.")
else:
    frame_size = (
        int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )

    # Keep the source frame rate so playback speed matches the input; fall
    # back to the previous fixed 20 fps when the container reports nothing
    # usable (0, NaN, or an implausibly large value).
    fps = video.get(cv2.CAP_PROP_FPS)
    if not 0 < fps <= 120:
        fps = 20.0

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                break

            results = model.predict(source=frame, show=False)

            # plot() already returns a BGR frame, which is what VideoWriter
            # expects; converting it again swapped red and blue in the output.
            out.write(results[0].plot())
    finally:
        # Release both handles even if prediction fails midway, so the
        # partially written file is closed properly.
        video.release()
        out.release()

    print("Video processing complete. Output saved at:", output_path)

In [None]:
pip install -r requirements.txt