IMPORTS

In [2]:
import numpy as np 
import pandas as pd 
import os 
import json 
from PIL import Image
import random 
import time 
import cv2


CLASS DEFINITIONS 

In [3]:
class datapoint:
    """One dataset sample: its source file paths plus the parsed JSON payloads.

    The RGB and segmentation images are not loaded; ``rgb`` and ``seg_png``
    hold the file paths so that many samples can be kept in memory.

    Attributes set by ``__init__``:
        metadata, pose, seg_json: parsed JSON content, or ``None`` when the
            corresponding path is falsy.
        rgb, seg_png: the image file paths (or ``None``).
        cam_pose, tag_pose: transposed ``pose["cam"]`` / ``pose["tag"]``
            arrays, or ``None`` when no pose file was given. ``tag_pose`` is
            additionally multiplied element-wise by ``_TAG_POSE_SCALE``.
    """

    # Element-wise factor for the transposed tag pose: the upper-left 3x3
    # block is multiplied by 10, the last row and last column are unchanged.
    _TAG_POSE_SCALE = np.array([
        [10, 10, 10, 1],
        [10, 10, 10, 1],
        [10, 10, 10, 1],
        [1, 1, 1, 1],
    ])

    def __init__(self, metadata_filepath, pose_filepath, rgb_filepath, seg_png_filepath, seg_json_filepath):
        """Store the file paths and load the JSON files that were provided.

        Any path may be ``None`` (or empty); the matching data attribute is
        then ``None`` as well.

        Raises:
            OSError / json.JSONDecodeError: if a given JSON file cannot be
                read or parsed.
            KeyError: if the pose JSON lacks the ``"cam"`` or ``"tag"`` key.
        """
        # Store the filepaths
        self.metadata_filepath = metadata_filepath
        self.pose_filepath = pose_filepath
        self.rgb_filepath = rgb_filepath
        self.seg_png_filepath = seg_png_filepath
        self.seg_json_filepath = seg_json_filepath

        # Read the actual data from files and store it
        self.metadata = self._read_json(metadata_filepath) if metadata_filepath else None
        self.pose = self._read_json(pose_filepath) if pose_filepath else None
        self.rgb = self._read_rgb(rgb_filepath) if rgb_filepath else None
        self.seg_png = self._read_segmentation_png(seg_png_filepath) if seg_png_filepath else None
        self.seg_json = self._read_segmentation_json(seg_json_filepath) if seg_json_filepath else None

        # Derive the pose matrices only when a pose file was actually loaded;
        # indexing a missing pose would otherwise raise TypeError.
        if self.pose is None:
            self.cam_pose = None
            self.tag_pose = None
        else:
            self.cam_pose = np.array(self.pose["cam"]).transpose()
            # rescale the tag
            self.tag_pose = np.array(self.pose["tag"]).transpose() * self._TAG_POSE_SCALE

    def _read_json(self, filepath):
        """Read and parse a JSON file."""
        with open(filepath, 'r', encoding='utf-8') as file:
            return json.load(file)

    def _read_rgb(self, filepath):
        """Placeholder for reading RGB image files."""
        return filepath  # Placeholder: returning the file path to avoid memory overload

    def _read_segmentation_png(self, filepath):
        """Placeholder for reading segmentation PNG image files."""
        return filepath  # Placeholder: returning the file path to avoid memory overload

    def _read_segmentation_json(self, filepath):
        """Read a segmentation JSON file (same format handling as ``_read_json``)."""
        return self._read_json(filepath)

    def compute_diffusion_reflectance(self):
        """Compute a diffuse-reflection value and store it in ``self.diffuse_reflection``.

        The value is ``2 ** exposure * max(N . L, 0)`` where ``exposure`` is
        ``metadata["light"]["exposure"]`` and ``N`` / ``L`` are the third
        column (first three rows) of the *untransposed* ``pose["tag"]`` and
        ``pose["light"]`` matrices.

        NOTE(review): ``cam_pose`` / ``tag_pose`` are stored transposed while
        this method indexes the raw matrices — presumably ``[:3, 2]`` is meant
        to be the z-axis direction; confirm against the pose file convention.

        Raises:
            TypeError: if the pose or metadata file was not loaded.
        """
        normal = np.array(self.pose["tag"])[:3, 2]
        light_dir = np.array(self.pose["light"])[:3, 2]
        incident_intensity = 2 ** self.metadata["light"]["exposure"]
        # Clamp at zero so a surface facing away from the light gets no reflection.
        self.diffuse_reflection = incident_intensity * max(np.dot(normal, light_dir), 0)

    def __repr__(self):
        """Custom representation for the datapoint object."""
        return f"datapoint(metadata_filepath={self.metadata_filepath}, pose_filepath={self.pose_filepath}, rgb_filepath={self.rgb_filepath}, seg_png_filepath={self.seg_png_filepath}, seg_json_filepath={self.seg_json_filepath})"


