In [1]:
# Updated: aruco_dataset_builder_splitter.py
import os
import cv2
import csv
import numpy as np
import shutil
import random
import json
from tqdm import tqdm
from datetime import datetime
from aruco_box_pose_estimation import CubePoseEstimator
from utility.rotation_utils import rotation_matrix_to_6d
import time
import datetime
import csv

PREREQS AND DECS AND INITIALS BLAAABLABLABLBLA

In [10]:
with open("GlobVar.json", "r") as file:
    gv = json.load(file)
batch = gv['batch']
BATCH_ID = batch
FRAME_STEP = 1  # Skip every N frames to reduce redundancy
VAL_RATIO = 0.1
TEST_RATIO = 0.1
TRAIN_RATIO = 1 - VAL_RATIO - TEST_RATIO
AUG_PER_FRAME = 1
VIDEO_PATH = "video/raw/ArUco.mp4"
BATCH_FOLDER = f"dataset/batch{BATCH_ID}"
UNSPLIT_FOLDER = os.path.join(BATCH_FOLDER, "unsplit")
IMAGES_FOLDER = os.path.join(UNSPLIT_FOLDER, "images")
LABELS_PATH = os.path.join(UNSPLIT_FOLDER, "labels.csv")

os.makedirs(IMAGES_FOLDER, exist_ok=True)

In [11]:
def adjust_brightness_contrast(image, alpha_range=(0.8, 1.2), beta_range=(-30, 30)):
    """Randomly adjusts brightness and contrast."""
    alpha = np.random.uniform(*alpha_range)  # Contrast
    beta = np.random.uniform(*beta_range)    # Brightness
    return cv2.convertScaleAbs(image, alpha=alpha, beta=beta)


MAKE THE LABLES

In [12]:
# Initialize
cap = cv2.VideoCapture(VIDEO_PATH)
cap.set(cv2.CAP_PROP_FPS, 30)
estimator = CubePoseEstimator()
frame_idx = 0
write_idx = 0
now = []
now.append(time.time())

with open(LABELS_PATH, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['image_name', 'x', 'y', 'z', 'r1x', 'r1y', 'r1z', 'r2x', 'r2y', 'r2z'])
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            if FRAME_STEP != 0 and frame_idx % FRAME_STEP != 0:
                frame_idx += 1
                continue

            cube_rvec, cube_tvec = estimator.process_frame(frame)
            if cube_rvec is not None:
                rmat, _ = cv2.Rodrigues(cube_rvec)
                r6d = rotation_matrix_to_6d(rmat)

                for i in range(AUG_PER_FRAME):
                    aug_frame = adjust_brightness_contrast(frame)

                    fname = f"frame_{write_idx:04d}_v{i}.png"
                    image_path = os.path.join(IMAGES_FOLDER, fname)
                    cv2.imwrite(image_path, aug_frame)

                    writer.writerow([
                        fname,
                        float(cube_tvec[0][0]),
                        float(cube_tvec[1][0]),
                        float(cube_tvec[2][0]),
                        *r6d
                    ])

                write_idx += 1  # Only increment write_idx per base frame

            frame_idx += 1
    finally:
        cap.release()
        now.append(time.time())
        print("✅ Finished reading video and writing labels.")
        print(f"🕒 Time took: {int(now[1]-now[0])}s")

✅ Finished reading video and writing labels.
🕒 Time took: 171s


LOAD THAT BIH AND SHUFFFLEE

In [13]:
# Load labels for split
with open(LABELS_PATH, 'r') as f:
    reader = list(csv.reader(f))
    header = reader[0]
    rows = reader[1:]

# random.seed(42)
random.shuffle(rows)

total = len(rows)
train_end = int(TRAIN_RATIO * total)
val_end = train_end + int(VAL_RATIO * total)

# First split by chunks, then shuffle each set to avoid overlap
train = rows[:train_end]
val = rows[train_end:val_end]
test = rows[val_end:]

