In [1]:
import pandas as pd
import json
from dateutil import parser
import math
import numpy as np
import os
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from my_utils import *

## metadata.json

In [None]:
# Read every frame's metadata.json into a dict keyed by the zero-padded,
# six-digit frame id ("000000" ... "099999").
metadata = {}
for frame_number in range(100000):
    img_id = f"{frame_number:06d}"
    fp = f"infos/single_frames/{img_id}/metadata.json"
    with open(fp, "r") as file:
        metadata[img_id] = json.load(file)

In [None]:
# Flatten the per-frame metadata dicts into a table: one row per frame, keeping
# only the fields listed in `keys`, in that column order.
keys = [
    "frame_id",
    "time",
    "country_code",
    "scraped_weather",
    "road_type",
    "road_condition",
    "time_of_day",
    "num_vehicles",
    "longitude",
    "latitude",
    "solar_angle_elevation",
]
rows = [[item[key] for key in keys] for item in metadata.values()]

metadata_df = pd.DataFrame(rows, columns=keys)
metadata_df.head()

0. Start a new dataframe to hold the converted data

In [None]:
# The converted table starts with the frame identifier, exposed as "image_id".
metadata_converted_df = pd.DataFrame({"image_id": metadata_df["frame_id"]})

1. Convert time from ISO 8601 format to Unix time

In [None]:
def iso_to_unix(iso_str):
    """Convert an ISO 8601 timestamp string to Unix time (float seconds).

    NOTE(review): `datetime.timestamp()` interprets a naive datetime as local
    time, so if the input strings carry no UTC offset the result depends on the
    machine's timezone -- confirm the inputs are offset-aware.
    """
    return parser.isoparse(iso_str).timestamp()


# Perform conversion on the "time" column
metadata_converted_df["time"] = metadata_df["time"].map(iso_to_unix)

metadata_converted_df.head()

2. Convert country_code to numerical labels

In [None]:
# Integer-encode the country codes. `encoding_cc` holds the unique codes, so
# encoding_cc[i] is the country that was assigned label i.
country_labels, encoding_cc = pd.factorize(metadata_df["country_code"])
metadata_converted_df["country_code"] = country_labels

metadata_converted_df.head()

In [None]:
# Show which integer label stands for which country code.
print("Category Mapping:", {label: country for label, country in enumerate(encoding_cc)})

3. Convert scraped_weather to numerical value

In [None]:
# List the distinct weather labels present, to check them against the manual encoding below.
print(pd.unique(metadata_df["scraped_weather"]))

In [None]:
# A manual ordinal encoding is used here, ordered from the easiest to the
# hardest condition, with the following rationale:
# - 0 = Clear-day: Ideal lighting, no obstructions.
# - 1 = Clear-night: Less ideal than daytime due to reduced lighting, but still clear.
# - 2 = Partly-cloudy-day: Slightly reduced contrast, but mostly clear.
# - 3 = Partly-cloudy-night: Similar to clear-night but with clouds reducing moonlight.
# - 4 = Cloudy: Diffused lighting, lower contrast but no major obstructions.
# - 5 = Wind: Motion blur can be an issue, especially for lightweight objects.
# - 6 = Rain: Water droplets on the lens, reflections, and reduced visibility.
# - 7 = Snow: More occlusion than rain, with objects blending into the white background.
# - 8 = Fog: The most challenging: heavy occlusion, low contrast, and objects may be completely invisible.
# The table is built once here instead of on every call of weather_encoding.
WEATHER_TO_CODE = {
    "clear-day": 0,
    "clear-night": 1,
    "partly-cloudy-day": 2,
    "partly-cloudy-night": 3,
    "cloudy": 4,
    "wind": 5,
    "rain": 6,
    "snow": 7,
    "fog": 8,
}


