In [None]:
import os
import re

import cv2
from tqdm import tqdm

# Frame Extractor

In [None]:
# Folder whose entries are opened as videos, and the root under which one
# sub-folder of extracted frames is created per video.
# NOTE: absolute, machine-specific paths; the "Crop to 1:1" loop further down
# rebinds both names, so re-run this cell before re-running the extractor.
input_dir = r'E:\Dataset\Video'
output_dir = r'E:\Dataset\Frame'

In [None]:
# Dump every frame of every entry in `input_dir` to
# `<output_dir>/<entry name without extension>/frame_<n>.png`, previewing the
# frames in an OpenCV window while doing so. Pressing 'q' in the preview stops
# the current video and moves on to the next one.
#
# os.listdir returns every entry (sub-folders and non-video files included);
# anything OpenCV cannot open is reported and skipped.
video_files = os.listdir(input_dir)

for video_name in video_files:
    video_path = os.path.join(input_dir, video_name)
    vid = cv2.VideoCapture(video_path)

    if not vid.isOpened():
        print(f"failed to open {video_name}")
        vid.release()
        continue

    current_frame = 0
    sign_name = os.path.splitext(video_name)[0]
    data_folder = os.path.join(output_dir, sign_name)
    os.makedirs(data_folder, exist_ok=True)

    # try/finally so the capture handle and the preview window are released
    # even if reading or writing a frame raises.
    try:
        while True:
            success, frame = vid.read()
            if not success:
                break

            # rotated_frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)

            cv2.imshow("output", frame)

            frame_path = os.path.join(data_folder, f"frame_{current_frame}.png")
            # imwrite signals failure through its return value instead of
            # raising, so an unchecked call can silently drop frames.
            if not cv2.imwrite(frame_path, frame):
                print(f"failed to write {frame_path}")
            # Counts frames read, so file numbers keep matching video frame
            # indices even when a write fails.
            current_frame += 1

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        vid.release()
        cv2.destroyAllWindows()

    print(f"Completed:  {video_name} - {current_frame}")

# Crop to 1:1

In [None]:
# Root folder; every sub-folder in it is cropped by the loop in the next cell.
base_dir = r'DatasetTest/Original'

# Row range kept by the crop (img[y1:y2, :]); 1500 - 420 = 1080 rows.
# presumably chosen to equal the frame width so the result is square - confirm against the footage
y1 = 420
y2 = 1500

In [None]:
# For every sub-folder of `base_dir`, keep rows y1:y2 (all columns) of each
# image and write the result under the same filename to a sibling folder
# named "<folder>_cropped".
#
# NOTE: this loop rebinds the module-level names `input_dir` / `output_dir`
# that the frame-extraction cell also reads.
for folder in os.listdir(base_dir):
    # Skip the folders this loop creates itself; otherwise a second run would
    # crop the already-cropped images into "<folder>_cropped_cropped".
    if folder.endswith("_cropped"):
        continue

    input_dir = os.path.join(base_dir, folder)
    if not os.path.isdir(input_dir):
        continue

    output_dir = os.path.join(base_dir, folder + "_cropped")
    os.makedirs(output_dir, exist_ok=True)

    img_files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

    for filename in tqdm(img_files, desc=f"Processing {folder}", unit="file"):
        img_path = os.path.join(input_dir, filename)
        img = cv2.imread(img_path)

        if img is None:
            print(f"Error reading image: {img_path}")
            continue

        cropped_img = img[y1:y2, :]

        # An image with no rows inside y1:y2 yields an empty slice, which
        # cv2.imwrite rejects with an exception; report it and move on.
        if cropped_img.size == 0:
            print(f"Skipping {img_path}: rows {y1}:{y2} lie outside an image of height {img.shape[0]}")
            continue

        output_path = os.path.join(output_dir, filename)
        # imwrite reports failure via its return value rather than raising.
        if not cv2.imwrite(output_path, cropped_img):
            print(f"Error writing image: {output_path}")

print("Done!")

### Plan F

In [None]:
# Crop sizes, one entry per class; the trailing comment on each entry names
# the class. Neither list is read by any code visible in this notebook.
# NOTE(review): the first entry (288) equals SIZE in the next cell, so these
# look like values copied into SIZE by hand per class - confirm.
crop_sizes_raw = [
    288,      #barred_area
    320,      #cross_walk
    384,      #go_straight
    384,      #no_passing_zone_beginning
    480,      #parking_zone
    384,      #priority_over
    512,      #steep_hill_downhill
    544,      #steep_hill_uphill
    512,      #stop
    448,      #tunnel_beginning
    416,      #turn_left
    416,      #turn_right
] 