class DataProcessor:
    """Collect datapoints from data folders, filter/split them, and export images.

    Each data folder is expected to contain ``metadata``, ``pose``, ``rgb``
    and ``seg`` subfolders. Typical use: ``process_folders()``,
    ``filter_datapoints()``, ``split_train_val()``, ``save_preprocessed_images()``.
    """

    # (width, height) every exported RGB image and mask is resized to.
    TARGET_SIZE = (480, 270)

    def __init__(self, data_folders, out_dir):
        """Remember the input folders and the output directory; nothing is read yet."""
        self.data_folders = data_folders
        self.out_dir = out_dir
        self.datapoints = []
        # Initialised here so the getter and the split work (on an empty
        # list) even before filter_datapoints() has been called.
        self.datapoints_filtered = []
        self.datapoints_train = []
        self.datapoints_val = []

    def _get_files_in_subfolder(self, folder, file_extension=None):
        """Return the file names in ``folder``, optionally filtered by extension.

        Names are ordered by ``os.path.getctime`` (this assumes the timestamps
        are synchronized across the subfolders); ties are broken by file name
        so the order is deterministic.
        """
        names = os.listdir(folder)
        if file_extension:
            names = [name for name in names if name.endswith(file_extension)]
        return sorted(names, key=lambda name: (os.path.getctime(os.path.join(folder, name)), name))

    def process_folders(self):
        """Create one datapoint per aligned file tuple in every data folder.

        Files are paired by their position in the sorted listings. A folder
        whose five listings differ in length is reported and skipped.
        """
        for data_folder in self.data_folders:
            metadata_subfolder = os.path.join(data_folder, "metadata")
            pose_subfolder = os.path.join(data_folder, "pose")
            rgb_subfolder = os.path.join(data_folder, "rgb")
            seg_subfolder = os.path.join(data_folder, "seg")

            # List files in subfolders
            listings = [
                self._get_files_in_subfolder(metadata_subfolder, file_extension=".json"),
                self._get_files_in_subfolder(pose_subfolder, file_extension=".json"),
                self._get_files_in_subfolder(rgb_subfolder, file_extension=".png"),
                self._get_files_in_subfolder(seg_subfolder, file_extension=".png"),
                self._get_files_in_subfolder(seg_subfolder, file_extension=".json"),
            ]

            # Index-based pairing is only meaningful when all listings have the same length.
            if len({len(files) for files in listings}) != 1:
                print(f"Lengths do not match for folder: {data_folder}")
                continue

            for metadata_file, pose_file, rgb_file, seg_png_file, seg_json_file in zip(*listings):
                self.datapoints.append(
                    datapoint(
                        os.path.join(metadata_subfolder, metadata_file),
                        os.path.join(pose_subfolder, pose_file),
                        os.path.join(rgb_subfolder, rgb_file),
                        os.path.join(seg_subfolder, seg_png_file),
                        os.path.join(seg_subfolder, seg_json_file),
                    )
                )

    def get_datapoints(self):
        """Return the list of datapoint objects."""
        return self.datapoints

    def get_datapoints_filtered(self):
        """Return the filtered datapoints (empty until ``filter_datapoints`` has run)."""
        return self.datapoints_filtered

    def filter_datapoints(self):
        """Compute the diffuse reflection of every datapoint and keep those with a positive value."""
        self.datapoints_filtered = []
        for dp in self.datapoints:
            dp.compute_diffusion_reflectance()
            if dp.diffuse_reflection > 0:
                self.datapoints_filtered.append(dp)

    def split_train_val(self, filter=True, frac_train=0.8):
        """Randomly split the datapoints into training and validation lists.

        Parameters:
        - filter: split ``datapoints_filtered`` when true, otherwise ``datapoints``.
        - frac_train: fraction of the pool sampled (without replacement) for training;
          the count is ``int(frac_train * len(pool))``.

        The validation list keeps the pool order and holds every datapoint
        that was not sampled for training.
        """
        pool = self.datapoints_filtered if filter else self.datapoints
        self.datapoints_train = random.sample(pool, int(frac_train * len(pool)))
        # Membership by object identity in a set: linear instead of the
        # quadratic list scan, and datapoints define no custom equality.
        train_ids = {id(dp) for dp in self.datapoints_train}
        self.datapoints_val = [dp for dp in pool if id(dp) not in train_ids]

    def create_directories(self):
        """Create ``<out_dir>/{train,val}/{rgb,seg}`` and return the four paths.

        Returns:
        - (dir_train_rgb, dir_train_seg, dir_val_rgb, dir_val_seg)
        """
        dir_train = os.path.join(self.out_dir, "train")
        dir_val = os.path.join(self.out_dir, "val")
        dir_train_rgb = os.path.join(dir_train, "rgb")
        dir_train_seg = os.path.join(dir_train, "seg")
        dir_val_rgb = os.path.join(dir_val, "rgb")
        dir_val_seg = os.path.join(dir_val, "seg")

        for directory in (dir_train_rgb, dir_train_seg, dir_val_rgb, dir_val_seg):
            os.makedirs(directory, exist_ok=True)

        return dir_train_rgb, dir_train_seg, dir_val_rgb, dir_val_seg

    def preprocess_rgb(self, img_path):
        """Open the RGB image at ``img_path`` and return it resized to ``TARGET_SIZE``."""
        with Image.open(img_path) as img:
            return img.resize(self.TARGET_SIZE)

    def _find_tag_color(self, seg_json):
        """Return the colour tuple of the entry whose class is "tag0", or None if absent."""
        for key, val in seg_json.items():
            if val.get("class") == "tag0":
                # Keys look like '(140, 25, 255, 255)'; int() tolerates the
                # surrounding spaces, so splitting on ',' alone is enough.
                return tuple(int(part) for part in key.strip('()').split(','))
        return None

    def preprocess_seg_img(self, seg_img_path, seg_json_path, tag_seg_color=None):
        """
        Resize the segmentation image and convert it to a binary tag mask.

        Parameters:
        - seg_img_path: path of the segmentation PNG.
        - seg_json_path: path of the segmentation JSON mapping colour keys to classes.
        - tag_seg_color: colour of the tag; when None it is looked up in the
          JSON as the entry whose "class" is "tag0".

        Returns:
        - A single-channel image of size ``TARGET_SIZE`` with 255 where the
          pixel equals the tag colour and 0 elsewhere. If the JSON contains
          no "tag0" entry the mask is all zeros.

        Raises:
        - FileNotFoundError: if either input file does not exist.
        - ValueError: if the image has a channel count other than 1, 3 or 4.
        """
        # Validate that the segmentation image file exists
        if not os.path.exists(seg_img_path):
            raise FileNotFoundError(f"Segmentation image file not found: {seg_img_path}")

        # Validate that the JSON file exists
        if not os.path.exists(seg_json_path):
            raise FileNotFoundError(f"Segmentation JSON file not found: {seg_json_path}")

        # Load the segmentation JSON data if tag_seg_color is not provided
        if tag_seg_color is None:
            with open(seg_json_path, 'r', encoding='utf-8') as json_file:
                tag_seg_color = self._find_tag_color(json.load(json_file))

        # Nearest-neighbour resampling keeps the exact class colours; an
        # interpolating filter would blend colours at region borders and the
        # exact-match test below would then drop those pixels.
        with Image.open(seg_img_path) as seg_img:
            pixels = np.array(seg_img.resize(self.TARGET_SIZE, Image.NEAREST))

        if tag_seg_color is None:
            # No "tag0" entry in the JSON: nothing can match, emit an empty mask.
            mask = np.zeros(pixels.shape[:2], dtype=bool)
        elif pixels.ndim == 3:
            channels = pixels.shape[2]
            if channels == 3:  # RGB image: ignore the alpha component of the tag colour
                mask = np.all(pixels == tag_seg_color[:3], axis=-1)
            elif channels == 4:  # RGBA image
                mask = np.all(pixels == tag_seg_color, axis=-1)
            else:
                raise ValueError(f"Unsupported number of channels in segmentation image: {channels}")
        else:
            # Single channel: compare pixel values directly.
            # NOTE(review): this only broadcasts when tag_seg_color is a scalar — confirm intended use.
            mask = pixels == tag_seg_color

        # Convert the binary mask to uint8 (0 or 255) and back to an image
        return Image.fromarray(mask.astype(np.uint8) * 255)

    def _save_split(self, datapoints, dir_rgb, dir_seg):
        """Write ``img_<i>.png`` / ``seg_<i>.png`` for each datapoint of one split."""
        for i, dp in enumerate(datapoints):
            img = self.preprocess_rgb(dp.rgb_filepath)
            seg = self.preprocess_seg_img(dp.seg_png_filepath, dp.seg_json_filepath)
            img.save(os.path.join(dir_rgb, f"img_{i}.png"))
            seg.save(os.path.join(dir_seg, f"seg_{i}.png"))

    def save_preprocessed_images(self, frac_train=0.8):
        """Save the preprocessed images and masks of the train and val splits.

        ``frac_train`` is accepted for backward compatibility but not used;
        the split itself is made by ``split_train_val``.
        """
        dir_train_rgb, dir_train_seg, dir_val_rgb, dir_val_seg = self.create_directories()
        self._save_split(self.datapoints_train, dir_train_rgb, dir_train_seg)
        self._save_split(self.datapoints_val, dir_val_rgb, dir_val_seg)