def weather_encoding(weather):
    """Return the ordinal code (0-8) of a scraped weather label.

    Parameters:
        weather (str): one of the keys of WEATHER_TO_CODE, e.g. "clear-day".

    Returns:
        int: the code assigned to that label.

    Raises:
        KeyError: if `weather` is not one of the nine known labels; the message
            names the offending value.
    """
    try:
        return WEATHER_TO_CODE[weather]
    except KeyError:
        raise KeyError(f"Unknown weather label: {weather!r}") from None

# Encode every scraped weather label; the converted column is named "weather".
weather_codes = metadata_df["scraped_weather"].map(weather_encoding)
metadata_converted_df["weather"] = weather_codes

metadata_converted_df.head()

In [None]:
# Reverse lookup (code -> weather label) that is stored in the encoding schema.
# A label's position in this list is its code.
encoding_weather = dict(
    enumerate(
        [
            "clear-day",
            "clear-night",
            "partly-cloudy-day",
            "partly-cloudy-night",
            "cloudy",
            "wind",
            "rain",
            "snow",
            "fog",
        ]
    )
)

3. Convert road type and road condition

In [None]:
# Integer-encode both road attributes. Each `encoding_*` index maps label i back
# to the original category.
road_type_labels, encoding_rt = pd.factorize(metadata_df["road_type"])
metadata_converted_df["road_type"] = road_type_labels

road_condition_labels, encoding_rc = pd.factorize(metadata_df["road_condition"])
metadata_converted_df["road_condition"] = road_condition_labels

metadata_converted_df.head()

In [None]:
# Show the label -> category mapping for road type, then for road condition.
for road_encoding in (encoding_rt, encoding_rc):
    print("Category Mapping:", dict(enumerate(road_encoding)))

4. Convert time of day

In [None]:
# Integer-encode the time-of-day category; encoding_td[i] is the category with label i.
time_of_day_labels, encoding_td = pd.factorize(metadata_df["time_of_day"])
metadata_converted_df["time_of_day"] = time_of_day_labels

metadata_converted_df.head()

In [None]:
# Show which integer label stands for which time-of-day category.
print("Category Mapping:", {label: period for label, period in enumerate(encoding_td)})

5. Copy the rest, which does not need conversion

In [None]:
# These fields need no conversion, so they are carried over unchanged.
for passthrough_column in ("num_vehicles", "longitude", "latitude", "solar_angle_elevation"):
    metadata_converted_df[passthrough_column] = metadata_df[passthrough_column]

metadata_converted_df.head()

6. Record the encoding schema

In [None]:
# Collect the integer -> category lookup of every encoded column in one place.
metadata_encoding_schema = dict(
    country_code=dict(enumerate(encoding_cc)),
    weather=encoding_weather,
    time_of_day=dict(enumerate(encoding_td)),
    road_type=dict(enumerate(encoding_rt)),
    road_condition=dict(enumerate(encoding_rc)),
)

# Save as a JSON file (json.dump writes the integer keys as strings)
with open("outputs/metadata_encoding.json", "w") as file:
    json.dump(metadata_encoding_schema, file, indent=4)

7. Save the converted metadata to the corresponding folder

In [None]:
# Split the converted metadata over the ten 5000-image chunk folders
# (outputs/1_5000 ... outputs/45001_50000), keeping for each chunk only the
# images listed in that chunk's predictions.csv.
# NOTE(review): ids are compared as strings, so predictions.csv must format
# image_id exactly like the metadata frame_id -- confirm.
starts = [1 + 5000 * i for i in range(10)]

for start in starts:
    end = start + 5000 - 1
    chunk_dir = f"outputs/{start}_{end}"

    # Select the column by name: `.squeeze()` collapses a one-row file to a
    # scalar string, which `isin` rejects.
    target_imgs = pd.read_csv(
        f"{chunk_dir}/predictions.csv", usecols=["image_id"], dtype=str
    )["image_id"]

    selector = metadata_converted_df["image_id"].isin(target_imgs)

    # Make sure the destination folder exists before writing into it.
    os.makedirs(f"{chunk_dir}/metadata", exist_ok=True)
    metadata_converted_df[selector].to_csv(
        f"{chunk_dir}/metadata/metadata.csv",
        index=False
    )

