# Preparing the LISA Traffic Light Dataset

This notebook will guide you through the following steps (covered in Sections 0–3 below):
1.  **Imports and Setup**
2.  **Consolidating and Converting Annotations** from the original LISA format (``frameAnnotationsBOX.csv`` files in a specific folder structure) to a single, standardized CSV file (``all_annotations.csv``).
    * Copying images from the raw location to the ``PROCESSED_IMAGES_DIR_FRCNN_SSD`` folder while preserving the structure.
    * CSV Format: ``filename,xmin,ymin,xmax,ymax,label,period``.
    * ``filename`` will be the relative path to the image from the main image directory (e.g., ``daySequence1/frames/frame_0000.jpg``).
3.  **Splitting the Consolidated CSV Annotations** into ``train``, ``val``, and ``test`` sets.
4.  **Converting Annotations to YOLO Format** (one ``.txt`` label file per image) and organizing the folder structure for YOLO.

## 0. Imports and Setup

In [None]:
import os
import cv2
import pandas as pd
import glob 
from sklearn.model_selection import train_test_split
from PIL import Image
import shutil
import numpy as np

# --- CONFIGURATION PATHS ---
# Raw dataset location; the annotation CSVs are searched for below
# <raw root>/Annotations/Annotations.
LISA_RAW_DATA_ROOT = "../dataset/lisa_traffic_light_dataset_raw/"
LISA_ANNOTATIONS_BASE_DIR = os.path.join(LISA_RAW_DATA_ROOT, "Annotations", "Annotations")

# Output tree for the consolidated CSV annotations and the copied images.
PROCESSED_DATA_ROOT_FRCNN_SSD = "../dataset/lisa_traffic_light_dataset/"
PROCESSED_IMAGES_DIR_FRCNN_SSD = os.path.join(PROCESSED_DATA_ROOT_FRCNN_SSD, "images")
PROCESSED_ANNOTATIONS_DIR_FRCNN_SSD = os.path.join(PROCESSED_DATA_ROOT_FRCNN_SSD, "annotations")

# Output tree for the YOLO-formatted copy of the dataset.
YOLO_DATA_ROOT = "../dataset/lisa_yolo_formatted/"

# Create every output directory up front so later cells can write into them.
os.makedirs(PROCESSED_IMAGES_DIR_FRCNN_SSD, exist_ok=True)
os.makedirs(PROCESSED_ANNOTATIONS_DIR_FRCNN_SSD, exist_ok=True)
os.makedirs(YOLO_DATA_ROOT, exist_ok=True)

# --- MAPPING LABELS ---
# Every directional variant of a LISA tag collapses onto its base label;
# "off" has no directional variants and is added on its own.
LISA_TO_COMMON_LABEL_MAP = {
    base_label + direction: base_label
    for base_label in ("stop", "go", "warning")
    for direction in ("", "Left", "Right", "Ahead")
}
LISA_TO_COMMON_LABEL_MAP["off"] = "off"

# YOLO class ids are the positions of the common labels in this order.
COMMON_TO_YOLO_CLASS_ID_MAP = {
    label: class_id
    for class_id, label in enumerate(("go", "stop", "warning", "off"))
}

# Split proportions and the seed used for the random split.
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.7, 0.15, 0.15
RANDOM_STATE = 42

## 1. Consolidating and Converting Annotations to CSV

In [None]:
def find_annotation_files(root_dir, filename_to_find="frameAnnotationsBOX.csv"):
    """Collect the path of every file named ``filename_to_find`` below ``root_dir``.

    The tree is traversed with ``os.walk`` and only exact file-name matches
    are kept. Each returned path is the walked directory joined with the file
    name; the list is empty when nothing matches.
    """
    return [
        os.path.join(folder, entry)
        for folder, _subfolders, entries in os.walk(root_dir)
        for entry in entries
        if entry == filename_to_find
    ]