random.shuffle(train)
random.shuffle(val)
random.shuffle(test)

CREATE THE TRAIN TEST VAL FOLDERS

In [14]:
# Create folders
for subset in ['train', 'val', 'test']:
    os.makedirs(os.path.join(BATCH_FOLDER, subset, 'images'), exist_ok=True)
now = []
now.append(time.time())
# Copy images + write labels
for subset_name, data in zip(['train', 'val', 'test'], [train, val, test]):
    subset_folder = os.path.join(BATCH_FOLDER, subset_name)
    csv_path = os.path.join(subset_folder, 'labels.csv')

    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in data:
            writer.writerow(row)
            src_img = os.path.join(IMAGES_FOLDER, row[0])
            dst_img = os.path.join(subset_folder, 'images', row[0])
            shutil.copy(src_img, dst_img)
now.append(time.time())
print("✅ Dataset split complete.")
print(f"Time took : {int(now[1]-now[0])}s")

✅ Dataset split complete.
Time took : 6s


WRITE IT OUTT (.md)

In [15]:
# Write metadata
sample_frame = cv2.imread(os.path.join(IMAGES_FOLDER, "frame_0000.png"))
md_name = "metadata_"+str(BATCH_ID)+".md"
metadata_path = os.path.join(BATCH_FOLDER, md_name)
with open(metadata_path, 'w') as f:
    f.write(f"# Batch {BATCH_ID} Metadata\n")
    f.write(f"- Date: {datetime.datetime.now().isoformat()}\n")
    f.write(f"- Total Frames: {len(rows)}\n")
    f.write(f"- Frame Step: {FRAME_STEP}\n")
    f.write(f"- Train: {len(train)} | Val: {len(val)} | Test: {len(test)}\n")
    f.write(f"- Image Resolution: 1280 x 480\n")
    f.write(f"- ArUco-based 6DoF Pose Estimation\n")
    f.write(f"- Augmented frames per frame : {AUG_PER_FRAME}\n")
    f.write(f"- pose_normalization': Zhou 6D Rotation Representation\n")
    f.write(f"- Cube Size: {estimator.CUBE_SIZE if hasattr(estimator, 'CUBE_SIZE') else 0.08}m\n")

print(f"📝 Metadata saved to {metadata_path}")
shutil.rmtree(UNSPLIT_FOLDER)
print("❌ Removed the unsplit folder")

📝 Metadata saved to dataset/batch7/metadata_7.md


[ WARN:0@764.820] global loadsave.cpp:268 findDecoder imread_('dataset/batch7/unsplit/images/frame_0000.png'): can't open/read file: check file path/integrity


❌ Removed the unsplit folder


In [16]:
csv_path = "dataset/dataset_info.csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
write_header = not os.path.exists(csv_path)



csv_data = {
    'timestamp': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'batch_id': BATCH_ID,
    'total_frames': len(rows),
    'frame_step': FRAME_STEP,
    'train_count': len(train),
    'val_count': len(val),
    'test_count': len(test),
    "tr_val_test_ratio": f"{int(TRAIN_RATIO*100)}:{int(VAL_RATIO*100)}:{int(TEST_RATIO*100)}",
    'pose_method': "ArUco-based 6DoF Pose Estimation",
    'pose_normalization': "Zhou 6D Rotation Representation"
}

with open(csv_path, 'a', newline='') as csvfile:
    fieldnames = csv_data.keys()
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    if write_header:
        writer.writeheader()
    writer.writerow(csv_data)

print(f"📄 Dataset metadata appended to CSV: {csv_path}")


📄 Dataset metadata appended to CSV: dataset/dataset_info.csv


INCRIMENT .. or is it increment? encriment? incriment? wait that's just the same

In [17]:
gv['batch'] = batch + 1
# Save the updated JSON back to the file
with open("GlobVar.json", "w") as file:
    json.dump(gv, file, indent=4)
print(f"Batch ID updated to {gv['batch']}")

Batch ID updated to 8
