In [1]:
import glob
import pathlib
from typing import List, Tuple

from IPython.display import display, HTML
from PIL import Image
from loguru import logger
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import trimesh
from src.utils.load import load_game_state


def load_sample_file(
    sample_file_path_pair: Tuple[pathlib.Path, pathlib.Path]
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Loads the JPEG and corresponding binary file from the specified sample file path,
    and returns the loaded images as NumPy arrays.
    """
    jpeg_file, binary_file = sample_file_path_pair
    jpeg_image = np.array(Image.open(jpeg_file))
    game_state = load_game_state(binary_file)
    return jpeg_image, game_state


def get_sample_file_paths(directory: str) -> List[Tuple[pathlib.Path, pathlib.Path]]:
    """
    Finds all the JPEG and corresponding binary files in the specified directory,
    and returns a list of tuples containing the file paths.
    """
    jpeg_files = list(pathlib.Path(directory).glob("*.jpeg"))
    jpeg_files.sort(key=lambda path: int(path.stem))

    sample_file_paths = []
    for jpeg_file in jpeg_files:
        binary_file = jpeg_file.with_suffix(".bin")
        if binary_file.exists():
            sample_file_paths.append((jpeg_file, binary_file))
        else:
            logger.warning(
                f"Could not find corresponding .bin file for {jpeg_file}. Skipping file."
            )

    return sample_file_paths

In [2]:
from src.track_gen.track_gen import Monza

track = Monza('../../tracks/monza/physics_mesh_object_groups.obj')
if len(track.group_names) > 0:
    logger.success(f"Loaded: {track.group_names}")
else:
    raise FileNotFoundError("No meshes loaded")

recording_path = "../../recordings/monza_audi_r8_lms_1"
sample_file_paths = get_sample_file_paths(recording_path)

# ax = plt.axes(projection="3d")
# min_val, max_val = -1000, 1000
# ax.set_xlim3d(min_val, max_val)
# ax.set_ylim3d(min_val, max_val)
# ax.set_zlim3d(min_val, max_val)
# old_x, old_y, old_z = None, None, None
# for i, frame in enumerate(positions):
#     x, y, z = frame

#     if old_x is None:
#         old_x, old_y, old_z = x, y, z
#         continue

#     else:
#         ax.plot3D([old_x, x], [old_z, z], [old_y, y], "gray")
#         old_x, old_y, old_z = x, y, z

# plt.show()

2023-02-22 18:34:43.337 | SUCCESS  | __main__:<module>:5 - Loaded: {'asph', 'wall', 'sand', 'penasphd', 'pencnca', 'concrete', 'carpet', 'tarmou', 'road', 'out', 'pencncd', 'curb', 'penaspha', 'grill', 'pitsmnz', 'penasphb', 'cncou', 'illconc', 'kerb', 'pengrsb', 'pengrsd', 'grass', 'penasphc'}


In [5]:
# Extract relevant data from state dictionary at each time step
positions_data = []

for sample in sample_file_paths[:100]:
    img, state = load_sample_file(sample)
    x, y, z = (
        state["ego_location_x"],
        state["ego_location_y"],
        state["ego_location_z"],
    )
    positions_data.append([x, y, z])

In [6]:
# Set up the figure and axis   
def gen_frame(dic) -> None:
    i, positions = dic.get('i'), dic.get('positions')
    fig, ax = plt.subplots(dpi=200)
    ax.set_aspect("equal")

    x, y, z = positions[i]

    # Select the last 60 positions
    historical_positions = positions[max(0, i - (30*3)) : i]

    # Select the future positions, up to the next 60
    future_positions = positions[i+1 : min(len(positions), i + (30*3) + 1)]

    # Concatenate historical and future positions

    if len(historical_positions) > 0:
        historical_and_future_positions = np.concatenate(
            [historical_positions, future_positions], axis=0
        )
    else:
        historical_and_future_positions = future_positions

    # Select the x and z coordinates
    trajectory = np.arange(len(historical_and_future_positions))[:, np.newaxis]
    trajectory = np.hstack([trajectory, trajectory])

    for mesh_id, mesh in [(m_id, m) for m_id, m in track.named_meshes.get("asph").items() if any(trimesh.bounds.contains(m.bounds, historical_and_future_positions))]:
        vertices_to_display = mesh.vertices

        ax.scatter(vertices_to_display[:, 0], vertices_to_display[:, 2], color='purple', s=1, alpha=0.2)




    # Draw the line gradient
    # for j in range(len(trajectory) - 1):
    #     idx1, idx2 = trajectory[j], trajectory[j + 1]
    #     x1, z1 = historical_and_future_positions[idx1, [0, 2]]
    #     x2, z2 = historical_and_future_positions[idx2, [0, 2]]
    #     color = colors[j]
    #     ax.plot([x1, x2], [z1, z2], color=color, linewidth=1, alpha=0.4)

    # Draw the dots for the historical positions
    for j in range(len(historical_positions)):
        _x, _, _z = historical_positions[j]
        ax.scatter(_x, _z, color=plt.cm.get_cmap("gray")(np.linspace(1, 0, len(historical_positions)))[j], s=1, alpha=0.4)


    # Draw the dots for the future positions
    # for j in range(len(future_positions)):
    #     x_, _, z_ = future_positions[j]
    #     ax.scatter(x_, z_, color=plt.cm.get_cmap("cool")(np.linspace(0, 1, len(future_positions)))[j], s=1, alpha=0.4)

    # Draw the solid dot for the current position
    ax.scatter(x, z, color="black", s=10)  

    # Set the camera view to show anything within 50 units of the current position
    ax.set_xlim([x - 500, x + 500])
    ax.set_ylim([z - 500, z + 500])
    plt.savefig(f'imgs/foo_{i}.png', bbox_inches='tight')
    plt.close()
    
    return 'done'

# for i in range(0, len(positions)):  #(500, 500+(30*10), 30): # 
#     gen_frame(i)

from concurrent.futures import ProcessPoolExecutor, as_completed

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit tasks to the executor
        tasks = []
        for i in range(10): # len(positions_data)
            tasks.append(executor.submit(gen_frame, {'i': i, 'positions': positions_data}))

        # Wait for tasks to complete and print progress
        for i, future in enumerate(as_completed(tasks)):
            print(future.result())
            print(f"Processed frame {i} of {len(positions_data)}")

Process SpawnProcess-5:
Traceback (most recent call last):
  File "/Users/adrian/work/assetto-corsa-interface/envs/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/adrian/work/assetto-corsa-interface/envs/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/adrian/work/assetto-corsa-interface/envs/lib/python3.11/concurrent/futures/process.py", line 244, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/adrian/work/assetto-corsa-interface/envs/lib/python3.11/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'gen_frame' on <module '__main__' (built-in)>
Process SpawnProcess-6:
Traceback (most recent call last):
  File "/Users/adrian/work/assetto-corsa-interface/envs/lib/python3.11/multiprocessing/p

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [None]:
vertices_to_display[:, 0]