# Same class order as crop_sizes_raw; every class currently uses 128.
crop_sizes_field = [
    128,      #barred_area
    128,      #cross_walk
    128,      #go_straight
    128,      #no_passing_zone_beginning
    128,      #parking_zone
    128,      #priority_over
    128,      #steep_hill_downhill
    128,      #steep_hill_uphill
    128,      #stop
    128,      #tunnel_beginning
    128,      #turn_left
    128,      #turn_right
]

In [None]:
# Side length used as the default `size` of the crop helpers below and as the
# expected output size checked in process_images.
SIZE = 288

# State shared between the select_center mouse callback and process_images.
center_selected = False  # set to True by a left click; reset for every image
center_x, center_y = 0, 0  # chosen center, in coordinates of the unscaled image
abort_task = False  # set to True when ESC is pressed; only this line resets it
WINDOW_SCALE = 0.5  # factor applied to the image for display; clicks are divided by it
move_step = 1  # multiplier applied to each WASD nudge of the center
move_directions = {'w': (0, -1), 'a': (-1, 0), 's': (0, 1), 'd': (1, 0)}  # key -> (dx, dy)

def select_center(event, x, y, flags, param):
    """Mouse callback that records the clicked point as the crop center.

    Only left-button-down events are handled. The reported window position
    is divided by WINDOW_SCALE and stored in the module-level
    ``center_x`` / ``center_y``; ``center_selected`` is then set to True.

    Args:
        event: OpenCV mouse event code.
        x, y: Pointer position reported with the event.
        flags, param: Unused; present to match the callback signature.
    """
    global center_selected, center_x, center_y

    if event != cv2.EVENT_LBUTTONDOWN:
        return

    center_x, center_y = int(x / WINDOW_SCALE), int(y / WINDOW_SCALE)
    center_selected = True