In [4]:
# Driver cell: build the datapoint list from one data folder, filter it,
# split it into train/val and write the preprocessed images to disk.

# Alternative input folders (disabled); the trailing note on each line is the author's size annotation.
# data_folders = [
#     "/home/anegi/abhay_ws/marker_detection_failure_recovery/output/markers_20250225-151250/", # 4K 
#     "/home/anegi/abhay_ws/marker_detection_failure_recovery/output/markers_20250224-105046/", # 36K 
#     "/home/anegi/abhay_ws/marker_detection_failure_recovery/output/markers_20250223-110933/", # 19K 
#     "/home/anegi/abhay_ws/marker_detection_failure_recovery/output/markers_20250222-220217/", # 16K 
# ]

# Folder(s) actually processed; each must contain metadata/, pose/, rgb/ and seg/ subfolders.
data_folders = [
    "/media/rp/Elements/abhay_ws/marker_detection_failure_recovery/data/marker_obj_sdg/markers_20250222-220217/" 
]

# Timestamped output directory (disabled alternative):
# OUT_DIR = f"/home/anegi/abhay_ws/marker_detection_failure_recovery/segmentation_model/data/data_{time.strftime('%Y%m%d-%H%M%S')}"
# The output directory is placed inside the first data folder.
OUT_DIR = os.path.join(data_folders[0], "out_dir") 
os.makedirs(OUT_DIR, exist_ok=True) 

