In [None]:
import os
import cv2
import numpy as np
from pathlib import Path
import json

with open('config.json', 'r') as f:
    config = json.load(f)
frames_dir = Path("frames")
labels_dir = Path("labels")
output_dir = Path("yolo_labels")
output_dir.mkdir(exist_ok=True)

detection_classes = [1, 2, 5, 6, 11, 12]  # vehicle, person, bicycle, motorcycle, traffic light, traffic sign
class_colors = {}
for label_info in config['labels']:
    class_colors[label_info['id']] = tuple(label_info['color'])

# map
yolo_mapping = {}
yolo_names = []
for i, class_id in enumerate(detection_classes):
    yolo_mapping[class_id] = i
    for label_info in config['labels']:
        if label_info['id'] == class_id:
            yolo_names.append(label_info['label'])
            break

with open('classes.txt', 'w') as f:
    for name in yolo_names:
        f.write(f"{name}\n")

label_files = list(sorted(labels_dir.glob("*.png")))
total_files = len(label_files)
print(f"Found {total_files} label files")

processed_count = 0
error_count = 0

# Handle the label files
for i, label_file in enumerate(label_files):
    base_name = label_file.stem
    if "_gt_id" in base_name:
        base_name = base_name.replace("_gt_id", "")
    img_file = frames_dir / f"{base_name}_img.jpg"
    
    if not img_file.exists():
        error_count += 1
        if error_count <= 5 or error_count % 100 == 0:
            print(f"Jump ({i+1}/{total_files}): Not found: {img_file}")
        continue
    
    # read the mask
    mask = cv2.imread(str(label_file))
    if mask is None:
        print(f"can not read mask: {label_file}")
        continue
    
    # read the image
    img = cv2.imread(str(img_file))
    if img is None:
        print(f"can not read image: {img_file}")
        continue
    
    img_height, img_width = img.shape[:2]
    
    yolo_file = output_dir / f"{base_name}.txt"
    
    with open(yolo_file, 'w') as f:
        #read the mask
        for class_id in detection_classes:
            # Create binary mask for this class
            binary_mask = np.zeros((img_height, img_width), dtype=np.uint8)
            
            # The mask pixels have RGB values equal to [class_id, class_id, class_id]
            class_value = np.array([class_id, class_id, class_id])
            mask_match = np.all(mask == class_value, axis=2)
            binary_mask[mask_match] = 255
            
            if np.sum(binary_mask) == 0:  # Skip if no pixels for this class
                continue
                
            # Continue with contour detection as before
            contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            # create a mask for the class
            for contour in contours:
                area = cv2.contourArea(contour)
                if area < 100:  
                    continue
                    
               
                x_min, y_min, w, h = cv2.boundingRect(contour)
                x_max = x_min + w
                y_max = y_min + h
                
                
                yolo_class = yolo_mapping[class_id]
                x_center = (x_min + x_max) / 2 / img_width
                y_center = (y_min + y_max) / 2 / img_height
                width = w / img_width
                height = h / img_height
                
                f.write(f"{yolo_class} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n")
    
    processed_count += 1
    if processed_count % 100 == 0:
        print(f"Handled {processed_count}/{total_files} files")

print(f"Convert Successfully! Handle {processed_count} files, jump {error_count} files")

Found 5000 label files
Handled 100/5000 files
Handled 200/5000 files
Handled 300/5000 files
Handled 400/5000 files
Handled 500/5000 files
Handled 600/5000 files
Handled 700/5000 files
Handled 800/5000 files
Handled 900/5000 files
Handled 1000/5000 files
Handled 1100/5000 files
Handled 1200/5000 files
Handled 1300/5000 files
Handled 1400/5000 files
Handled 1500/5000 files
Handled 1600/5000 files
Handled 1700/5000 files
Handled 1800/5000 files
Handled 1900/5000 files
Handled 2000/5000 files
Handled 2100/5000 files
Handled 2200/5000 files
Handled 2300/5000 files
Handled 2400/5000 files
Handled 2500/5000 files
Handled 2600/5000 files
Handled 2700/5000 files
Handled 2800/5000 files
Handled 2900/5000 files
Handled 3000/5000 files
Handled 3100/5000 files
Handled 3200/5000 files
Handled 3300/5000 files
Handled 3400/5000 files
Handled 3500/5000 files
Handled 3600/5000 files
Handled 3700/5000 files
Handled 3800/5000 files
Handled 3900/5000 files
Handled 4000/5000 files
Handled 4100/5000 files
Ha

