In [None]:
import json
import math
import os

import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import seaborn.objects as so
from sklearn.metrics import mean_squared_error

# Root directory scanned by read_metadata(); each sub-directory is expected to
# hold a metadata.json plus the image files it references.
data_folder = "../data"


def read_metadata(data_folder):
    """Collect snapshot records from every ``metadata.json`` under ``data_folder``.

    Sub-directories are visited in sorted name order. Entries that are not
    directories, and directories without a ``metadata.json``, are skipped.

    Args:
        data_folder: Directory whose immediate sub-directories are scanned.

    Returns:
        A list of dicts with the keys ``subdir``, ``image_file``, ``x``, ``y``
        and ``theta``. Fields missing from a snapshot fall back to ``""`` for
        the filename and ``0`` for the numeric values.
    """
    records = []

    for entry in sorted(os.listdir(data_folder)):
        entry_path = os.path.join(data_folder, entry)
        if not os.path.isdir(entry_path):
            continue

        manifest_path = os.path.join(entry_path, "metadata.json")
        if not os.path.exists(manifest_path):
            continue

        with open(manifest_path, "r") as handle:
            manifest = json.load(handle)

        # One flat record per snapshot, tagged with the folder it came from.
        records.extend(
            {
                "subdir": entry,
                "image_file": shot.get("filename", ""),
                "x": shot.get("x", 0),
                "y": shot.get("y", 0),
                "theta": shot.get("theta", 0),
            }
            for shot in manifest.get("snapshots", [])
        )

    return records


def get_full_path(row):
    """Return the image path for one metadata row.

    The path is ``<data_folder>/<row["subdir"]>/<row["image_file"]>``, where
    ``data_folder`` is the notebook-level constant.
    """
    return os.path.join(data_folder, os.path.join(row["subdir"], row["image_file"]))


# Gather every snapshot record found under data_folder into a single table.
data = read_metadata(data_folder)

df = pd.DataFrame(data)

output_file = "data/compiled_metadata.csv"
# DataFrame.to_csv does not create missing parent directories, so make sure
# the target folder exists before writing.
os.makedirs(os.path.dirname(output_file), exist_ok=True)
df.to_csv(output_file, index=False)

print(f"Metadata compiled and saved to {output_file}")


In [None]:
# Reload the compiled table from disk and resolve each row to its image path.
df = pd.read_csv("data/compiled_metadata.csv")
df["full_path"] = df.apply(get_full_path, axis=1)

# Count the rows whose image file is not present on disk.
image_is_missing = ~df["full_path"].apply(os.path.exists)
missing_images_count = sum(image_is_missing)
print(f"Number of missing images: {missing_images_count}")

In [None]:
# Picture of the game table; the position plots in this notebook draw it as their backdrop.
background_img = mpimg.imread("assets/gametable.png")

In [None]:
# Only the first two axes (rows, columns) are needed. Slicing also works for a
# 2-D grayscale image and avoids assigning `_`, which IPython uses to hold the
# previous cell output.
image_height, image_width = background_img.shape[:2]

In [None]:
# Number of snapshot rows in the compiled table.
len(df)

In [None]:
def plot_scatter_plot(dataframe):
    """Overlay the ``x``/``y`` positions of ``dataframe`` on the game-table image.

    Uses the notebook-level ``background_img``, ``image_width`` and
    ``image_height``. The y-axis runs from ``image_height`` down to 0 so the
    origin sits in the top-left corner, matching the image extent.
    """
    _fig, ax = plt.subplots(figsize=(8, 6))

    ax.imshow(background_img, extent=[0, image_width, image_height, 0], aspect='auto', alpha=0.9)
    ax.scatter(dataframe["x"], dataframe["y"], alpha=0.2, s=15, c="darkblue")

    ax.set_xlim(0, image_width)
    ax.set_ylim(image_height, 0)
    ax.set_xlabel("X Position")
    ax.set_ylabel("Y Position")
    ax.set_title("Scatter Plot of (x, y) Positions")
    # alpha=0.0 makes the grid lines fully transparent.
    ax.grid(True, linestyle="--", alpha=0.0)

    plt.show()


