## Transformations Code
This code contains all the transformations applied to the rosbag topics saved in .txt files.

In [23]:
import yaml
import json
from collections import OrderedDict

In [24]:
# Canonical keypoint order for the flattened output: extract_keypoints() emits
# one (x, y, confidence) triple per name below, looked up by each body part's
# 'part_id'.
# NOTE(review): this looks like the COCO 17-keypoint ordering — confirm
# against whatever consumes the output file.
CORRECT_ORDER = [
    "Nose", "LEye", "REye", "LEar", "REar",
    "LShoulder", "RShoulder", "LElbow", "RElbow", "LWrist", "RWrist",
    "LHip", "RHip", "LKnee", "RKnee", "LAnkle", "RAnkle"
]

def parse_yaml_file(filename):
    """Lazily iterate over the documents of a multi-document YAML file.

    Documents that load as ``None`` (empty documents) are skipped. The file
    stays open only while the generator is being consumed.
    """
    with open(filename, 'r') as stream:
        yield from (
            document
            for document in yaml.safe_load_all(stream)
            if document is not None
        )

def extract_keypoints(person):
    """
    Flatten a person's body parts into [x, y, confidence, ...] following
    CORRECT_ORDER.

    Body parts are looked up by their 'part_id'; any name in CORRECT_ORDER
    that was not detected contributes the placeholder 0.0, 0.0, 0.0.
    """
    detected = {}
    for body_part in person.get('body_parts', []):
        # Read the key before the values so a malformed entry fails on the
        # same field as a dict comprehension would.
        part_id = body_part['part_id']
        detected[part_id] = [
            float(body_part['x']),
            float(body_part['y']),
            float(body_part['confidence']),
        ]

    placeholder = [0.0, 0.0, 0.0]
    return [
        value
        for part_name in CORRECT_ORDER
        for value in detected.get(part_name, placeholder)
    ]

def compute_bbox(keypoints):
    """
    Return the axis-aligned box [x_min, y_min, x_max, y_max] that encloses
    the confident keypoints.

    `keypoints` is a flat sequence of (x, y, confidence) triples. Triples
    whose confidence is not strictly positive are ignored; when none remain
    the result is [0.0, 0.0, 0.0, 0.0].
    """
    confident_points = [
        (keypoints[start], keypoints[start + 1])
        for start in range(0, len(keypoints), 3)
        if keypoints[start + 2] > 0
    ]
    if not confident_points:
        return [0.0, 0.0, 0.0, 0.0]

    # Transpose [(x, y), ...] into the tuple of xs and the tuple of ys.
    x_values, y_values = zip(*confident_points)
    return [min(x_values), min(y_values), max(x_values), max(y_values)]

import yaml

def extract_poses(seq, filename):
    """Return the poses of the first YAML document whose header seq equals *seq*.

    Each pose becomes a dict with a "position" (x, y, z) and an "orientation"
    (x, y, z, w) sub-dict of floats; components absent from the document
    default to 0.0. An empty list is returned when no document matches.
    """
    def _as_floats(mapping, axes):
        # One float per requested axis, falling back to 0.0 when it is absent.
        return {axis: float(mapping.get(axis, 0.0)) for axis in axes}

    with open(filename, 'r') as stream:
        for document in yaml.safe_load_all(stream):
            if document is None:
                continue
            is_requested = document.get('header', {}).get('seq', None) == seq
            if not is_requested:
                continue
            return [
                {
                    "position": _as_floats(
                        pose.get('position', {}), ('x', 'y', 'z')
                    ),
                    "orientation": _as_floats(
                        pose.get('orientation', {}), ('x', 'y', 'z', 'w')
                    ),
                }
                for pose in document.get('poses', [])
            ]
    # No document carried the requested seq.
    return []


In [25]:
# Input: multi-document YAML text dump read for the per-frame 'persons' detections.
keypoints_file = 'image_detections.txt'
# Input: multi-document YAML text dump read for the per-frame 'poses'.
positions_file = 'raw_bodies.txt'
# Output: one JSON object per frame, written one per line.
output_file = 'keypoints_output.json'

In [26]:
from collections import defaultdict


def _index_poses_by_seq(filename):
    """Parse *filename* once and map each header ``seq`` to its raw ``poses`` list.

    When several documents share a ``seq`` the first one wins, which is the
    same document a linear scan such as ``extract_poses`` would return.
    Documents without a ``seq`` are ignored because they can never be matched
    to a frame.
    """
    poses_by_seq = {}
    for pose_doc in parse_yaml_file(filename):
        pose_seq = pose_doc.get('header', {}).get('seq', None)
        if pose_seq is not None:
            # `or []` also covers a document whose 'poses' key is present but null.
            poses_by_seq.setdefault(pose_seq, pose_doc.get('poses') or [])
    return poses_by_seq


# Group data by frame (seq)
frame_data = defaultdict(list)

# Built lazily on the first frame that has persons, so the positions file is
# parsed at most once for the whole run (instead of once per frame) and not at
# all when nothing was detected.
poses_by_seq = None

# Process all documents to group by frame
for doc in parse_yaml_file(keypoints_file):
    seq = doc.get('header', {}).get('seq', None)
    if seq is None:
        continue  # Detections cannot be attributed to a frame

    # Touching the defaultdict registers the frame even when nobody was
    # detected, without discarding entries already collected if the same seq
    # shows up more than once.
    frame_entries = frame_data[seq]

    persons = doc.get('persons') or []
    if not persons:
        continue

    if poses_by_seq is None:
        poses_by_seq = _index_poses_by_seq(positions_file)
    poses = poses_by_seq.get(seq, [])

    # Process each person in this frame
    for person_idx, person in enumerate(persons):
        keypoints = extract_keypoints(person)
        bbox = compute_bbox(keypoints)

        # Persons are paired with poses by index; a person without a
        # corresponding pose keeps the 0.0 placeholder position.
        x, y, z = 0.0, 0.0, 0.0
        if person_idx < len(poses):
            position = poses[person_idx].get('position', {})
            x = float(position.get('x', 0.0))
            y = float(position.get('y', 0.0))
            z = float(position.get('z', 0.0))

        # Create coordinate entry for this person
        frame_entries.append(OrderedDict([
            ("id", person_idx + 1),  # Person ID starting from 1
            ("x", x),
            ("y", y),
            ("z", z),
            ("bbox", bbox),
            ("keypoints", keypoints),
        ]))

# Write output in EPFL format: one JSON object per frame, one per line,
# ordered by frame number.
with open(output_file, 'w') as out_f:
    for frame_number in sorted(frame_data):
        frame_entry = OrderedDict([
            ("frame", frame_number),
            ("coordinates", frame_data[frame_number]),
        ])
        out_f.write(json.dumps(frame_entry, sort_keys=False) + '\n')