## calibration.json

In [None]:
# Read every frame's calibration.json into a dict keyed by the zero-padded,
# six-digit frame id.
calibration = {}
for frame_number in range(100000):
    img_id = f"{frame_number:06d}"
    fp = f"infos/single_frames/{img_id}/calibration.json"
    with open(fp, "r") as file:
        calibration[img_id] = json.load(file)

In [None]:
# Flatten all calibration dicts in a single json_normalize call (nested keys are
# joined with "_"), instead of building one single-row DataFrame per image and
# concatenating all of them, which is very slow for this many frames.
# NOTE(review): assumes every calibration.json is a single JSON object -- confirm.
calibration_df = pd.json_normalize(list(calibration.values()), sep="_")

# Put the frame id first; dict keys and values iterate in the same order.
calibration_df.insert(0, "image_id", list(calibration.keys()))

In [None]:
# Preview the first five rows of the flattened calibration table.
calibration_df.head(5)

0. Prepare a new dataframe for the converted data

In [None]:

# The converted calibration table starts with just the image id column.
calibration_converted_df = pd.DataFrame({"image_id": calibration_df["image_id"]})

calibration_converted_df.head()

1. Extract features from FC_intrinsics

In [None]:
"""
Explanation of camera intrinsics: https://ksimek.github.io/2013/08/13/intrinsic/

The format of intrinsics:
First row: fx, 0, cx, 0 (focal length in x, principal point x)
Second row: 0, fy, cy, 0 (focal length in y, principal point y)
Third row: 0, 0, 1, 0 (homogeneous coordinates)

"""


def extract_focal_length_x(matrix):
    """Return fx: row 0, column 0 of the intrinsics matrix."""
    return matrix[0][0]


def extract_focal_length_y(matrix):
    """Return fy: row 1, column 1 of the intrinsics matrix."""
    return matrix[1][1]


def extract_principle_point_x(matrix):
    """Return cx: row 0, column 2 of the intrinsics matrix."""
    return matrix[0][2]


def extract_principle_point_y(matrix):
    """Return cy: row 1, column 2 of the intrinsics matrix.

    cy lives in the second row (0, fy, cy, 0); reading row 0 would return cx.
    """
    return matrix[1][2]


# One column per intrinsic parameter. The column names keep the existing
# "principle_point_*" spelling so the CSV schema does not change.
intrinsics = calibration_df["FC_intrinsics"]

calibration_converted_df["focal_length_x"] = intrinsics.apply(extract_focal_length_x)
calibration_converted_df["focal_length_y"] = intrinsics.apply(extract_focal_length_y)
calibration_converted_df["principle_point_x"] = intrinsics.apply(extract_principle_point_x)
# The y column must go through the y extractor, not the x one.
calibration_converted_df["principle_point_y"] = intrinsics.apply(extract_principle_point_y)

calibration_converted_df.head()

2. Extract features from FC_extrinsics

In [None]:
'''
Raw data:
The first 3×3 part represents the rotation matrix.
The last column (first three rows) represents the translation vector (x, y, z) in meters.

Conversion:
1. Convert the rotation matrix to Euler angles: yaw, pitch, roll
2. The translation vector directly gives the position of the camera in the world coordinate system.

Note: the ego-motion section further down defines functions with these same
names that take a list of matrices; re-run this cell before re-running the
calibration extraction below it.
'''

def extract_pose_x(matrix):
    """Return the x translation: last entry of row 0."""
    return matrix[0][-1]

def extract_pose_y(matrix):
    """Return the y translation: last entry of row 1."""
    return matrix[1][-1]

def extract_pose_z(matrix):
    """Return the z translation: last entry of row 2."""
    return matrix[2][-1]

def extract_pose_yaw(matrix):
    """Return yaw = atan2(R[1][0], R[0][0]) in radians."""
    yaw = math.atan2(matrix[1][0], matrix[0][0])
    return yaw