def _infer_period_from_path(relative_dir):
    """Return "day" or "night" if a folder name in ``relative_dir`` contains it, else "unknown"."""
    folder_names = relative_dir.lower().split('/')
    if any("day" in name for name in folder_names):
        return "day"
    if any("night" in name for name in folder_names):
        return "night"
    return "unknown"


def _candidate_image_folders(raw_data_images_root, relative_dir):
    """List, in priority order, the normalized folders that may hold one sequence's frames."""
    components = [comp for comp in relative_dir.split('/') if comp]
    candidates = []
    if components:
        # <raw root>/<top-level folder>/<relative dir>/frames
        candidates.append(os.path.join(raw_data_images_root, components[0], relative_dir, "frames"))
        # <raw root>/<relative dir>/<innermost folder>/frames
        candidates.append(os.path.join(raw_data_images_root, relative_dir, components[-1], "frames"))
    # <raw root>/<relative dir>/frames
    candidates.append(os.path.join(raw_data_images_root, relative_dir, "frames"))
    # <raw root>/<relative dir>
    candidates.append(os.path.join(raw_data_images_root, relative_dir))
    return [os.path.normpath(path) for path in candidates]


def _first_folder_with_images(candidate_folders):
    """Return the first existing candidate folder that contains an image file, or None."""
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')
    for folder in candidate_folders:
        if os.path.isdir(folder) and any(
                name.lower().endswith(image_suffixes) for name in os.listdir(folder)):
            return folder
    return None