plot_scatter_plot(df)

In [None]:
def plot_density(df):
    """Draw a filled 2-D KDE of the ``x``/``y`` columns of ``df`` over the game-table image.

    Uses the notebook-level ``background_img``, ``image_width`` and
    ``image_height``. A colour bar labelled "Density" is attached, taken from
    the first collection on the axes.
    """
    _fig, ax = plt.subplots(figsize=(8, 6))
    ax.imshow(background_img, extent=[0, image_width, image_height, 0], aspect='auto', alpha=0.8)

    sns.kdeplot(x=df["x"], y=df["y"], cmap="magma", fill=True, alpha=0.8, ax=ax)

    ax.set_xlim(0, image_width)
    ax.set_ylim(image_height, 0)
    ax.set_xlabel("X Position")
    ax.set_ylabel("Y Position")

    plt.colorbar(ax.collections[0], label="Density")

    ax.set_title("Density Heatmap of (x, y) Positions")
    # alpha=0.0 makes the grid lines fully transparent.
    ax.grid(True, linestyle="--", alpha=0.0)
    plt.show()

plot_density(df)

In [None]:
# 50-bin histogram of the x column of the compiled table.
so.Plot(data=df, x="x").add(so.Bars(), so.Hist(bins=50))

In [None]:
# 50-bin histogram of the y column of the compiled table.
so.Plot(data=df, x="y").add(so.Bars(), so.Hist(bins=50))

In [None]:
# 50-bin histogram of the theta column of the compiled table.
so.Plot(data=df, x="theta").add(so.Bars(), so.Hist(bins=50))

In [None]:
import numpy as np
import pandas as pd

def clean_df_by_density(df, grid_size=5, max_points_per_cell=1, output_path="data/cleaned_metadata.csv"):
    """Thin out densely sampled positions by keeping a few rows per grid cell.

    The ``x``/``y`` plane is divided into square cells of side ``grid_size``,
    starting at the minimum ``x`` and ``y`` found in ``df``. Within each cell
    the first ``max_points_per_cell`` rows (in original row order) are kept.
    The result is ordered by cell (x bin, then y bin) and gets a fresh
    0..n-1 index.

    The input frame is left untouched: the bin assignments are computed on the
    side instead of being written into ``df`` as helper columns.

    Args:
        df: Frame with numeric ``x`` and ``y`` columns.
        grid_size: Side length of a grid cell, in the units of ``x``/``y``.
            Must be positive.
        max_points_per_cell: Maximum number of rows kept per cell.
        output_path: CSV file the cleaned frame is written to; its parent
            folder is created if needed. Pass ``None`` to skip writing.

    Returns:
        A new DataFrame holding the kept rows with the same columns as ``df``.

    Raises:
        ValueError: If ``grid_size`` is not positive.
    """
    if grid_size <= 0:
        raise ValueError(f"grid_size must be positive, got {grid_size}")

    if df.empty:
        # min()/max() are NaN on an empty frame and np.arange cannot build bins from them.
        cleaned_df = df.copy().reset_index(drop=True)
    else:
        x_min, x_max = df['x'].min(), df['x'].max()
        y_min, y_max = df['y'].min(), df['y'].max()

        x_bins = np.arange(x_min, x_max + grid_size, grid_size)
        y_bins = np.arange(y_min, y_max + grid_size, grid_size)

        x_bin = np.digitize(df['x'], x_bins) - 1
        y_bin = np.digitize(df['y'], y_bins) - 1

        # Rank every row inside its cell (0 = first occurrence in row order).
        # The helper frame has a plain RangeIndex, so everything below is positional.
        cells = pd.DataFrame({'x_bin': x_bin, 'y_bin': y_bin})
        rank_in_cell = cells.groupby(['x_bin', 'y_bin']).cumcount().to_numpy()
        kept = np.flatnonzero(rank_in_cell < max_points_per_cell)

        # Order the survivors by cell (x bin, then y bin), ties by original position.
        # np.lexsort treats the LAST key as the primary one.
        order = np.lexsort((kept, y_bin[kept], x_bin[kept]))

        # Positional selection stays correct even if df has duplicate index labels.
        cleaned_df = df.iloc[kept[order]].reset_index(drop=True)

    if output_path is not None:
        out_dir = os.path.dirname(output_path)
        if out_dir:
            # to_csv does not create missing parent directories.
            os.makedirs(out_dir, exist_ok=True)
        cleaned_df.to_csv(output_path, index=False)

    print(f"Original size: {len(df)}, Cleaned size: {len(cleaned_df)}")
    return cleaned_df

