In [None]:
!pip install ultralytics -q
!pip install opencv-python -q
!pip install matplotlib -q
!pip install kagglehub -q

In [2]:
import logging
import os
import yaml
import kagglehub
from ultralytics import YOLO
import cv2  # For inference

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [3]:
def download_dataset(dataset_handle):
    """Fetch a Kaggle dataset through kagglehub and return where it was stored.

    Args:
        dataset_handle: Dataset identifier handed to ``kagglehub.dataset_download``.

    Returns:
        Whatever ``kagglehub.dataset_download`` returns (the download location).

    Raises:
        Exception: Any download failure is logged and re-raised unchanged.
    """
    try:
        local_dir = kagglehub.dataset_download(dataset_handle)
    except Exception as err:
        logging.error(f"Failed to download dataset: {err}")
        raise
    else:
        logging.info(f"Downloaded dataset to: {local_dir}")
        return local_dir

def _list_subdirs(directory):
    """Return the names of the immediate sub-directories of ``directory``, sorted."""
    # os.listdir order is arbitrary; sorting makes the search order reproducible.
    return sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )


def _collect_split_paths(root, subdirs):
    """Return the ``<split>_images`` / ``<split>_labels`` folders present under ``root``."""
    # (folder name on disk, key prefix in the result). 'valid' precedes 'val'
    # so it keeps priority when a dataset happens to ship both folders.
    split_dir_names = (('train', 'train'), ('valid', 'val'), ('val', 'val'), ('test', 'test'))
    found = {}
    for dir_name, key_split in split_dir_names:
        if dir_name not in subdirs:
            continue
        for kind in ('images', 'labels'):
            candidate = os.path.join(root, dir_name, kind)
            if os.path.exists(candidate):
                found.setdefault(f'{key_split}_{kind}', candidate)
    return found


def _search_candidates(candidates, label):
    """Return ``(paths, root)`` for the first candidate folder that holds split folders.

    ``candidates`` yields ``(display_name, directory)`` pairs and ``label`` only
    appears in the log message. Returns ``({}, None)`` when nothing matches.
    """
    for display_name, directory in candidates:
        children = _list_subdirs(directory)
        logging.info(f"Checking {label} subdirs in {display_name}: {children}")
        found = _collect_split_paths(directory, children)
        if found:
            return found, directory
    return {}, None


def detect_dataset_structure(dataset_path):
    """Detect train/val/test images and labels paths in the dataset.

    The split folders (``train``, ``valid`` or ``val``, ``test``) are looked for
    directly under ``dataset_path`` first, then one directory level deeper, then
    two levels deeper. The search stops at the first folder in which at least
    one ``<split>/images`` or ``<split>/labels`` directory exists.

    Args:
        dataset_path: Directory to scan.

    Returns:
        A ``(paths, dataset_root)`` tuple. ``paths`` maps keys such as
        ``'train_images'`` or ``'val_labels'`` to existing directories (empty
        when nothing was found). ``dataset_root`` is the directory that contains
        the detected split folders, or the unchanged ``dataset_path`` when
        nothing was found.

    Raises:
        OSError: Propagated from ``os.listdir`` (e.g. ``dataset_path`` missing).
    """
    subdirs = _list_subdirs(dataset_path)
    logging.info(f"Detected subdirs in dataset: {subdirs}")
    paths = _collect_split_paths(dataset_path, subdirs)

    if not paths:
        # Check one level deeper.
        level_one = [(entry, os.path.join(dataset_path, entry)) for entry in subdirs]
        paths, root = _search_candidates(level_one, 'inner')
        if not paths:
            # Check two levels deeper; the generator keeps directory listing lazy
            # so the scan stops as soon as a match is found.
            level_two = (
                (inner, os.path.join(sub_path, inner))
                for _, sub_path in level_one
                for inner in _list_subdirs(sub_path)
            )
            paths, root = _search_candidates(level_two, 'deepest')
        if paths:
            dataset_path = root

    logging.info(f"Detected paths: {paths}")
    return paths, dataset_path