def extract_pose_pitch(matrix):
    """Return pitch = asin(-R[2][0]) in radians.

    Floating-point round-off can push -R[2][0] marginally outside [-1, 1], where
    math.asin raises ValueError. Values within 1e-6 of the boundary are snapped
    to +/-1; anything further out still raises, and NaN still propagates.
    """
    sin_pitch = -matrix[2][0]
    if 1.0 < abs(sin_pitch) <= 1.0 + 1e-6:
        sin_pitch = math.copysign(1.0, sin_pitch)
    pitch = math.asin(sin_pitch)
    return pitch

def extract_pose_roll(matrix):
    """Return roll = atan2(R[2][1], R[2][2]) in radians."""
    roll = math.atan2(matrix[2][1], matrix[2][2])
    return roll

# Derive the six camera pose columns (translation x/y/z, then yaw/pitch/roll)
# from the extrinsics matrix of every image.
extrinsics = calibration_df["FC_extrinsics"]
camera_pose_extractors = (
    ("camera_pose_x", extract_pose_x),
    ("camera_pose_y", extract_pose_y),
    ("camera_pose_z", extract_pose_z),
    ("camera_pose_yaw", extract_pose_yaw),
    ("camera_pose_pitch", extract_pose_pitch),
    ("camera_pose_roll", extract_pose_roll),
)
for pose_column, pose_extractor in camera_pose_extractors:
    calibration_converted_df[pose_column] = extrinsics.apply(pose_extractor)

calibration_converted_df.head()

3. Extract features from FC_field_of_view

In [None]:
# FC_field_of_view is stored as the pair [horizontal FOV, vertical FOV];
# flatten it into two scalar columns.
field_of_view = calibration_df["FC_field_of_view"]
calibration_converted_df["horizontal_fov"] = field_of_view.map(lambda fov: fov[0])
calibration_converted_df["vertical_fov"] = field_of_view.map(lambda fov: fov[1])

calibration_converted_df.head()

4. Save the converted calibrations to the corresponding folder.

In [None]:
# Split the converted calibration data over the ten 5000-image chunk folders,
# keeping for each chunk only the images listed in its predictions.csv.
# NOTE(review): ids are compared as strings, so predictions.csv must use the
# same zero-padded image_id format as the loader above -- confirm.
starts = [1 + 5000 * i for i in range(10)]

for start in starts:
    end = start + 5000 - 1
    chunk_dir = f"outputs/{start}_{end}"

    # Select the column by name: `.squeeze()` collapses a one-row file to a
    # scalar string, which `isin` rejects.
    target_imgs = pd.read_csv(
        f"{chunk_dir}/predictions.csv", usecols=["image_id"], dtype=str
    )["image_id"]

    selector = calibration_converted_df["image_id"].isin(target_imgs)

    # Make sure the destination folder exists before writing into it.
    os.makedirs(f"{chunk_dir}/metadata", exist_ok=True)
    calibration_converted_df[selector].to_csv(
        f"{chunk_dir}/metadata/calibration.csv",
        index=False
    )

## ego_motion.json

In [None]:
# Read ego_motion.json for frames 000000-049999 into a dict keyed by the
# zero-padded frame id. No `break` in the loop: an early exit here would load
# only the first frame and leave every derived table with a single row.
# NOTE(review): the metadata and calibration loaders read 100000 frames while
# this one reads 50000 -- confirm the smaller range is intended.
ego_motion = {}
for img_id in [f"{i:06d}" for i in range(50000)]:
    fp = f"infos/single_frames/{img_id}/ego_motion.json"
    with open(fp, "r") as file:
        ego_motion[img_id] = json.load(file)

In [None]:
# One row per image: the image id followed by the raw ego-motion sequences.
keys = ["timestamps", "poses", "velocities", "accelerations", "angular_rates"]
rows = [
    [image_id] + [item[key] for key in keys]
    for image_id, item in ego_motion.items()
]