def consolidate_lisa_annotations_and_copy_images(annotations_search_root,
                                                 raw_data_images_root,
                                                 processed_images_output_root,
                                                 output_csv_path):
    """Merge all ``frameAnnotationsBOX.csv`` files into one CSV and copy the referenced images.

    For every annotation file found below ``annotations_search_root`` the
    function reads it as a ``;``-separated CSV, derives the period
    ("day"/"night"/"unknown") from the folder names, locates the matching
    image folder below ``raw_data_images_root`` and, for each row whose tag is
    in ``LISA_TO_COMMON_LABEL_MAP`` and whose image exists, copies the image
    (once) below ``processed_images_output_root`` and records the annotation.

    The collected rows are written to ``output_csv_path`` with the columns
    ``filename,xmin,ymin,xmax,ymax,label,period`` after dropping rows with
    non-numeric coordinates or boxes that have no positive width/height.

    Args:
        annotations_search_root: Directory searched recursively for annotation files.
        raw_data_images_root: Root below which the image folders are looked up.
        processed_images_output_root: Destination root for the copied images.
        output_csv_path: Path of the consolidated CSV to write.

    Returns:
        None. Progress, warnings and a summary are printed. Nothing is written
        when the search root is missing, no annotation file is found, or no
        usable annotation is collected.
    """
    all_annotations_list = []
    copied_image_count = 0
    processed_annotation_files_count = 0

    print(f"Searching {annotations_search_root} for frameAnnotationsBOX.csv files...")

    if not os.path.exists(annotations_search_root):
        print(f"ERROR: Base path for annotations {annotations_search_root} does not exist!")
        return

    annotation_file_paths = find_annotation_files(annotations_search_root, "frameAnnotationsBOX.csv")

    if not annotation_file_paths:
        print(f"No frameAnnotationsBOX.csv files found in {annotations_search_root} and its subfolders.")
        return

    print(f"Found {len(annotation_file_paths)} annotation files to process.")

    required_cols_lisa = ['Filename', 'Annotation tag',
                          'Upper left corner X', 'Upper left corner Y',
                          'Lower right corner X', 'Lower right corner Y']

    for annotation_file_path_raw in annotation_file_paths:
        annotation_file_path = os.path.normpath(annotation_file_path_raw)
        print(f"  Processing annotation file: {annotation_file_path}")
        processed_annotation_files_count += 1
        try:
            df_seq = pd.read_csv(annotation_file_path, sep=';')

            if not all(col in df_seq.columns for col in required_cols_lisa):
                print(f"    WARNING: Missing standard LISA columns in {annotation_file_path}. Skipping file. Columns: {df_seq.columns.tolist()}")
                continue

            # Path of the annotation folder relative to the search root, with
            # forward slashes so it can be split and stored consistently.
            annotation_file_dir = os.path.dirname(annotation_file_path)
            relative_path_of_annotations_dir = os.path.relpath(annotation_file_dir, annotations_search_root)
            relative_path_of_annotations_dir = os.path.normpath(relative_path_of_annotations_dir).replace('\\', '/')

            period = _infer_period_from_path(relative_path_of_annotations_dir)
            if period == "unknown":
                print(f"    WARNING: Could not determine time of day for {relative_path_of_annotations_dir}.")

            candidate_image_folders = _candidate_image_folders(raw_data_images_root,
                                                               relative_path_of_annotations_dir)
            images_source_folder_for_sequence = _first_folder_with_images(candidate_image_folders)

            if not images_source_folder_for_sequence:
                print(f"    WARNING: Could not find image folder for annotation sequence {relative_path_of_annotations_dir}.")
                print(f"       Checked candidate paths (normalized):")
                for cp_idx, cp in enumerate(candidate_image_folders):
                    print(f"         {cp_idx+1}. {cp}")
                print(f"    Skipping annotations from file {annotation_file_path}.")
                continue

            # The image folder is the same for every row of this file, so its
            # path relative to the raw root is computed once, not per row.
            path_to_frames_folder_relative_to_raw_root = os.path.relpath(
                images_source_folder_for_sequence, raw_data_images_root)
            path_to_frames_folder_relative_to_raw_root = os.path.normpath(
                path_to_frames_folder_relative_to_raw_root).replace('\\', '/')

            rows_with_missing_image = 0
            for _, row in df_seq.iterrows():
                # Only the file name is used; any folder prefix in the CSV is dropped.
                original_frame_name_from_csv = str(row['Filename']).split('/')[-1]
                common_label = LISA_TO_COMMON_LABEL_MAP.get(row['Annotation tag'])
                if common_label is None:
                    continue

                source_image_path = os.path.normpath(
                    os.path.join(images_source_folder_for_sequence, original_frame_name_from_csv))
                if not os.path.exists(source_image_path):
                    rows_with_missing_image += 1
                    continue

                dest_relative_image_path = os.path.join(
                    path_to_frames_folder_relative_to_raw_root, original_frame_name_from_csv)
                dest_relative_image_path = os.path.normpath(dest_relative_image_path).replace('\\', '/')

                dest_image_full_path = os.path.normpath(
                    os.path.join(processed_images_output_root, dest_relative_image_path))

                # An image referenced by several rows is copied only once.
                if not os.path.exists(dest_image_full_path):
                    os.makedirs(os.path.dirname(dest_image_full_path), exist_ok=True)
                    shutil.copy2(source_image_path, dest_image_full_path)
                    copied_image_count += 1

                all_annotations_list.append({
                    'filename': dest_relative_image_path,
                    'xmin': row['Upper left corner X'],
                    'ymin': row['Upper left corner Y'],
                    'xmax': row['Lower right corner X'],
                    'ymax': row['Lower right corner Y'],
                    'label': common_label,
                    'period': period
                })

            if rows_with_missing_image:
                print(f"    WARNING: Skipped {rows_with_missing_image} annotation rows whose image was not found in {images_source_folder_for_sequence}.")

        except pd.errors.EmptyDataError:
            print(f"    INFO: File {annotation_file_path} is empty. Skipping.")
        except Exception as e:
            # Best effort: one broken annotation file must not abort the whole run.
            print(f"    ERROR during processing of file {annotation_file_path}: {e}")
            import traceback
            traceback.print_exc()

    if not all_annotations_list:
        print("No valid annotations found to process.")
        return

    final_df = pd.DataFrame(all_annotations_list)

    bbox_columns = ['xmin', 'ymin', 'xmax', 'ymax']
    for col in bbox_columns:
        final_df[col] = pd.to_numeric(final_df[col], errors='coerce')
    final_df = final_df.dropna(subset=bbox_columns + ['label', 'filename'])
    final_df = final_df[(final_df['xmax'] > final_df['xmin']) & (final_df['ymax'] > final_df['ymin'])]

    dropped_rows = len(all_annotations_list) - len(final_df)
    if dropped_rows:
        print(f"Dropped {dropped_rows} annotations with non-numeric or degenerate bounding boxes.")

    output_csv_dir = os.path.dirname(output_csv_path)
    if output_csv_dir:
        os.makedirs(output_csv_dir, exist_ok=True)
    final_df.to_csv(output_csv_path, index=False)

    # Report what was actually written, i.e. the row count after filtering.
    print(f"Saved consolidated annotations to: {output_csv_path} ({final_df['filename'].nunique()} unique images, {len(final_df)} objects).")
    print(f"Processed {processed_annotation_files_count} annotation files.")
    print(f"Copied {copied_image_count} new images to {processed_images_output_root}.")