# Thin the compiled table: keep at most one snapshot per grid cell of side 3
# (in the units of the x/y columns).
cleaned_df = clean_df_by_density(df, grid_size=3, max_points_per_cell=1)

In [None]:
# Redraw the scatter and density views for the thinned table so they can be
# compared with the plots of the full table above.
plot_scatter_plot(cleaned_df)
plot_density(cleaned_df)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def plot_density_comparison(df_uncleaned, df_cleaned):
    """Show the KDE of the uncleaned and cleaned positions side by side.

    Both panels draw the notebook-level ``background_img`` and are cropped to
    y between 100 and 300 (y-axis flipped). A single colour bar, taken from
    the first collection of the right-hand (cleaned) panel, is attached to
    both axes.

    Args:
        df_uncleaned: Frame with ``x``/``y`` columns, drawn in the left panel.
        df_cleaned: Frame with ``x``/``y`` columns, drawn in the right panel.
    """
    title_fontsize = 16
    label_fontsize = 14
    tick_fontsize = 12
    cbar_fontsize = 14

    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    panels = (
        (axes[0], df_uncleaned, "Uncleaned Data Density"),
        (axes[1], df_cleaned, "Cleaned Data Density"),
    )
    for ax, frame, panel_title in panels:
        ax.imshow(background_img, extent=[0, image_width, image_height, 0], aspect='auto', alpha=0.8)
        sns.kdeplot(x=frame["x"], y=frame["y"], cmap="magma", fill=True, alpha=0.8, ax=ax)
        ax.set_xlim(0, image_width)
        ax.set_ylim(300, 100)
        ax.set_title(panel_title, fontsize=title_fontsize)
        ax.set_xlabel("X Position", fontsize=label_fontsize)
        ax.set_ylabel("Y Position", fontsize=label_fontsize)
        ax.tick_params(axis='both', which='major', labelsize=tick_fontsize)

    # The shared colour bar is driven by the cleaned panel's density only.
    cbar = fig.colorbar(axes[1].collections[0], ax=axes, orientation='vertical', fraction=0.05, pad=0.02)
    cbar.ax.set_ylabel("Probability Density", fontsize=cbar_fontsize)
    cbar.ax.tick_params(labelsize=tick_fontsize)

    plt.suptitle("Density Heatmap Comparison: Uncleaned vs Cleaned Data", fontsize=title_fontsize + 2, fontweight='bold')
    plt.show()

plot_density_comparison(df, cleaned_df)

In [None]:
# Displays (min - max) for the cleaned x and y columns as a tuple.
# NOTE(review): the operands are min - max, so both values are <= 0; swap them
# if the positive extent (max - min) was the intent.
cleaned_df["x"].min() - cleaned_df["x"].max(), cleaned_df["y"].min() - cleaned_df["y"].max(),

In [None]:
# Standardize the three pose columns in place: subtract each column's mean and
# divide by its standard deviation.
# NOTE(review): the original mean/std are not stored anywhere, so after this
# cell they can no longer be recovered from cleaned_df itself.
pose_columns = ["x", "y", "theta"]
pose_values = cleaned_df[pose_columns]
cleaned_df[pose_columns] = (pose_values - pose_values.mean()) / pose_values.std()

In [None]:
# Dot plot of the cleaned (x, y) positions. When the cells are run in order,
# these columns already hold the standardized values from the previous cell.
so.Plot(data=cleaned_df, x='x', y='y').add(so.Dots())

In [None]:
import cv2
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