ego_motion_df = pd.DataFrame(rows, columns=["image_id"] + keys)
ego_motion_df.head()

1. Prepare a new dataframe for the converted data

In [None]:
# The converted ego-motion table starts with just the image id column.
ego_motion_converted_df = pd.DataFrame({"image_id": ego_motion_df["image_id"]})

ego_motion_converted_df.head()

2. Select the pose at the middle timestamp, and convert to x, y, z, yaw, pitch, and roll

In [None]:
# These definitions intentionally take a *list* of pose matrices and replace the
# single-matrix versions used for the camera extrinsics earlier in the notebook.

def _middle_pose(matrices):
    """Return the pose matrix at the middle index (len // 2) of the sequence."""
    return matrices[len(matrices) // 2]

def extract_pose_x(matrices):
    """Return the x translation (last entry of row 0) of the middle pose."""
    return _middle_pose(matrices)[0][-1]

def extract_pose_y(matrices):
    """Return the y translation (last entry of row 1) of the middle pose."""
    return _middle_pose(matrices)[1][-1]

def extract_pose_z(matrices):
    """Return the z translation (last entry of row 2) of the middle pose."""
    return _middle_pose(matrices)[2][-1]

def extract_pose_yaw(matrices):
    """Return yaw = atan2(R[1][0], R[0][0]) of the middle pose, in radians."""
    matrix = _middle_pose(matrices)
    return math.atan2(matrix[1][0], matrix[0][0])

def extract_pose_pitch(matrices):
    """Return pitch = asin(-R[2][0]) of the middle pose, in radians.

    Floating-point round-off can push -R[2][0] marginally outside [-1, 1], where
    math.asin raises ValueError. Values within 1e-6 of the boundary are snapped
    to +/-1; anything further out still raises, and NaN still propagates.
    """
    sin_pitch = -_middle_pose(matrices)[2][0]
    if 1.0 < abs(sin_pitch) <= 1.0 + 1e-6:
        sin_pitch = math.copysign(1.0, sin_pitch)
    return math.asin(sin_pitch)

def extract_pose_roll(matrices):
    """Return roll = atan2(R[2][1], R[2][2]) of the middle pose, in radians."""
    matrix = _middle_pose(matrices)
    return math.atan2(matrix[2][1], matrix[2][2])

# Derive the six ego pose columns (translation x/y/z, then yaw/pitch/roll) from
# each image's pose sequence.
ego_poses = ego_motion_df["poses"]
ego_pose_extractors = (
    ("ego_pose_x", extract_pose_x),
    ("ego_pose_y", extract_pose_y),
    ("ego_pose_z", extract_pose_z),
    ("ego_pose_yaw", extract_pose_yaw),
    ("ego_pose_pitch", extract_pose_pitch),
    ("ego_pose_roll", extract_pose_roll),
)
for pose_column, pose_extractor in ego_pose_extractors:
    ego_motion_converted_df[pose_column] = ego_poses.apply(pose_extractor)

ego_motion_converted_df.head()

3. Derive speed variance

In [None]:
def compute_speed_variance(velocities):
    """Return the variance of the speed over a sequence of velocity vectors.

    Speed is the Euclidean norm of each row of `velocities`; the variance is
    numpy's default (population) variance of those speeds.
    """
    return np.var(np.linalg.norm(velocities, axis=1))

# One speed-variance value per image, computed from its velocity sequence.
speed_variances = ego_motion_df["velocities"].map(compute_speed_variance)
ego_motion_converted_df["speed_var"] = speed_variances

ego_motion_converted_df.head()

4. Compute jerk (rate of acceleration change)

In [None]:
def compute_jerk(accelerations, timestamps):
    """Summarize jerk (finite-difference rate of change of acceleration).

    Consecutive acceleration rows are differenced and divided by the matching
    timestamp difference. The statistics are taken over every component of every
    jerk vector at once, and the maximum is the largest signed value, not the
    largest magnitude.

    Returns:
        tuple: (mean, max, standard deviation) of the jerk components.
    """
    time_steps = np.diff(np.array(timestamps))
    acceleration_steps = np.diff(np.array(accelerations), axis=0)

    # Reshape to a column so each row of differences is divided by its own dt.
    jerk = acceleration_steps / time_steps.reshape(-1, 1)
    return jerk.mean(), jerk.max(), jerk.std()


# (mean, max, std) of jerk for every image, in row order.
jerk_ls = [
    compute_jerk(accelerations, timestamps)
    for accelerations, timestamps in zip(
        ego_motion_df["accelerations"], ego_motion_df["timestamps"]
    )
]

ego_motion_converted_df[["mean_jerk", "max_jerk", "st_jerk"]] = jerk_ls

ego_motion_converted_df.head()

5. Derive angular acceleration

In [None]:
def compute_angular_acceleration(angular_rates, timestamps):
    """Summarize angular acceleration (finite-difference rate of change of angular rate).

    Consecutive angular-rate rows are differenced and divided by the matching
    timestamp difference. The statistics are taken over every component at once,
    and the maximum is the largest signed value, not the largest magnitude.

    Returns:
        tuple: (mean, max, standard deviation) of the angular acceleration components.
    """
    time_steps = np.diff(np.array(timestamps))
    rate_steps = np.diff(np.array(angular_rates), axis=0)

    # Reshape to a column so each row of differences is divided by its own dt.
    angular_acceleration = rate_steps / time_steps.reshape(-1, 1)
    mean_value = np.mean(angular_acceleration)
    max_value = np.max(angular_acceleration)
    std_value = np.std(angular_acceleration)
    return (mean_value, max_value, std_value)


# (mean, max, std) of angular acceleration for every image, in row order.
angular_acc_ls = [
    compute_angular_acceleration(angular_rates, timestamps)
    for angular_rates, timestamps in zip(
        ego_motion_df["angular_rates"], ego_motion_df["timestamps"]
    )
]

angular_acc_columns = ["mean_angular_acc", "max_angular_acc", "st_angular_acc"]
ego_motion_converted_df[angular_acc_columns] = angular_acc_ls

ego_motion_converted_df.head()

5. Compute lateral acceleration

In [None]:
def compute_lateral_acceleration(velocities, angular_rates):
    """Summarize lateral acceleration, estimated as speed times yaw rate.

    Speed is the Euclidean norm of each velocity row; the yaw rate is read from
    index 2 of each angular-rate row. The maximum is the largest signed value,
    not the largest magnitude.

    Returns:
        tuple: (mean, max, standard deviation) of the per-sample lateral acceleration.
    """
    yaw_rates = np.array(angular_rates)[:, 2]
    lateral_acceleration = np.linalg.norm(velocities, axis=1) * yaw_rates

    mean_value = np.mean(lateral_acceleration)
    max_value = np.max(lateral_acceleration)
    std_value = np.std(lateral_acceleration)
    return (mean_value, max_value, std_value)

# (mean, max, std) of lateral acceleration for every image, in row order.
lateral_acc_ls = [
    compute_lateral_acceleration(velocities, angular_rates)
    for velocities, angular_rates in zip(
        ego_motion_df["velocities"], ego_motion_df["angular_rates"]
    )
]

lateral_acc_columns = ["mean_lateral_acc", "max_lateral_acc", "st_lateral_acc"]
ego_motion_converted_df[lateral_acc_columns] = lateral_acc_ls

ego_motion_converted_df.head()

6. Save the converted ego motion data to the corresponding folder

In [None]:
# Split the converted ego-motion data over the ten 5000-image chunk folders,
# keeping for each chunk only the images listed in its predictions.csv.
# NOTE(review): ids are compared as strings, so predictions.csv must use the
# same zero-padded image_id format as the loader above -- confirm.
starts = [1 + 5000 * i for i in range(10)]

for start in starts:
    end = start + 5000 - 1
    chunk_dir = f"outputs/{start}_{end}"

    # Select the column by name: `.squeeze()` collapses a one-row file to a
    # scalar string, which `isin` rejects.
    target_imgs = pd.read_csv(
        f"{chunk_dir}/predictions.csv", usecols=["image_id"], dtype=str
    )["image_id"]

    selector = ego_motion_converted_df["image_id"].isin(target_imgs)

    # Make sure the destination folder exists before writing into it.
    os.makedirs(f"{chunk_dir}/metadata", exist_ok=True)
    ego_motion_converted_df[selector].to_csv(
        f"{chunk_dir}/metadata/ego_motion.csv",
        index=False
    )

## Color Features

1. Data Preparation (image paths)

In [4]:
# Compute image features for each of the ten 5000-image chunks and write them to
# that chunk's metadata folder. After the loop, `image_feature_df` holds only
# the last chunk (45001_50000).
for i in range(10):
    start = 5000 * i + 1
    end = start + 5000 - 1
    chunk_dir = f"outputs/{start}_{end}"

    # Step 1: get the unique image ids of this chunk, in order of first appearance
    image_ids = (
        pd.read_csv(f"{chunk_dir}/predictions.csv", usecols=["image_id"])["image_id"]
        .drop_duplicates()
        .to_numpy()
    )

    # Step 2: compose image paths. os.listdir returns entries in arbitrary
    # order, so sort before taking the first file to keep the choice stable.
    image_paths = []
    for image_id in image_ids:
        image_folder = f"single_frames_img/{image_id:06d}/camera_front_blur"
        first_file = sorted(os.listdir(image_folder))[0]
        image_paths.append(os.path.join(image_folder, first_file))

    # Step 3: calculate image features
    rows = [compute_image_features(image_path) for image_path in image_paths]

    # Step 4: format results as a dataframe with image ids and image paths
    image_feature_df = pd.DataFrame(rows)
    image_feature_df["image_id"] = image_ids
    image_feature_df["image_path"] = image_paths

    # Make sure the destination folder exists before writing into it.
    os.makedirs(f"{chunk_dir}/metadata", exist_ok=True)
    image_feature_df.to_csv(f"{chunk_dir}/metadata/image_features.csv", index=False)

    print(f"Finished: {start}_{end}")

Finished: 1_5000
Finished: 5001_10000
Finished: 10001_15000
Finished: 15001_20000
Finished: 20001_25000
Finished: 25001_30000
Finished: 30001_35000
Finished: 35001_40000
Finished: 40001_45000
Finished: 45001_50000


2. Do K-means clustering to check if features are useful

In [None]:
def kmeans(data_matrix, max_clusters=10, optimal_k=None):
    """
    Perform K-means clustering on a matrix of numerical features.

    The features are standardized, an elbow curve is plotted for
    k = 1..max_clusters, K-means is fitted with the chosen k, and the clusters
    are drawn on a 2-D PCA projection of the standardized data.

    Parameters:
        data_matrix (pd.DataFrame or np.ndarray): 2D data, shape (n_samples, n_features).
            Column names of a DataFrame are used when reporting PCA loadings;
            inputs without `.columns` get the names "feature_0", "feature_1", ...
        max_clusters (int): Maximum k to try for elbow method
        optimal_k (int, optional): Number of clusters to fit. When None (the
            default) it is requested interactively after the elbow plot is
            shown; pass a value to make the notebook run unattended.

    Returns:
        dict: {
            "labels": array of cluster labels,
            "centers": cluster centers (in standardized feature space),
            "optimal_k": number of clusters chosen,
            "pca_variance_ratio": variance captured by PCA axes
        }
    """
    # Keep the column names when available; plain arrays have none.
    feature_names = getattr(data_matrix, "columns", None)
    data = np.array(data_matrix)
    assert data.ndim == 2, "Input data must be a matrix (2D array)."
    if feature_names is None:
        feature_names = [f"feature_{idx}" for idx in range(data.shape[1])]

    # Scale features
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)

    # 1. Elbow Method for finding optimal number of clusters
    cluster_counts = range(1, max_clusters + 1)
    inertias = []
    for k in cluster_counts:
        elbow_model = KMeans(n_clusters=k, random_state=42)
        elbow_model.fit(scaled_data)
        inertias.append(elbow_model.inertia_)

    # Plot elbow curve
    plt.figure(figsize=(6, 4))
    plt.plot(cluster_counts, inertias, marker="o")
    plt.title("Elbow Method For Optimal k")
    plt.xlabel("Number of Clusters")
    plt.ylabel("Inertia (Within-Cluster SSE)")
    plt.grid(True)
    plt.show()

    # Ask for k only when the caller did not supply one.
    if optimal_k is None:
        optimal_k = int(input("Enter optimal number of clusters (k) based on elbow plot: "))

    # 2. Fit KMeans with chosen k (named `model` so the function name is not shadowed)
    model = KMeans(n_clusters=optimal_k, random_state=42)
    labels = model.fit_predict(scaled_data)

    # 3. PCA for dimensionality reduction (to 2D)
    pca = PCA(n_components=2)
    reduced_data = pca.fit_transform(scaled_data)

    # Get the absolute value of the loadings
    loadings = np.abs(pca.components_)  # shape: (n_components, n_features)

    # Find the most influential feature for each component
    most_important_feature = []
    for i, component in enumerate(loadings):
        most_important_idx = np.argmax(component)
        most_important_feature.append(feature_names[most_important_idx])
        print(f"Principal Component {i + 1} is most influenced by: {most_important_feature[i]}")

    # 4. Visualization. The axes are principal components, not raw features, so
    # label them as such and mention the dominant feature only as a hint.
    plt.figure(figsize=(6, 5))
    scatter = plt.scatter(reduced_data[:, 0], reduced_data[:, 1], c=labels, cmap="tab10", s=50)
    plt.title(f"K-Means Clustering (k={optimal_k}) - PCA Projection")
    plt.xlabel(f"PC1 (top loading: {most_important_feature[0]})")
    plt.ylabel(f"PC2 (top loading: {most_important_feature[1]})")
    plt.grid(True)
    plt.legend(*scatter.legend_elements(), title="Cluster")
    plt.show()

    return {
        "labels": labels,
        "centers": model.cluster_centers_,
        "optimal_k": optimal_k,
        "pca_variance_ratio": pca.explained_variance_ratio_
    }

    