# Single CSV that receives the annotations of every sequence.
consolidated_csv_file = os.path.join(PROCESSED_ANNOTATIONS_DIR_FRCNN_SSD, "all_annotations.csv")

# Run the consolidation with the paths configured at the top of the notebook.
consolidate_lisa_annotations_and_copy_images(
    annotations_search_root=LISA_ANNOTATIONS_BASE_DIR,
    raw_data_images_root=LISA_RAW_DATA_ROOT,
    processed_images_output_root=PROCESSED_IMAGES_DIR_FRCNN_SSD,
    output_csv_path=consolidated_csv_file,
)
print("Consolidation of annotations and copying of images completed.")

## 2. Splitting Consolidated Annotations into Train, Val, Test Sets

In [None]:
def perform_train_val_test_split(consolidated_csv, output_dir,
                                 train_r=0.7, val_r=0.15, random_s=42):
    """Split the consolidated annotations into train/val/test CSV files by image.

    The split is done on unique ``filename`` values, so every annotation of an
    image ends up in the same set. Three files are written to ``output_dir``:
    ``train_annotations.csv``, ``val_annotations.csv`` and
    ``test_annotations.csv`` (header-only when a set is empty).

    Args:
        consolidated_csv: Path of the CSV holding all annotations.
        output_dir: Directory that receives the three split files; created if missing.
        train_r: Share of images for training.
        val_r: Share of images for validation; the test share is
            ``1 - train_r - val_r``.
        random_s: Seed passed to ``train_test_split``.

    Returns:
        None. Nothing is written when the input file is missing or has no rows.
    """
    if not os.path.exists(consolidated_csv):
        print(f"File {consolidated_csv} does not exist. Cannot perform split.")
        return
    df = pd.read_csv(consolidated_csv)
    if df.empty:
        print(f"File {consolidated_csv} is empty. Cannot perform split.")
        return

    os.makedirs(output_dir, exist_ok=True)
    train_csv = os.path.join(output_dir, "train_annotations.csv")
    val_csv = os.path.join(output_dir, "val_annotations.csv")
    test_csv = os.path.join(output_dir, "test_annotations.csv")

    def _write_empty_val_and_test():
        # Header-only files keep downstream readers working for empty sets.
        header_only = pd.DataFrame(columns=df.columns)
        header_only.to_csv(val_csv, index=False)
        header_only.to_csv(test_csv, index=False)

    unique_files = df['filename'].unique()
    if len(unique_files) < 3:
        print(f"Too few unique files ({len(unique_files)}) to split. All will go to 'train'.")
        df.to_csv(train_csv, index=False)
        _write_empty_val_and_test()
        return

    current_test_r = 1.0 - train_r - val_r
    if current_test_r < -1e-5:
        print(f"Sum of train_r ({train_r}) and val_r ({val_r}) exceeds 1.0. Correcting proportions.")
        if train_r < 1.0:
            val_r = 1.0 - train_r
        else:
            # A training share above 1 is meaningless; clamp it so the
            # reported proportions are consistent.
            train_r = 1.0
            val_r = 0.0
        current_test_r = 0.0
        print(f"New proportions: train={train_r}, val={val_r}, test={current_test_r}")
    elif current_test_r < 1e-5:
        # Treat a rounding-error residual as "no test set".
        current_test_r = 0.0

    holdout_r = val_r + current_test_r
    if holdout_r < 1e-5:
        # train_test_split rejects a float test_size of 0, so a split that
        # keeps everything for training is handled without calling it.
        df.to_csv(train_csv, index=False)
        print(f"Training set: {len(unique_files)} images, {len(df)} annotations.")
        print("No data for validation and/or test set after split.")
        _write_empty_val_and_test()
        return

    train_filenames, remaining_filenames = train_test_split(
        unique_files, test_size=holdout_r, random_state=random_s, shuffle=True)
    df_train = df[df['filename'].isin(train_filenames)]
    df_train.to_csv(train_csv, index=False)
    print(f"Training set: {len(df_train['filename'].unique())} images, {len(df_train)} annotations.")

    if len(remaining_filenames) > 0 and (val_r > 1e-5 or current_test_r > 1e-5):
        if current_test_r < 1e-5 or val_r < 1e-5 or len(remaining_filenames) < 2:
            # Only one hold-out set is requested (or only one image is left):
            # everything remaining goes to validation if it has a share,
            # otherwise to test.
            if val_r > 1e-5:
                val_filenames, test_filenames = remaining_filenames, np.array([])
            else:
                val_filenames, test_filenames = np.array([]), remaining_filenames
        else:
            # Share of the hold-out images that belongs to the test set.
            relative_test_ratio_for_split = current_test_r / (val_r + current_test_r)
            val_filenames, test_filenames = train_test_split(
                remaining_filenames, test_size=relative_test_ratio_for_split,
                random_state=random_s, shuffle=True)
        df_val = df[df['filename'].isin(val_filenames)]
        df_test = df[df['filename'].isin(test_filenames)]
        df_val.to_csv(val_csv, index=False)
        df_test.to_csv(test_csv, index=False)
        print(f"Validation set: {len(df_val['filename'].unique())} images, {len(df_val)} annotations.")
        print(f"Test set: {len(df_test['filename'].unique())} images, {len(df_test)} annotations.")
    else:
        print("No data for validation and/or test set after split.")
        _write_empty_val_and_test()