# Create an instance of the DataProcessor class
processor = DataProcessor(data_folders, OUT_DIR)

# Process the folders to create the datapoint list
processor.process_folders()

# Retrieve and print length of the datapoints before and after filtering 
print(f"Number of datapoints: {len(processor.datapoints)}") 
processor.filter_datapoints() 
print(f"Number of filtered datapoints: {len(processor.datapoints_filtered)}")  

# Retrieve filtered datapoints (not used again within this cell)
datapoints = processor.get_datapoints_filtered()

# Split the datapoints into training and validation sets
frac_train = 0.8
processor.split_train_val(filter=True, frac_train=frac_train) 
# Writes <OUT_DIR>/{train,val}/{rgb,seg}/*.png for the split made above;
# frac_train is not passed here and the method body does not read its own frac_train parameter.
processor.save_preprocessed_images()


Number of datapoints: 3
Number of filtered datapoints: 3


In [None]:

# def world_to_pixel(camera_intrinsics, pose_SE3, world_points):
#     """
#     Converts a list of 3D world points to 2D pixel coordinates using the SE(3) pose and camera intrinsics.

#     Parameters:
#     - camera_intrinsics: A 3x3 matrix containing the camera intrinsic parameters.
#     - pose_SE3: A 4x4 transformation matrix representing the SE(3) pose.
#     - world_points: A list of 3D points in world space, where each point is a 3-element array or list [x, y, z].