In [10]:
import os
import shutil
import random
from pathlib import Path
import numpy as np

random.seed(42)
frames_dir = Path("frames")
yolo_labels_dir = Path("yolo_labels")
dataset_dir = Path("dataset")

# train & test
for split in ['train', 'test']:
    (dataset_dir / split / 'images').mkdir(parents=True, exist_ok=True)
    (dataset_dir / split / 'labels').mkdir(parents=True, exist_ok=True)

all_labels = list(yolo_labels_dir.glob('*.txt'))
base_names = [label.stem for label in all_labels]
valid_names = []
for name in base_names:
    img_file = frames_dir / f"{name}_img.jpg"
    if img_file.exists():
        valid_names.append(name)

# 80% train, 20% test
random.shuffle(valid_names)
split_idx = int(len(valid_names) * 0.8)
train_names = valid_names[:split_idx]
test_names = valid_names[split_idx:]

print(f"Total data: {len(valid_names)}")
print(f"Train set: {len(train_names)}")
print(f"Test set: {len(test_names)}")

# Copy files to train/test directories
def copy_files(names, split):
    for name in names:
        src_img = frames_dir / f"{name}_img.jpg"
        dst_img = dataset_dir / split / 'images' / f"{name}.jpg"
        shutil.copy(src_img, dst_img)
        src_label = yolo_labels_dir / f"{name}.txt"
        dst_label = dataset_dir / split / 'labels' / f"{name}.txt"
        shutil.copy(src_label, dst_label)

copy_files(train_names, 'train')
copy_files(test_names, 'test')

print("Data set divided successfully!")

Total data: 5000
Train set: 4000
Test set: 1000
Data set divided successfully!


3. 创建YOLO配置文件

In [9]:
import yaml
from pathlib import Path

classes = []
with open('classes.txt', 'r') as f:
    classes = [line.strip() for line in f.readlines()]

dataset_config = {
    'path': str(Path.cwd() / 'dataset'),  
    'train': 'train/images',  
    'val': 'test/images', 
    'nc': len(classes),       
    'names': classes          
}

with open('driveseg.yaml', 'w') as f:
    yaml.dump(dataset_config, f, sort_keys=False)

print("YOLO yaml file completed: driveseg.yaml")

YOLO yaml file completed: driveseg.yaml


In [None]:
from ultralytics import YOLO

# YOLOv8 training parameters
img_size = 640 
batch_size = 16  
epochs = 100    

# model size
model_size = 'n'  # n(nano), s(small),m(medium), l(large), x(xlarge)

print(f"Start train YOLOv8{model_size}model...")
print(f"Image size: {img_size}px")
print(f"Batching Size: {batch_size}")
print(f"Epochs: {epochs}")

# Create the runs directory
runs_dir = Path("runs")
runs_dir.mkdir(exist_ok=True)

# Load the YOLOv8 model
model = YOLO(f'yolov8{model_size}.pt')

# Define the autosave callback
def on_train_epoch_end(trainer):
    epoch = trainer.epoch
    print(f"\nSave the {epoch}th model weight...")
    return trainer

model.add_callback("on_train_epoch_end", on_train_epoch_end)


results = model.train(
    data='driveseg.yaml',
    epochs=epochs,
    imgsz=img_size,
    batch=batch_size,
    name='driveseg_yolo_model',  
    patience=20,                  
    save=True,
    save_period=1,                   
    #device=0,                    
    val=True,
    project="runs",
    exist_ok=True,
    pretrained=True                   
)

print("\nFinish!")
print(f"Result in: {model.trainer.save_dir}")
print("Best Model: best.pt")
print("Last Model: last.pt")

# Evaluate the model
model.val(data='driveseg.yaml')  