def _clamp_span(center, size, limit):
    """Return (start, end) of a `size`-long span around `center` kept inside [0, limit]."""
    # The span can never be longer than the axis itself.
    span = min(size, limit)
    # Start half a size before the center, then slide back inside the axis.
    start = max(0, min(center - size // 2, limit - span))
    return start, start + span


def calculate_crop_bounds(img, center, size=None):
    """Compute the square crop window around `center`, kept inside the image.

    The window starts ``size // 2`` before the center on each axis and is
    shifted (not shrunk) when it would cross an image edge. If the image is
    smaller than `size` along an axis, the window covers that whole axis.

    Unlike the previous version this also handles odd sizes (which used to
    snap the window to the bottom/right edge) and images smaller than `size`
    (which could produce a negative start index that wraps when slicing).

    Args:
        img: Object with a ``shape`` whose first two entries are
            (height, width).
        center: ``(x, y)`` center of the crop.
        size: Side length of the crop. ``None`` (the default) uses the
            module-level ``SIZE`` as it is at call time.

    Returns:
        ``(x1, y1, x2, y2)`` with ``x2 - x1`` and ``y2 - y1`` equal to
        `size` wherever the image is large enough.
    """
    if size is None:
        size = SIZE

    h, w = img.shape[:2]
    x1, x2 = _clamp_span(center[0], size, w)
    y1, y2 = _clamp_span(center[1], size, h)
    return x1, y1, x2, y2

def crop_around_center(img, center, size=SIZE):
    """Slice `img` to the window returned by calculate_crop_bounds.

    Args:
        img: Image to slice (indexed as ``img[rows, cols]``).
        center: ``(x, y)`` center passed on to calculate_crop_bounds.
        size: Crop side length passed on to calculate_crop_bounds.

    Returns:
        A pair ``(crop, (x1, y1, x2, y2))``: the sliced image and the bounds
        that were used for the slice.
    """
    left, top, right, bottom = calculate_crop_bounds(img, center, size)
    window = img[top:bottom, left:right]
    return window, (left, top, right, bottom)

def process_images(input_folder, output_folder):
    """Interactively crop every image in `input_folder` and save the result.

    Each .jpg/.jpeg/.png file is shown in a 'Select Center' window scaled by
    WINDOW_SCALE. Controls:

    * LEFT CLICK - choose the crop center (handled by select_center)
    * W/A/S/D    - nudge the chosen center by `move_step`
    * SPACE      - confirm; ignored until a center has been chosen
    * Q          - skip the current image
    * ESC        - stop processing and return

    The crop is taken with crop_around_center using its default size and is
    written to `output_folder` under the original filename (PNG with
    compression level 0, anything else as JPEG with quality 100).

    Module state: reads and writes ``center_selected`` / ``center_x`` /
    ``center_y`` and sets ``abort_task`` on ESC. ``abort_task`` is not
    cleared here, so once it is set, later calls return at their first
    image until it is reset to False.

    Args:
        input_folder: Directory that is scanned (non-recursively) for images.
        output_folder: Directory for the crops; created if missing.

    Raises:
        ValueError: If a 4-channel input yields a crop without 4 channels.
        TypeError: If the crop's dtype differs from the input's dtype.
    """
    global abort_task, center_selected, center_x, center_y

    os.makedirs(output_folder, exist_ok=True)
    valid_ext = ('.jpg', '.jpeg', '.png')
    images = [f for f in os.listdir(input_folder) if f.lower().endswith(valid_ext)]

    for filename in images:
        # Fresh selection state for every image.
        center_selected = False
        center_x, center_y = 0, 0

        if abort_task:
            print("Task aborted by user!")
            return

        img_path = os.path.join(input_folder, filename)
        img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        if img is None:
            continue

        original_channels = img.shape[2] if len(img.shape) > 2 else 1
        has_alpha = original_channels == 4

        cv2.namedWindow('Select Center', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Select Center',
                        int(img.shape[1] * WINDOW_SCALE),       # type: ignore
                        int(img.shape[0] * WINDOW_SCALE))      # type: ignore
        cv2.setMouseCallback('Select Center', select_center)

        display_img = cv2.resize(img.copy(), None, fx=WINDOW_SCALE, fy=WINDOW_SCALE)
        h_display, w_display = display_img.shape[:2]

        instructions = [
            "LEFT CLICK: Select center",
            "SPACE: Confirm crop",
            "Q: Skip image",
            "ESC: Abort all"
        ]
        for i, text in enumerate(instructions):
            cv2.putText(display_img, text, (10, 30 + i*30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        skipped = False
        while True:
            # Redraw the overlay on a clean copy each iteration.
            current_display = display_img.copy()
            key = cv2.waitKey(1) & 0xFF

            if center_selected and key in (ord('w'), ord('a'), ord('s'), ord('d')):
                dx, dy = move_directions[chr(key)]
                center_x = max(0, min(img.shape[1]-1, center_x + dx * move_step)) # type: ignore
                center_y = max(0, min(img.shape[0]-1, center_y + dy * move_step)) # type: ignore

            if center_selected:
                _, (x1, y1, x2, y2) = crop_around_center(img, (center_x, center_y))

                # Bounds and center are in image coordinates; scale them for the preview.
                cv2.rectangle(current_display,
                            (int(x1 * WINDOW_SCALE), int(y1 * WINDOW_SCALE)),
                            (int(x2 * WINDOW_SCALE), int(y2 * WINDOW_SCALE)),
                            (0, 0, 255), 2)

                cv2.drawMarker(current_display,
                              (int(center_x * WINDOW_SCALE), int(center_y * WINDOW_SCALE)),
                              (0, 255, 0), cv2.MARKER_CROSS, 20, 2)

            cv2.putText(current_display, "WASD: Fine-tune position",
                       (10, h_display - 30), cv2.FONT_HERSHEY_SIMPLEX,
                       0.5, (0, 255, 255), 1)

            cv2.imshow('Select Center', current_display)

            if key == ord(' '):
                # Confirming without a click would crop around (0, 0), i.e.
                # silently save the top-left corner; require a selection.
                if center_selected:
                    break
                print("No center selected yet - LEFT CLICK on the image first")
            elif key == ord('q'):
                cv2.destroyAllWindows()
                print(f"Skipped {filename}")
                skipped = True
                break
            elif key == 27:
                abort_task = True
                cv2.destroyAllWindows()
                print("Aborting task...")
                return

        if skipped:
            continue

        cropped, _ = crop_around_center(img, (center_x, center_y))

        if has_alpha and cropped.shape[2] != 4:
            raise ValueError("Alpha channel lost during cropping!")

        if cropped.dtype != img.dtype:
            raise TypeError(f"Data type changed from {img.dtype} to {cropped.dtype}")

        if cropped.shape[:2] != (SIZE, SIZE):
            print(f"Warning: Cropped size {cropped.shape} != {SIZE}x{SIZE}")

        # Measure the source before writing: when both folders are the same
        # directory the write replaces the source file, and reading its size
        # afterwards would report the cropped size twice.
        original_kb = os.path.getsize(img_path) // 1024

        output_path = os.path.join(output_folder, filename)
        if filename.lower().endswith('.png'):
            written = cv2.imwrite(output_path, cropped, [
                cv2.IMWRITE_PNG_COMPRESSION, 0,
                cv2.IMWRITE_PNG_STRATEGY, cv2.IMWRITE_PNG_STRATEGY_DEFAULT
            ])
        else:
            written = cv2.imwrite(output_path, cropped, [cv2.IMWRITE_JPEG_QUALITY, 100])

        cv2.destroyAllWindows()

        # imwrite reports failure through its return value rather than raising.
        if not written:
            print(f"Failed to write {output_path}")
            continue

        print(f"Processed {filename} | Original: {original_kb}KB -> Cropped: {os.path.getsize(output_path)//1024}KB")

In [None]:
# Both names point at the same directory, and process_images writes each crop
# under the original filename, so every confirmed crop replaces its source
# image on disk.
input_folder = r'DatasetTest'
output_folder = r'DatasetTest'

process_images(input_folder, output_folder)

# Renaming the Frames

In [None]:
# Root directory; each immediate sub-folder is treated as one class folder by
# the renaming loop below (this rebinds the `base_dir` used by earlier cells).
base_dir = r'DatasetTest'

def natural_sort_key(s):
    """Build a sort key that orders embedded numbers by value.

    The string is split into alternating non-digit and digit runs; digit
    runs become ints and everything else is lower-cased, so "frame_2"
    sorts before "frame_10" and letter case is ignored.

    Args:
        s: String to build the key for (e.g. a file or folder name).

    Returns:
        List of lower-cased strings and ints, in the order they occur in `s`.
    """
    key = []
    for chunk in re.split(r'(\d+)', s):
        if chunk.isdigit():
            key.append(int(chunk))
        else:
            key.append(chunk.lower())
    return key

In [None]:
# Rename the images of every class folder under `base_dir` to
# "<class index>_01_<running number><ext>", numbering them in natural sort
# order. Files are first moved into a temporary sub-folder so that new names
# can never collide with names that are still waiting to be renamed.
sorted_folders = sorted(
    [f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))],
    key=natural_sort_key
)

# enumerate(tqdm(...)) instead of tqdm(enumerate(...)): tqdm reads the total
# from the list it wraps, which an enumerate object does not expose.
for class_idx, folder in enumerate(tqdm(sorted_folders, desc="Class Folders", unit="class")):
    folder_path = os.path.join(base_dir, folder)

    if not os.path.isdir(folder_path):
        continue

    # NOTE(review): the regex excludes files named "frame_<5 digits>.<ext>",
    # which is not the name format produced below; it also matches extractor
    # output such as "frame_10000.png", which would then be left unrenamed.
    # Confirm whether this filter is still wanted.
    img_files = [f for f in os.listdir(folder_path)
                if f.lower().endswith(('.jpg', '.jpeg', '.png'))
                and not re.match(r'^frame_\d{5}\..+$', f)]

    # Folders without matching images still consume a class index.
    if not img_files:
        continue

    img_files.sort(key=natural_sort_key)
    temp_dir = os.path.join(folder_path, "temp_renumber")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        for filename in tqdm(img_files, desc=f"Moving '{folder}' files", unit="file"):
            os.rename(
                os.path.join(folder_path, filename),
                os.path.join(temp_dir, filename)
            )

        temp_files = sorted(os.listdir(temp_dir), key=natural_sort_key)
        for frame_idx, filename in enumerate(tqdm(temp_files, desc=f"Renaming '{folder}'", unit="file"), 1):
            ext = os.path.splitext(filename)[1].lower()
            new_name = f"{class_idx:02d}_01_{frame_idx:05d}{ext}"    # field 01 / frame 00
            os.rename(
                os.path.join(temp_dir, filename),
                os.path.join(folder_path, new_name)
            )
    finally:
        # rmdir only succeeds on an empty folder; if a rename failed midway
        # the remaining files stay in temp_dir and the reason is printed.
        if os.path.exists(temp_dir):
            try:
                os.rmdir(temp_dir)
            except OSError as e:
                print(f"Could not delete '{temp_dir}', {e.strerror}")

print("Done!")

# Data Split

In [None]:
import shutil
from numpy import random

# Source roots. The class folder names found under base_dir are also looked
# up under val_dir, so both are expected to contain the same sub-folders.
base_dir = r'DatasetTest/Base'
val_dir = r'DatasetTest/BaseVal'
# Destination roots; one sub-folder per class is created under each.
output_train = r'DatasetTest/Train'
output_test = r'DatasetTest/Test'
output_val = r'DatasetTest/Val'

# Per-class image counts, taken in order from each shuffled file list.
base_train_split = 200  # base images copied to Train
base_test_split = 200  # the following base images, copied to Test
val_train_split = 106  # val images copied to Train (taken after the val_keep block)
val_test_split = 169  # val images copied to Test (taken after the Train block)
val_keep = 75  # first val images, copied to Val

In [None]:
def natural_sort_key(s):
    """Return a key that sorts strings with embedded numbers in numeric order.

    Same helper as the one defined in the renaming section: runs of digits
    are converted to ints and all other runs are lower-cased.

    Args:
        s: String to build the key for.

    Returns:
        List of ints and lower-cased strings, in the order they occur in `s`.
    """
    def convert(chunk):
        return int(chunk) if chunk.isdigit() else chunk.lower()

    return list(map(convert, re.split(r'(\d+)', s)))

def process_split(src_dir, dest_dir, files, copy=True):
    """Copy or move the named files from `src_dir` into `dest_dir`.

    `dest_dir` is created if it does not exist, even when `files` is empty.

    Args:
        src_dir: Directory that currently holds the files.
        dest_dir: Directory that receives them.
        files: Iterable of file names relative to `src_dir`.
        copy: If true (default) use shutil.copy and leave the sources in
            place; otherwise use shutil.move.
    """
    os.makedirs(dest_dir, exist_ok=True)
    transfer = shutil.copy if copy else shutil.move
    for name in files:
        transfer(os.path.join(src_dir, name), os.path.join(dest_dir, name))

# Build the Train / Test / Val folders by copying a random selection of
# images per class from the base set and from the val set.
#
# The selection is made reproducible: file lists are sorted before shuffling
# (os.listdir order is arbitrary) and the shuffle uses a fixed seed. With
# unchanged source folders, re-running this cell therefore copies the same
# files again instead of adding a different random selection, which could put
# the same image into both Train and Test.
SPLIT_SEED = 42
split_rng = random.default_rng(SPLIT_SEED)

sorted_classes = sorted(
    [f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))],
    key=natural_sort_key
)

