<a href="https://colab.research.google.com/github/harry-graves/Aria_ORI/blob/main/lexis_proxy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LEXIS Proxy
This notebook uses the conversion from trajectory_conversion.ipynb to convert trajectories into the TUM file format. It then samples the poses by Euclidean distance and uses the sampled timestamps to pick images taken at roughly equal distances along the trajectory.

The sampled images are run through CLIP with text prompts for several room labels, such as office, kitchen, and corridor. The room label that CLIP assigns the highest probability is then written onto the image, which is saved for inspection.

## Converting to TUM

In [None]:
import os

# To convert OpenVINS trajectories (.txt) to .tum format

def convert_ov_to_tum(input_file):
    """
    Convert an OpenVINS trajectory (.txt) into a TUM trajectory file.

    The output is written next to the input, with the extension replaced by
    '.tum'. The first line of the input is always discarded as a header;
    blank lines and lines beginning with '#' are discarded as well.

    input_file: Path to the OpenVINS estimate. Each data row is read as
        timestamp, q_x, q_y, q_z, q_w, p_x, p_y, p_z (any further columns
        are ignored).
    """
    stem, _ = os.path.splitext(input_file)
    output_file = stem + '.tum'

    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for row_index, raw_line in enumerate(infile):
            # Row 0 is the header, whatever it contains
            if row_index == 0:
                continue

            stripped = raw_line.strip()
            if raw_line.startswith("#") or not stripped:
                continue

            fields = stripped.split()
            timestamp = fields[0]
            # The quaternion is passed through as text; the position is parsed to float
            q_x, q_y, q_z, q_w = fields[1:5]
            p_x, p_y, p_z = (float(value) for value in fields[5:8])

            # TUM column order: timestamp, position, then quaternion
            tum_row = f"{timestamp} {p_x} {p_y} {p_z} {q_x} {q_y} {q_z} {q_w}\n"
            outfile.write(tum_row)


# To convert Aria MPS trajectories (.csv) to .tum format

def convert_mps_to_tum(input_file, loop="closed"):
    """
    Convert an Aria MPS trajectory (.csv) to TUM format.

    The output is written next to the input, with the extension replaced by
    '.tum'. The first line of the input is always skipped as a header, and
    blank lines are skipped as well.

    input_file: Path to the MPS trajectory CSV.
    loop: "closed" (default) or "open". Selects the column that holds
        tracking_timestamp_us: column 1 for closed loop, column 0 for open loop.

    Raises ValueError if loop is neither "open" nor "closed".
    """
    # The closed loop and open loop outputs have slightly different formats:
    # the timestamp is in column 0 for open loop, vs column 1 for closed loop
    timestamp_columns = {"open": 0, "closed": 1}
    if loop not in timestamp_columns:
        # Fail before any file is created; a mistyped mode would otherwise
        # silently read the wrong column as the timestamp
        raise ValueError(f"loop must be 'open' or 'closed', got {loop!r}")
    timestamp_column = timestamp_columns[loop]

    # Change the extension of the input file from .txt or .csv to .tum
    output_file = os.path.splitext(input_file)[0] + '.tum'

    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        first_line = True  # Flag to skip the header
        for line in infile:
            if first_line:
                first_line = False  # Skip the first line (header)
                continue

            if not line.strip():
                continue  # Skip empty lines

            data = line.strip().split(',')

            timestamp_us = float(data[timestamp_column])  # tracking_timestamp_us
            # Convert microseconds to seconds for TUM format
            timestamp_s = timestamp_us / 1e6

            # Extract position components
            p_x, p_y, p_z = map(float, data[3:6])  # Convert to float

            # Extract quaternion components (kept as text)
            q_x, q_y, q_z, q_w = data[6:10]

            # Write in TUM format: timestamp, p_x, p_y, p_z, q_x, q_y, q_z, q_w
            outfile.write(f"{timestamp_s:.6f} {p_x} {p_y} {p_z} {q_x} {q_y} {q_z} {q_w}\n")


# Usage: each call writes a .tum file next to its input
input_filepath = 'ov_estimate.txt'
convert_ov_to_tum(input_filepath)

# The two MPS exports differ only in the loop mode passed to the converter
for input_filepath, loop_mode in (
    ('open_loop_trajectory.csv', "open"),
    ('closed_loop_trajectory.csv', "closed"),
):
    convert_mps_to_tum(input_filepath, loop=loop_mode)

## Sampling images by distance

In [None]:
import os
import zipfile
import numpy as np
import re
from shutil import copyfile

def extract_timestamp_from_image_name(image_name):
    """
    Parse the capture time, in seconds, out of an image filename.

    The name is expected to contain 'sec=<sec>, nanosec=<nanosec>', as in
    image_builtin_interfaces.msg.Time(sec=<sec>, nanosec=<nanosec>).jpg

    Returns the timestamp as a float number of seconds, or None when the
    filename does not contain that pattern.
    """
    found = re.search(r"sec=(\d+), nanosec=(\d+)", image_name)
    if not found:
        return None

    whole_seconds, nanoseconds = (int(part) for part in found.groups())
    # Fold the nanosecond remainder into fractional seconds
    return whole_seconds + nanoseconds / 1e9