def plot_random_images_grid(df, grid_size=(3, 3)):
    """Show a grid of randomly sampled snapshots with their pose in the title.

    Up to ``rows * cols`` rows are sampled from ``df``. Each image is located
    with ``get_full_path``, read as grayscale, resized to 160x120 and drawn
    with its ``x``, ``y`` and ``theta`` values as the title. Missing or
    unreadable images are reported in the panel title instead.

    Args:
        df: Frame with ``subdir``, ``image_file``, ``x``, ``y`` and ``theta``
            columns.
        grid_size: ``(rows, cols)`` of the panel grid.
    """
    n_rows, n_cols = grid_size
    sample_data = df.sample(n=min(n_rows * n_cols, len(df)))

    # squeeze=False keeps `axes` a 2-D array even for 1x1 or 1xN grids, so
    # `axes.flat` below always works.
    _fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 12), squeeze=False)

    for ax, (_, sample) in zip(axes.flat, sample_data.iterrows()):
        path = get_full_path(sample)
        if os.path.exists(path):
            img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
            if img is not None:
                img = cv2.resize(img, (160, 120))
                ax.imshow(img, cmap='gray')
                ax.set_title(f"x: {sample['x']:.3f}, y: {sample['y']:.3f}, θ: {sample['theta']:.3f}", fontsize=8)
            else:
                ax.set_title("Failed to Load", fontsize=8)
        else:
            print(f"Image not found: {path}")
            ax.set_title("Image Not Found", fontsize=8)

    # Hide the frame and ticks on every panel, including those left without a
    # sample when df has fewer rows than the grid has cells.
    for ax in axes.flat:
        ax.axis("off")

    plt.tight_layout()
    plt.show()

plot_random_images_grid(cleaned_df)

In [None]:
from sklearn.model_selection import train_test_split