#     Returns:
#     - pixel_points: A list of 2D points in pixel space [(x_pixel, y_pixel), ...].
#     """
#     pixel_points = []
    
#     for world_point in world_points:
#         # Convert world point to homogeneous coordinates
#         world_point_homogeneous = np.append(world_point, 1)  # Shape (4,)

#         # Apply the SE(3) transformation (rotation + translation)
#         camera_point_homogeneous = np.dot(pose_SE3, world_point_homogeneous)

#         # Extract the 3D camera coordinates (x, y, z)
#         x_cam, y_cam, z_cam = camera_point_homogeneous[:3]

#         # If the point is behind the camera, skip it
#         if z_cam <= 0:
#             pixel_points.append(None)  # Add None to indicate invalid point
#             continue

#         # Project the 3D point onto the 2D image plane
#         pixel_point_homogeneous = np.dot(camera_intrinsics, np.array([x_cam, y_cam, z_cam]))

#         # Convert homogeneous coordinates to 2D (divide by z_cam for perspective division)
#         x_pixel = pixel_point_homogeneous[0] / pixel_point_homogeneous[2]
#         y_pixel = pixel_point_homogeneous[1] / pixel_point_homogeneous[2]

#         # Add the pixel coordinates to the result list
#         pixel_points.append((x_pixel, y_pixel))

#     return pixel_points


def overlay_points_on_image(image, pixel_points, radius=5, color=(0, 0, 255), thickness=-1):
    """
    Draw a circle on the image for every valid pixel point.

    Parameters:
    - image: The image to draw on (a NumPy array); it is drawn on directly.
    - pixel_points: 2D pixel coordinates [(x1, y1), (x2, y2), ...]; None entries are skipped.
    - radius: Circle radius. Default is 5.
    - color: Circle color in BGR order. Default is red (0, 0, 255).
    - thickness: Circle line thickness. Default is -1, which fills the circle.

    Returns:
    - The same image object, with the points overlaid.
    """
    drawable = filter(lambda candidate: candidate is not None, pixel_points)
    for candidate in drawable:
        # Coordinates are truncated to integers before drawing.
        center = (int(candidate[0]), int(candidate[1]))
        cv2.circle(image, center, radius, color, thickness)
    return image

import numpy as np

def marker_to_pixel(camera_intrinsics, pose_SE3, marker_corners_marker_space):
    """
    Project marker-space 3D points into the image using a 4x4 pose and 3x3 intrinsics.

    Parameters:
    - camera_intrinsics: A 3x3 camera intrinsic matrix.
    - pose_SE3: A 4x4 pose matrix applied to each homogeneous marker point.
    - marker_corners_marker_space: 3D points in marker space, each [x, y, z].

    Returns:
    - A list with one entry per input point: an (x_pixel, y_pixel) tuple, or
      None when the transformed point has z <= 0 (not in front of the camera).
    """

    def project(corner):
        # Homogeneous marker point (x, y, z, 1) moved through the pose.
        in_camera = np.dot(pose_SE3, np.append(corner, 1))
        xyz = in_camera[:3]
        if xyz[2] <= 0:
            return None
        # Apply the intrinsics, then divide by the third homogeneous component.
        homogeneous_pixel = np.dot(camera_intrinsics, xyz)
        depth = homogeneous_pixel[2]
        return (homogeneous_pixel[0] / depth, homogeneous_pixel[1] / depth)

    return [project(corner) for corner in marker_corners_marker_space]