def create_yaml(dataset_path, paths, nc, names):
    """Create the data.yaml file for YOLO training.

    Args:
        dataset_path: Dataset root directory; split folders are written
            relative to it.
        paths: Mapping that may contain ``'train_images'``, ``'val_images'``
            and ``'test_images'`` directories. A missing split is written as
            an empty string.
        nc: Number of classes.
        names: Class names.

    Returns:
        Path of the written file: ``'<dataset folder name>.yaml'`` in the
        current working directory.
    """
    # abspath also normalises the path, which strips a trailing separator
    # (otherwise basename() below is '' and the file would be named '.yaml').
    # NOTE(review): an absolute root is written because Ultralytics appears to
    # resolve a relative `path` against its own datasets directory rather than
    # the working directory - confirm against the Ultralytics docs.
    root = os.path.abspath(dataset_path)

    data_yaml = {"path": root}
    for split in ("train", "val", "test"):
        images_dir = paths.get(f"{split}_images")
        data_yaml[split] = os.path.relpath(images_dir, root) if images_dir else ''
    data_yaml["nc"] = nc
    data_yaml["names"] = names

    yaml_path = f"{os.path.basename(root)}.yaml"
    with open(yaml_path, "w", encoding="utf-8") as f:
        yaml.dump(data_yaml, f)
    logging.info(f"Created YAML config at: {yaml_path}")
    return yaml_path

def train_model(yaml_path, epochs, imgsz, batch, device, project, name, weights="yolov8m.pt"):
    """Train the YOLO model.

    Args:
        yaml_path: Dataset YAML passed to ``model.train`` as ``data``.
        epochs: Number of training epochs.
        imgsz: Training image size.
        batch: Batch size.
        device: Device string forwarded to ``model.train``.
        project: Project directory forwarded to ``model.train``.
        name: Experiment name forwarded to ``model.train``.
        weights: Model file handed to ``YOLO(...)`` as the starting point.
            Defaults to ``"yolov8m.pt"``, the value that used to be hard-coded.

    Returns:
        The value returned by ``model.train``.
    """
    model = YOLO(weights)
    results = model.train(
        data=yaml_path,
        epochs=epochs,
        imgsz=imgsz,
        batch=batch,
        device=device,
        project=project,
        name=name,
        # Re-running with the same project/name reuses the existing run folder.
        exist_ok=True,
    )
    return results


In [4]:
def load_model(model_path):
    """Return a ``YOLO`` model built from ``model_path``.

    Args:
        model_path: Location of the model to load; it must exist on disk.

    Returns:
        The ``YOLO`` instance created from ``model_path``.

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
    """
    model_is_missing = not os.path.exists(model_path)
    if model_is_missing:
        raise FileNotFoundError(f"Model not found: {model_path}")

    loaded = YOLO(model_path)
    logging.info(f"Model loaded from {model_path}")
    return loaded

def infer_image(model, image_path, conf_thresh=0.5, save_path=None):
    """Perform inference on a single image.

    The annotated image is written to ``save_path`` when one is given;
    otherwise it is shown in an OpenCV window until a key is pressed.

    Args:
        model: Callable model, invoked as ``model(img, conf=conf_thresh)``.
        image_path: Path of the image to analyse.
        conf_thresh: Confidence threshold forwarded to the model.
        save_path: Optional output file for the annotated image.

    Raises:
        FileNotFoundError: If ``image_path`` does not exist.
        ValueError: If the file exists but OpenCV cannot decode it.
        OSError: If OpenCV reports that the annotated image was not written.
    """
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread reports an unreadable/corrupt file by returning None
        # instead of raising; fail here with a clear message.
        raise ValueError(f"Cannot read image: {image_path}")
    results = model(img, conf=conf_thresh)
    annotated = results[0].plot()
    if save_path:
        # cv2.imwrite returns False on failure; do not log success in that case.
        if not cv2.imwrite(save_path, annotated):
            raise OSError(f"Failed to write annotated image to {save_path}")
        logging.info(f"Annotated image saved to {save_path}")
    else:
        cv2.imshow("Inference", annotated)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