def sample_images_by_distance(input_tum_file, images_dir, output_zipfile, sampling_distance=2.0):
    """
    Sample images every 'sampling_distance' meters along the trajectory and zip the closest images.

    The first pose in the trajectory is always sampled. After that, a pose is
    sampled once its straight-line (Euclidean) distance from the previously
    sampled pose reaches 'sampling_distance'. For each sampled timestamp, the
    image whose filename timestamp is nearest is selected; an image selected
    for several samples is stored in the archive only once.

    input_tum_file: File containing the trajectory in TUM format.
    images_dir: Directory containing the images. Files whose names carry no
        parsable timestamp are ignored.
    output_zipfile: Name of the output zip file to store the sampled images.
    sampling_distance: The distance interval in meters to sample images.

    Raises ValueError if poses were sampled but images_dir contains no image
    with a parsable timestamp.

    NOTE(review): trajectory and image timestamps are compared directly, so
    both are assumed to be in seconds on the same clock - confirm for your data.
    """
    sampled_timestamps = []
    prev_position = None

    with open(input_tum_file, 'r') as infile:
        for line in infile:
            if not line.strip() or line.startswith("#"):
                continue

            data = line.strip().split()
            timestamp = float(data[0])
            p_x, p_y, p_z = map(float, data[1:4])
            current_position = np.array([p_x, p_y, p_z])

            # Always keep the first pose; afterwards keep a pose only once it is
            # far enough from the last kept one
            if (prev_position is None
                    or np.linalg.norm(current_position - prev_position) >= sampling_distance):
                sampled_timestamps.append(timestamp)
                prev_position = current_position

    # Parse all image filenames and extract their timestamps
    image_timestamps = []
    for image_file in os.listdir(images_dir):
        image_timestamp = extract_timestamp_from_image_name(image_file)
        if image_timestamp is not None:
            image_timestamps.append((image_timestamp, image_file))

    # os.listdir order is arbitrary; sorting makes tie-breaking between
    # equally distant images deterministic (earliest timestamp wins)
    image_timestamps.sort()

    if sampled_timestamps and not image_timestamps:
        raise ValueError(f"No images with a parsable timestamp found in {images_dir}")

    # Find the closest image to each sampled timestamp, keeping each image once
    # so the archive never receives two members with the same name
    selected_images = []
    already_selected = set()
    for sampled_ts in sampled_timestamps:
        _, closest_name = min(image_timestamps, key=lambda item: abs(item[0] - sampled_ts))
        if closest_name not in already_selected:
            already_selected.add(closest_name)
            selected_images.append(closest_name)

    # Create a new zip file with the selected images
    with zipfile.ZipFile(output_zipfile, 'w') as zipf:
        for image_file in selected_images:
            image_path = os.path.join(images_dir, image_file)
            zipf.write(image_path, arcname=image_file)

    print(f"Sampled images have been zipped into {output_zipfile}")

# Usage example
input_tum_file = 'closed_loop_trajectory.tum'
output_zipfile = 'sampled_images.zip'
images_dir = 'path/to/images'  # Directory where the images are stored
sample_images_by_distance(
    input_tum_file=input_tum_file,
    images_dir=images_dir,
    output_zipfile=output_zipfile,
    sampling_distance=2.0,
)


## Room labels according to CLIP

In [None]:
import torch
import clip
from PIL import Image, ImageDraw, ImageFont
import zipfile
import os

# Load CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Define the classes
room_classes = ["office", "corridor", "hallway", "staircase", "meeting room", "kitchen"]
text = clip.tokenize(room_classes).to(device)

# Unzip the images from sampled_images.zip
# NOTE(review): the sampling cell writes 'sampled_images.zip' to the working
# directory, while this reads it from 'data/' - confirm the archive is moved there.
with zipfile.ZipFile('data/sampled_images.zip', 'r') as zip_ref:
    zip_ref.extractall('images_sampled')

# Create a folder to store the labeled images
output_folder = 'images_labelled'
os.makedirs(output_folder, exist_ok=True)

# Load a font with a larger size. arial.ttf is tried first; because it is not
# installed everywhere, fall back to other common fonts and finally to
# Pillow's built-in default font rather than failing.
font_size = 48
font = None
for font_name in ("arial.ttf", "DejaVuSans.ttf", "LiberationSans-Regular.ttf"):
    try:
        font = ImageFont.truetype(font_name, font_size)
        break
    except OSError:
        continue
if font is None:
    font = ImageFont.load_default()

# Iterate through the images in the unzipped folder
for image_name in os.listdir('images_sampled'):
    image_path = os.path.join('images_sampled', image_name)
    if not image_path.endswith(".jpg"):
        continue

    # Open the file once and close it when done; the same image is classified
    # and then annotated
    with Image.open(image_path) as img:
        # Preprocess the image and pass it through the CLIP model
        image = preprocess(img).unsqueeze(0).to(device)

        with torch.no_grad():
            logits_per_image, _ = model(image, text)
            probs = logits_per_image.softmax(dim=-1).cpu().numpy()[0]

        # Find the label with the highest probability
        max_prob_index = probs.argmax()
        predicted_label = room_classes[max_prob_index]

        draw = ImageDraw.Draw(img)

        # Determine the position to place the text (bottom-right corner).
        # textbbox replaces textsize, which newer Pillow releases no longer provide;
        # measuring from (0, 0) gives the right/bottom extent of the rendered text.
        _, _, text_right, text_bottom = draw.textbbox((0, 0), predicted_label, font=font)
        text_size = (text_right, text_bottom)
        image_width, image_height = img.size
        text_position = (image_width - text_size[0] - 10, image_height - text_size[1] - 10)

        # Add the predicted label to the image
        draw.text(text_position, predicted_label, font=font, fill="white")

        # Save the labeled image to the output folder
        labeled_image_path = os.path.join(output_folder, image_name)
        img.save(labeled_image_path)

print(f"Labeled images saved to {output_folder}")


## Utils

In [None]:
# To clear all TUM files:

# Directory to clean up (the current working directory)
directory = '.'

# Remove every entry in the directory whose name ends with .tum
for filename in os.listdir(directory):
    if not filename.endswith('.tum'):
        continue
    file_path = os.path.join(directory, filename)
    os.remove(file_path)
    print(f"Deleted: {file_path}")