def load_image(image_path):
    """Load one image as a grayscale array divided by 255.

    The image is read with ``cv2.IMREAD_GRAYSCALE`` and resized with
    ``cv2.resize(img, (160, 120))``.

    Args:
        image_path: Path of the image file.

    Returns:
        The resized grayscale image divided by 255.0.

    Raises:
        OSError: If ``cv2.imread`` cannot load the file. It returns ``None``
            for missing or unreadable files, which would otherwise surface as
            an opaque error inside ``cv2.resize``.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise OSError(f"Could not load image (missing or unreadable): {image_path}")
    img = cv2.resize(img, (160, 120))
    return img / 255.0

# Load every image listed in df["full_path"] into a single array.
X = np.array([load_image(path) for path in df["full_path"]])

# Add a trailing channel axis: one (120, 160, 1) array per sample.
X = X.reshape(X.shape[0], 120, 160, 1)

# Regression targets, one (x, y, theta) row per image.
# NOTE(review): both the images and the targets come from `df`, not from the
# thinned and standardized `cleaned_df` built earlier — confirm this is intended.
y = df[["x", "y", "theta"]].values

# Hold out 20% of the samples; random_state fixes the split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train.shape, X_test.shape, y_train.shape, y_test.shape

In [None]:
from keras.src.losses import Huber

huber_loss = Huber(delta=1.0)

# Random-guess baseline: the Huber loss of uniformly random predictions, as a
# yardstick for the trained model's loss.
# - Each target column is sampled from its own [min, max] range. A single
#   global range would mix the scales of x, y and theta.
# - The generator is seeded so the baseline value is reproducible across runs.
baseline_rng = np.random.default_rng(42)
y_random_train = baseline_rng.uniform(y_train.min(axis=0), y_train.max(axis=0), size=y_train.shape)
train_score = huber_loss(y_train, y_random_train).numpy()

train_score


In [None]:
# Shape of the training image array, for checking against the model's Input layer.
X_train.shape

In [None]:
from keras.src.layers import SeparableConv2D, GlobalAveragePooling2D, Dense, BatchNormalization, Dropout
from keras.src.layers import MaxPooling2D, Flatten
from keras import Sequential, Input

def _conv_stage(filters):
    """Return one feature stage: 3x3 separable conv (ReLU, same padding), max-pool, batch norm."""
    return [
        SeparableConv2D(filters, (3,3), activation='relu', padding='same'),
        MaxPooling2D(2,2),
        BatchNormalization(),
    ]


# Pose regressor: three conv stages with 32, 64 and 128 filters, global average
# pooling, then a small dense head ending in 3 linear outputs.
model = Sequential([
    Input((120, 160, 1)),

    *_conv_stage(32),
    *_conv_stage(64),
    *_conv_stage(128),

    GlobalAveragePooling2D(),

    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(32, activation='relu'),
    Dense(3, activation='linear'),
])


In [None]:
# Print the layer-by-layer summary of the model defined above.
model.summary()

In [None]:
from keras.src.optimizers.schedules import CosineDecayRestarts
import os
import tensorflow as tf
from datetime import datetime
from keras.src.callbacks import TensorBoard, ModelCheckpoint
from keras.src.optimizers import Adam
from keras.src.losses import Huber

checkpoint_folder = "ckpt"
os.makedirs(checkpoint_folder, exist_ok=True)
batch_size = 128

# Number of optimizer steps per epoch. Round up so the final partial batch is
# counted; floor division under-counts it and yields 0 decay steps when there
# are fewer samples than one batch.
batches_per_epoch = max(1, math.ceil(len(X_train) / batch_size))
# The first cosine cycle spans 5 epochs.
first_decay_steps = 5 * batches_per_epoch

# Cosine learning-rate decay with warm restarts: every restart doubles the
# cycle length (t_mul) and halves the peak learning rate (m_mul).
sgdr_scheduler = CosineDecayRestarts(
    initial_learning_rate=0.01,
    first_decay_steps=first_decay_steps,
    t_mul=2.0,
    m_mul=0.5,
    alpha=1e-5
)

optimizer = Adam(learning_rate=sgdr_scheduler)

# Reuse the TensorBoard run directory recorded by a previous execution so a
# resumed run logs to the same place; otherwise start a new timestamped run.
log_path_file = "logs/latest_run_path.txt"
# open(..., "w") does not create parent folders, so make sure logs/ exists.
os.makedirs(os.path.dirname(log_path_file), exist_ok=True)
if os.path.exists(log_path_file):
    with open(log_path_file, "r") as f:
        log_dir = f.read().strip()
else:
    log_dir = os.path.join("logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
    with open(log_path_file, "w") as f:
        f.write(log_dir)

tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True)

latest_checkpoint = os.path.join(checkpoint_folder, "latest.keras")
best_checkpoint = os.path.join(checkpoint_folder, "best.keras")

# Saved after every epoch, regardless of validation loss.
latest_checkpoint_callback = ModelCheckpoint(
    latest_checkpoint, monitor="val_loss", save_best_only=False, save_weights_only=False, verbose=1
)

# Saved only when val_loss improves.
best_checkpoint_callback = ModelCheckpoint(
    best_checkpoint, monitor="val_loss", save_best_only=True, save_weights_only=False, verbose=1
)

initial_epoch = 0
epoch_file = os.path.join(checkpoint_folder, "epoch.txt")
if os.path.exists(latest_checkpoint):
    print("Loading previous model state...")
    model = tf.keras.models.load_model(latest_checkpoint)
    # latest.keras is written every epoch, but epoch.txt may be absent (for
    # example after an interrupted run), so fall back to epoch 0 in that case.
    if os.path.exists(epoch_file):
        with open(epoch_file, "r") as f:
            initial_epoch = int(f.read().strip())
    else:
        print(f"{epoch_file} not found; epoch counter starts at 0")
    print(f"Resuming training from epoch {initial_epoch}")

# Compile once, after the model is final (fresh or loaded).
# NOTE(review): this attaches the newly created optimizer and schedule even to
# a model loaded from the checkpoint — confirm that restarting the optimizer
# on resume is intended.
model.compile(optimizer=optimizer, loss=Huber(delta=1.0))

In [None]:
# Train up to `epochs` total epochs, starting at `initial_epoch` (non-zero when
# the setup cell resumed from a checkpoint). The held-out split doubles as the
# validation set that the checkpoint callbacks monitor.
epochs = 500
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=epochs,
    batch_size=batch_size,
    initial_epoch=initial_epoch,
    callbacks=[tensorboard_callback, latest_checkpoint_callback, best_checkpoint_callback],
    # No per-epoch progress output from fit itself.
    verbose=False
)

In [None]:
# Persist the model as it stands after training.
model.save(latest_checkpoint)

# Record the epoch count that the setup cell reads back as `initial_epoch`.
# This is the configured `epochs` value and is only written here, after fit returns.
with open(os.path.join(checkpoint_folder, "epoch.txt"), "w") as f:
    f.write(str(epochs))

In [None]:
# Convert the in-memory Keras model to TensorFlow Lite and write it to disk.
# NOTE(review): when the cells run in order, `model` here is the model as it
# left training; the best checkpoint is only loaded in the following cell.
# Confirm which one should be exported.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

with open("pose_estimator.tflite", "wb") as f:
    f.write(tflite_model)

In [None]:
from keras.src.saving import load_model

# Replace the in-memory model with the checkpoint written by
# best_checkpoint_callback (saved only when val_loss improved).
model = load_model(best_checkpoint)

In [None]:
# Predict on the held-out images, then rescale predictions and targets with
# the std/mean of cleaned_df's pose columns.
# NOTE(review): cleaned_df's x/y/theta were standardized in place earlier, so
# std is ~1 and mean is ~0 here, and y_test was built from `df` rather than
# from cleaned_df. As written the rescaling is effectively a no-op — confirm
# which statistics were intended.
# NOTE(review): y_test is overwritten, so re-running this cell applies the
# transform again.
y_pred = model.predict(X_test)
y_pred = (y_pred * cleaned_df[["x", "y", "theta"]].std().values) + cleaned_df[["x", "y", "theta"]].mean().values
y_test = (y_test * cleaned_df[["x", "y", "theta"]].std().values) + cleaned_df[["x", "y", "theta"]].mean().values


In [None]:
# Display the predicted (x, y, theta) rows for the held-out set.
y_pred

In [None]:
# Display the actual (x, y, theta) rows for the held-out set.
y_test

In [None]:
import matplotlib.pyplot as plt

points = 10
arrow_length = 10


def _draw_heading(x, y, theta):
    """Draw a green arrow of length ``arrow_length`` from (x, y) along (cos(theta), sin(theta))."""
    plt.arrow(
        x,
        y,
        arrow_length * np.cos(theta),
        arrow_length * np.sin(theta),
        head_width=5,
        head_length=5,
        fc='green',
        ec='green',
    )


plt.figure(figsize=(8, 6))

plt.imshow(background_img, extent=[0, image_width, image_height, 0], aspect='auto', alpha=0.9)

# The first `points` held-out samples: predictions in red, ground truth in blue.
plt.scatter(y_pred[:points, 0], y_pred[:points, 1], alpha=0.7, s=100, c='red', label='Predicted')
plt.scatter(y_test[:points, 0], y_test[:points, 1], alpha=0.7, s=100, c='blue', label='Actual')

for i in range(points):
    actual, predicted = y_test[i], y_pred[i]

    # Dashed line linking each actual position to its prediction.
    plt.plot([actual[0], predicted[0]], [actual[1], predicted[1]], 'k--', alpha=0.6)

    # Heading arrows, drawn for the prediction first and then the actual pose.
    _draw_heading(predicted[0], predicted[1], predicted[2])
    _draw_heading(actual[0], actual[1], actual[2])

plt.xlim(0, image_width)
plt.ylim(image_height, 0)

plt.xlabel("X Position")
plt.ylabel("Y Position")
plt.title("Scatter Plot of (x, y) Prediction vs Actual")

plt.legend()
plt.grid(True, linestyle='--', alpha=0.5)
plt.show()

In [None]:
from math import sqrt

# The square root of the mean squared error is the ROOT mean squared error, so
# report it under that name.
rmse = sqrt(mean_squared_error(y_test, y_pred))
# Legacy name kept so any later cell that reads `mse` still works; it holds the RMSE.
mse = rmse
print(f"Root Mean Squared Error: {rmse:.3f}")