# Debug cell: project points defined in the tag frame into the image of one
# datapoint and display them on top of its RGB image.

# Third datapoint of the unfiltered list.
dp = processor.datapoints[2] 

# Hard-coded pinhole intrinsics: focal lengths of 400 and a principal point at
# (320, 240) — presumably the centre of a 640x480 image; TODO confirm against the renderer settings.
fx = 400 
fy = 400 
cx = 640/2 
cy = 480/2 
camera_intrinsics = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])  

# Tag pose as stored on the datapoint (transposed, upper-left 3x3 block scaled by 10).
tf_w_t = dp.tag_pose  
# Left-multiplying by this matrix negates the second and third rows of the camera pose
# (presumably a camera axis-convention change — TODO confirm).
tf_c_c = np.array([
    [1, 0, 0, 0],
    [0, -1, 0, 0],
    [0, 0, -1, 0],
    [0, 0, 0, 1]
])
# tf_c_c = np.eye(4)  

tf_w_c = tf_c_c @ dp.cam_pose  

# Axis remap applied in front of the tag pose: negates x and swaps y and z with a sign flip
# (presumably a tag axis-convention change — TODO confirm).
tf_t_t = np.array([
    [-1,0,0,0],
    [0,0,-1,0],
    [0,-1,0,0],
    [0,0,0,1] 
])

# Transform handed to marker_to_pixel as the tag-to-camera pose.
# NOTE(review): the composition order and frame conventions could not be verified from this file.
tf_c_t = np.linalg.inv(tf_w_c) @ tf_t_t @ tf_w_t  

# Fixed test pose (disabled): identity rotation with a translation of 1 along z.
# tf_c_t = np.array([
#     [1, 0, 0, 0],
#     [0, 1, 0, 0],
#     [0, 0, 1, 1],
#     [0, 0, 0, 1] 
# ])

# Points to project: the origin plus the four corners of a 0.2 x 0.2 square in the z = 0 plane.
world_points = [
    [0,0,0],
    [.1,.1,0],
    [-.1,.1,0],
    [-.1,-.1,0],
    [.1,-.1,0],
]

marker_corners_marker_space = world_points


pixel_points = marker_to_pixel(camera_intrinsics=camera_intrinsics, pose_SE3=tf_c_t, marker_corners_marker_space=marker_corners_marker_space) 


# The recorded output of this cell is five None entries, i.e. every point was
# rejected by marker_to_pixel's z <= 0 check, so nothing is drawn below.
print(pixel_points) 

# dp.rgb holds the RGB file path (the image itself is not stored on the datapoint).
image = cv2.imread(dp.rgb) 
output_image = overlay_points_on_image(image, pixel_points)

# Show the output image with overlaid points
cv2.imshow("Image with Points", output_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

[None, None, None, None, None]


In [22]:
# Inspect tf_c_t: upper-left 3x3 block, then the first three entries of the last column.
# In the recorded output the third translation entry is about -0.717 (negative z).
print(tf_c_t[:3,:3]) 
print(tf_c_t[:3,3]) 

[[-0.15463849  0.98682839  0.04750464]
 [-0.66287751 -0.13928818  0.73565769]
 [ 0.73258472  0.08227124  0.67568565]]
[ 1.59152025e-16 -3.18304050e-16 -7.16756999e-01]


In [24]:
# Inspect tf_w_c: upper-left 3x3 block, then the first three entries of the last column.
# In the recorded output the translation entries are all zero.
print(tf_w_c[:3,:3]) 
print(tf_w_c[:3,3])

[[ 4.93038066e-32  1.00000000e+00 -2.22044605e-16]
 [ 2.22044605e-16 -2.22044605e-16 -1.00000000e+00]
 [-1.00000000e+00  0.00000000e+00 -2.22044605e-16]]
[0. 0. 0.]