def infer_video(model, video_path, conf_thresh=0.5, save_path=None):
    """Perform inference on a video file.

    Every frame is annotated. The frames are written to ``save_path`` when one
    is given; otherwise they are shown in an OpenCV window until the video ends
    or 'q' is pressed.

    Args:
        model: Callable model, invoked as ``model(frame, conf=conf_thresh)``.
        video_path: Path of the input video.
        conf_thresh: Confidence threshold forwarded to the model.
        save_path: Optional output video file (written with the 'mp4v' codec).

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        ValueError: If the input video or the output writer cannot be opened.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")
    cap = cv2.VideoCapture(video_path)
    out = None
    window_shown = False
    # try/finally guarantees the capture and writer are released even when
    # inference raises half-way through the video.
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        if save_path:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            # Fall back to 30 when the capture reports a falsy (0) frame rate.
            fps = cap.get(cv2.CAP_PROP_FPS) or 30
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = cv2.VideoWriter(save_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise ValueError(f"Cannot open video writer: {save_path}")
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            results = model(frame, conf=conf_thresh)
            annotated = results[0].plot()
            if out is not None:
                out.write(annotated)
            else:
                cv2.imshow("Inference", annotated)
                window_shown = True
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
            frame_count += 1
            if frame_count % 100 == 0:
                logging.info(f"Processed {frame_count} frames")
    finally:
        cap.release()
        if out is not None:
            out.release()
        if window_shown:
            # Only tear down windows that were actually opened.
            # NOTE(review): GUI calls are presumably unavailable in headless
            # OpenCV builds, so avoid them in save-only mode - confirm.
            cv2.destroyAllWindows()
    if save_path:
        logging.info(f"Annotated video saved to {save_path}")

def infer_webcam(model, conf_thresh=0.5, camera_index=0):
    """Perform real-time inference on webcam feed.

    Annotated frames are shown in an OpenCV window until a frame cannot be
    read or 'q' is pressed.

    Args:
        model: Callable model, invoked as ``model(frame, conf=conf_thresh)``.
        conf_thresh: Confidence threshold forwarded to the model.
        camera_index: Device index handed to ``cv2.VideoCapture``. Defaults to
            0, the value that used to be hard-coded.

    Raises:
        ValueError: If the capture device cannot be opened.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        cap.release()
        raise ValueError("Cannot access webcam")
    logging.info("Starting webcam inference. Press 'q' to quit.")
    # try/finally guarantees the camera is freed even when inference raises;
    # otherwise the device stays locked until the kernel is restarted.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            results = model(frame, conf=conf_thresh)
            annotated = results[0].plot()
            cv2.imshow("Webcam Inference", annotated)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()



In [5]:
# Example configuration: pothole detection.

# --- Dataset ---
dataset_handle = "jocelyndumlao/multi-weather-pothole-detection-mwpd"  # Kaggle dataset handle
names = ["Potholes"]  # Class names
nc = len(names)  # Number of classes (1), kept in sync with `names`

# --- Training ---
epochs, imgsz, batch = 1, 512, 32  # Training epochs, image size, batch size
device = "0"  # Device: '0' for GPU, 'cpu' for CPU

# --- Output location ---
project = "runs/train"  # Project dir
name = "yolo_train"  # Experiment name



In [None]:
# Pipeline: download -> locate the split folders -> write the dataset YAML -> train.
dataset_path = download_dataset(dataset_handle)  # download_dataset logs the location itself
paths, dataset_path = detect_dataset_structure(dataset_path)
if not paths:
    raise ValueError("No standard train/val/test structure found in dataset")
if 'train_images' not in paths:
    # Labels alone (or only val/test folders) would produce a YAML with an
    # empty 'train' entry; stop here with an explicit message instead.
    raise ValueError(f"No train images folder found in dataset (detected: {sorted(paths)})")
yaml_path = create_yaml(dataset_path, paths, nc, names)
results = train_model(yaml_path, epochs, imgsz, batch, device, project, name)
logging.info("Training completed successfully")



In [9]:
# Inference settings.
model_path = "runs/train/yolo_train/weights/best.pt"  # Path to trained model
conf_thresh = 0.5  # Confidence threshold

# Set before running the next cell: an empty value is rejected there as unsupported.
input_source = ""  # 'path/to/image.jpg', 'path/to/video.mp4', or 'webcam'

# Leave empty to display results in a window instead of saving them.
output_path = ""  # Optional: 'annotated.jpg' or 'annotated.mp4'

In [None]:
# Load the trained model, then dispatch on the kind of input source.
model = load_model(model_path)
input_lower = input_source.lower()

_image_exts = ('.jpg', '.png', '.jpeg', '.bmp', '.tiff')
_video_exts = ('.mp4', '.avi', '.mov', '.mkv', '.flv')

# Guard clause: anything that is neither 'webcam' nor a known extension is rejected.
if input_lower != 'webcam' and not input_lower.endswith(_image_exts + _video_exts):
    raise ValueError("Unsupported input type. Use image/video path or 'webcam'")

if input_lower == 'webcam':
    infer_webcam(model, conf_thresh)
elif input_lower.endswith(_image_exts):
    infer_image(model, input_source, conf_thresh, output_path)
else:
    infer_video(model, input_source, conf_thresh, output_path)
logging.info("Inference completed successfully")