for class_folder in tqdm(sorted_classes, desc="Processing Base"):
    base_class_path = os.path.join(base_dir, class_folder)

    base_images = sorted(
        [f for f in os.listdir(base_class_path)
         if f.lower().endswith(('.jpg', '.jpeg', '.png'))],
        key=natural_sort_key
    )
    split_rng.shuffle(base_images)

    # Slicing past the end silently yields fewer files; make that visible.
    base_needed = base_train_split + base_test_split
    if len(base_images) < base_needed:
        tqdm.write(f"Warning: '{class_folder}' has {len(base_images)} base images, "
                   f"{base_needed} requested")

    base_train = base_images[:base_train_split]
    base_test = base_images[base_train_split:base_train_split+base_test_split]

    process_split(base_class_path, os.path.join(output_train, class_folder), base_train)
    process_split(base_class_path, os.path.join(output_test, class_folder), base_test)

for class_folder in tqdm(sorted_classes, desc="Processing Val"):
    val_class_path = os.path.join(val_dir, class_folder)

    val_images = sorted(
        [f for f in os.listdir(val_class_path)
         if f.lower().endswith(('.jpg', '.jpeg', '.png'))],
        key=natural_sort_key
    )
    split_rng.shuffle(val_images)

    val_needed = val_keep + val_train_split + val_test_split
    if len(val_images) < val_needed:
        tqdm.write(f"Warning: '{class_folder}' has {len(val_images)} val images, "
                   f"{val_needed} requested")

    # Consecutive, non-overlapping slices: Val first, then Train, then Test.
    val_keep_split = val_images[:val_keep]
    val_train = val_images[val_keep:val_keep+val_train_split]
    val_test = val_images[val_keep+val_train_split:val_keep+val_train_split+val_test_split]

    # Base and val files share the Train/Test class folders; a val file with
    # the same name as an already copied base file overwrites it.
    process_split(val_class_path, os.path.join(output_train, class_folder), val_train)
    process_split(val_class_path, os.path.join(output_test, class_folder), val_test)
    process_split(val_class_path, os.path.join(output_val, class_folder), val_keep_split)

print("Dataset splitting completed!")