# Split the consolidated CSV using the ratios and seed configured above.
perform_train_val_test_split(
    consolidated_csv=consolidated_csv_file,
    output_dir=PROCESSED_ANNOTATIONS_DIR_FRCNN_SSD,
    train_r=TRAIN_RATIO,
    val_r=VAL_RATIO,
    random_s=RANDOM_STATE,
)
print("Splitting into train/val/test sets (CSV) completed.")

## 3. Converting Annotations to YOLO Format and Organizing Folders

In [None]:
def convert_csv_to_yolo_and_copy_images(source_csv_path,
                                        processed_images_root,
                                        yolo_target_root,
                                        split_name):
    """Write YOLO label files and flat image copies for one split.

    Reads ``source_csv_path`` and processes it one image at a time. Each image
    found below ``processed_images_root`` is copied to
    ``<yolo_target_root>/images/<split_name>/`` under a flattened name (path
    separators replaced by ``_``), and a ``.txt`` file with the same base name
    is written to ``<yolo_target_root>/labels/<split_name>/``. Each label line
    is ``class_id x_center y_center width height`` with the four box values
    normalized by the image size. Rows whose label is not in
    ``COMMON_TO_YOLO_CLASS_ID_MAP`` or whose box is degenerate or outside the
    image are left out.

    Returns None; a missing or empty CSV only prints a message.
    """
    if not os.path.exists(source_csv_path):
        print(f"CSV file {source_csv_path} does not exist. Skipping YOLO conversion for {split_name}.")
        return

    annotations = pd.read_csv(source_csv_path)
    if annotations.empty:
        print(f"CSV file {source_csv_path} is empty. Skipping YOLO conversion for {split_name}.")
        return

    images_out_dir = os.path.join(yolo_target_root, "images", split_name)
    labels_out_dir = os.path.join(yolo_target_root, "labels", split_name)
    for out_dir in (images_out_dir, labels_out_dir):
        os.makedirs(out_dir, exist_ok=True)

    print(f"YOLO conversion for {split_name}...")

    for relative_image_path, boxes in annotations.groupby('filename'):
        source_image = os.path.normpath(os.path.join(processed_images_root, relative_image_path))
        if not os.path.exists(source_image):
            continue

        # The image is opened only to read the size used for normalization.
        try:
            with Image.open(source_image) as opened_image:
                img_width, img_height = opened_image.size
        except Exception as e:
            print(f"  WARNING: Could not open image {source_image}: {e}. Skipping for YOLO.")
            continue

        # Flatten the relative path into a single file name.
        flat_name = relative_image_path.replace('/', '_').replace('\\', '_')
        target_image = os.path.normpath(os.path.join(images_out_dir, flat_name))

        if not os.path.exists(target_image):
            try:
                shutil.copy2(source_image, target_image)
            except FileNotFoundError:
                print(f"  Copy ERROR: Could not copy {source_image} to {target_image}")
                print(f"    img_relative_path_from_csv: {relative_image_path}")
                print(f"    flat_img_name_base: {flat_name}")
                continue

        label_name = os.path.splitext(flat_name)[0] + ".txt"
        label_path = os.path.normpath(os.path.join(labels_out_dir, label_name))

        with open(label_path, 'w') as label_file:
            for _, box in boxes.iterrows():
                class_id = COMMON_TO_YOLO_CLASS_ID_MAP.get(box['label'])
                if class_id is None:
                    continue

                xmin = box['xmin']
                ymin = box['ymin']
                xmax = box['xmax']
                ymax = box['ymax']

                box_is_inside_image = (xmax > xmin and ymax > ymin and xmin >= 0 and ymin >= 0
                                       and xmax <= img_width and ymax <= img_height)
                if not box_is_inside_image:
                    continue

                # Centre/size in pixels, then normalized by the image size.
                x_center_norm = ((xmin + xmax) / 2.0) / img_width
                y_center_norm = ((ymin + ymax) / 2.0) / img_height
                box_width_norm = (xmax - xmin) / img_width
                box_height_norm = (ymax - ymin) / img_height

                values_are_normalized = (0 <= x_center_norm <= 1 and 0 <= y_center_norm <= 1
                                         and 0 <= box_width_norm <= 1 and 0 <= box_height_norm <= 1)
                if not values_are_normalized:
                    continue

                label_file.write(f"{class_id} {x_center_norm:.6f} {y_center_norm:.6f} {box_width_norm:.6f} {box_height_norm:.6f}\n")

    print(f"Finished YOLO conversion for {split_name}.")

# --- Run YOLO format conversion ---
# Convert each split whose annotation CSV is present; report the others.
for split in ('train', 'val', 'test'):
    csv_file_for_split = os.path.join(PROCESSED_ANNOTATIONS_DIR_FRCNN_SSD, f"{split}_annotations.csv")
    if not os.path.exists(csv_file_for_split):
        print(f"File {csv_file_for_split} does not exist, skipping YOLO data creation for {split}.")
        continue
    convert_csv_to_yolo_and_copy_images(
        csv_file_for_split,
        PROCESSED_IMAGES_DIR_FRCNN_SSD,
        YOLO_DATA_ROOT,
        split_name=split,
    )
print("\nYOLO formatted data preparation completed.")