In [None]:
# Cluster the first chunk only. The feature loop leaves `image_feature_df`
# holding the last chunk it processed, while the clustering results are saved
# under outputs/1_5000, so reload that chunk's features explicitly.
image_feature_df = pd.read_csv("outputs/1_5000/metadata/image_features.csv")

# Drop the non-feature columns by name. A positional slice would select a
# different set of columns once the "cluster" column has been added.
# NOTE(review): if compute_image_features emits another non-numeric column,
# add it to this list.
non_feature_columns = ["image_id", "image_path", "cluster"]
results = kmeans(image_feature_df.drop(columns=non_feature_columns, errors="ignore"))

In [None]:
# Attach each image's cluster label to the feature table and save it again.
# NOTE(review): the destination is hard-coded to the 1_5000 chunk and a "k6"
# suffix -- confirm it matches the chunk held in image_feature_df and the k
# that was actually chosen.
cluster_labels = results["labels"]
image_feature_df["cluster"] = cluster_labels

image_feature_df.to_csv("outputs/1_5000/metadata/image_features_k6.csv", index=False)

3. Check images by labels

In [None]:
# Browse the images assigned to cluster 5 (only populated when k >= 6).
selector = image_feature_df["cluster"].eq(5)
image_cluster = image_feature_df.loc[selector, "image_path"].to_list()

navigate_images(image_cluster)