In [None]:
# Simulated Object class

from dataclasses import dataclass
from typing import Optional

from src.dynamics.solver import DynamicsSolver

import torch

@dataclass
class SimulatedObject():
    """Point mass that is advanced one step at a time by a dynamics solver.

    Attributes:
        mass: Mass used to convert an applied force into an acceleration.
        coordinates: Current position, one value per dimension.
        velocity: Current velocity, one value per dimension.
        acceleration: Acceleration derived from the force of the latest update.
        solver: Object exposing ``apply_force(x=, v=, force=, dt=)``.
        save_past: Maximum number of previous states kept in ``past``.
        past: Oldest-first list of ``(coordinates, velocity, acceleration)``
            snapshots taken just before each update, or ``None`` while no
            history has been recorded.
    """
    mass: float
    coordinates: tuple
    velocity: tuple
    acceleration: tuple
    solver: "DynamicsSolver"
    save_past: int = 0
    past: Optional[list] = None

    def update(self, dt: float, force: Optional[tuple] = None):
        """Record the current state, then integrate ``force`` over ``dt``.

        Args:
            dt: Time step forwarded to the solver.
            force: Force applied during the step, one value per dimension.
                ``None`` means no force (the object only coasts).
        """
        if force is None:
            # The signature advertises an optional force; treat it as zero
            # instead of failing when iterating over None.
            force = (0.0,) * len(self.coordinates)

        if self.past is None and self.save_past > 0:
            self.past = []

        if self.past is not None:
            self.past.append((self.coordinates, self.velocity, self.acceleration))
            # Keep at most `save_past` snapshots, dropping the oldest first.
            if len(self.past) > self.save_past:
                self.past.pop(0)

        self.acceleration = tuple(f / self.mass for f in force)
        # The solver is fed tensors reshaped to (1, 1, n_dimensions).
        x, v = self.solver.apply_force(
            x=torch.tensor(self.coordinates).view(1, 1, -1),
            v=torch.tensor(self.velocity).view(1, 1, -1),
            force=torch.tensor(force).view(1, 1, -1),
            dt=dt,
        )
        # Keep the last entry along the second axis of the solver output.
        self.coordinates = tuple(x[0, -1, :].tolist())
        self.velocity = tuple(v[0, -1, :].tolist())

    def _recent(self, field: int, current: tuple, dt: Optional[float]) -> list:
        """Return the last ``dt`` recorded values of one state field plus ``current``."""
        if not self.past:
            return [current]

        steps = len(self.past) if dt is None else min(int(dt), len(self.past))
        if steps <= 0:
            # Guard against `past[-0:]`, which would return the whole history.
            return [current]
        return [snapshot[field] for snapshot in self.past[-steps:]] + [current]

    def get_past_coordinates(self, dt: Optional[float] = None) -> list:
        """Return the recorded positions followed by the current one, oldest first.

        Args:
            dt: Number of past steps to include; ``None`` includes every
                recorded step.
        """
        return self._recent(0, self.coordinates, dt)

    def get_past_velocity(self, dt: Optional[float] = None) -> list:
        """Return the recorded velocities followed by the current one, oldest first.

        Args:
            dt: Number of past steps to include; ``None`` includes every
                recorded step.
        """
        return self._recent(1, self.velocity, dt)

    

In [None]:
# Agent class

from src.model.dynamics_model import DYNAMIC_MODELS

# Key into the DYNAMIC_MODELS registry and path of the checkpoint to restore.
model_type = "dynamical_lstm"
model_save = "lightning_logs/version_3/checkpoints/epoch=9-step=8790.ckpt"

dynamical_predictor = DYNAMIC_MODELS[model_type].load_from_checkpoint(checkpoint_path=model_save)
# The predictor is only used for inference in this notebook: switch to
# evaluation mode so train-only layers (dropout, batch-norm) are deterministic.
dynamical_predictor.eval()

def dynamical_agent(state: SimulatedObject, force_scale: float = 0.001) -> tuple:
    """Ask the learned predictor for the force to apply to ``state``.

    Args:
        state: Object whose coordinate history is fed to the predictor.
        force_scale: Factor applied to the raw prediction before it is
            returned (defaults to the notebook's original 0.001).

    Returns:
        The scaled prediction for the last time step, as a tuple of floats.
    """
    history = torch.tensor(state.get_past_coordinates()).unsqueeze(0).to(dynamical_predictor.device)
    if history.size(1) < 2:
        # Duplicate the sequence along the time axis so the predictor sees at
        # least two steps.
        history = history.repeat(1, 2, 1)

    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        prediction = dynamical_predictor(history)

    return tuple((prediction[0, -1, :] * force_scale).tolist())
    


In [None]:
# Simulation parameters

# Arena bounds along x and y (also used as the plot limits).
minX = -10
maxX = 10
minY = -10
maxY = 10

# Range of the randomly drawn initial velocity components.
minV = -0.005
maxV = 0.005

# Number of animation frames; also the minimum length of a usable recorded sequence.
maxT = 200

# Population sizes, in the order the individuals are laid out.
nbObjects = 0
nbGravObjects = 0
nbAgents = 3
nbGtAgents = 3

mass = 1
gravity_field = -9.81 * 1e-5

# History length kept per object; the dataset lookback is tau + 1.
tau = 5

# Whether recorded trajectories are loaded and replayed.
ground_truth = True

# Ground-truth agents require ground-truth data, and vice versa.
assert (ground_truth and nbGtAgents > 0) or (not ground_truth and nbGtAgents == 0)

In [None]:
# Set ground truth movements

if ground_truth:
        # Load recorded trajectories and derive the per-dimension bounds used
        # to normalise them. Everything in this cell is skipped when
        # ground_truth is False.

        from src.data.format_data import PandasFormatter
        from src.data.dataset import SeriesDataset
        import numpy as np
        import pandas as pd

        data_file = "data/test/22-10-20_C2_20.csv"

        formatter = PandasFormatter(pd.read_csv(data_file))
        # `movements` is consumed as a mapping individual -> coordinate table;
        # each table is converted to a plain list of float samples.
        sequences = formatter.format(output_format="dataclass").movements
        sequences = {ind : coords.to_numpy(dtype=np.float64).tolist() for ind, coords in sequences.items()}

        # Drop individuals recorded for fewer than maxT samples; the keys are
        # collected first so the dict is not mutated while being iterated.
        too_short_sequences = [ind for ind, seq in sequences.items() if len(seq) < maxT]
        for ind in too_short_sequences:
                del sequences[ind]
        assert len(sequences) >= nbGtAgents, f"Number of sequences ({len(sequences)}) is less than the number of ground truth agents ({nbGtAgents})"

        # Bounds are computed over every remaining sample of every sequence;
        # the indexing below assumes exactly three coordinates per sample.
        min_max_coords = tuple([(min(val), max(val)) for val in zip(*[sample for seq in sequences.values() for sample in seq])]) # min and max values for each dimension: [(min_x, max_x), (min_y, max_y), (min_z, max_z)]
        min_coord_t = torch.tensor([min_max_coords[0][0], min_max_coords[1][0], min_max_coords[2][0]])
        max_coord_t = torch.tensor([min_max_coords[0][1], min_max_coords[1][1], min_max_coords[2][1]])

        # Solver used by `transform` to turn target positions into accelerations.
        solver = DynamicsSolver(mass=mass, dimensions=3)
        def transform(sample):
                """Normalise one dataset sample and turn its target into accelerations.

                ``sample`` unpacks into (inputs, targets, individual). Inputs and
                targets are min-max scaled with ``min_coord_t`` / ``max_coord_t``;
                the targets then go through ``solver.compute_acceleration``. The
                target is the force applied on the target step (t+1), which
                corresponds to the acceleration when mass is 1.
                """
                inputs, targets, individual = sample

                span = max_coord_t - min_coord_t
                inputs = (inputs - min_coord_t) / span
                targets = (targets - min_coord_t) / span

                # The solver is given a leading axis of size 1, removed afterwards.
                accelerations = solver.compute_acceleration(targets.unsqueeze(0)).squeeze(0)

                return inputs, accelerations, individual

        # Windows of tau+1 past samples; 2 is added to the target offset so the
        # acceleration of the target step (t+1) can be computed.
        dataset = SeriesDataset(sequences, lookback=tau+1, target_offset_start=1, target_offset_end=3, transform=transform)

        trajectories = {}       # individual -> forces to replay, in dataset order
        initial_states = {}     # individual -> (coordinates, velocity, acceleration) of its first window
        authorised_agents = []  # first nbGtAgents individuals met in the dataset
        for x, y, ind in dataset:
                if ind not in authorised_agents:
                        if len(authorised_agents) >= nbGtAgents:
                                # Quota reached: ignore every other individual.
                                continue
                        authorised_agents.append(ind)

                tx = tuple(x[0].tolist())
                ta = tuple(y[0].tolist())
                # NOTE(review): the initial velocity is derived from the
                # acceleration divided by mass; confirm this is intended.
                tv = tuple((np.array(ta) / mass).tolist())

                trajectories.setdefault(ind, []).append(ta)
                # Only the first window of an individual defines its initial state.
                initial_states.setdefault(ind, (tx, tv, ta))
        
        class GtForceAttributor():
                """Replays recorded forces, serving one individual per call in round-robin order.

                The ``state`` argument of a call is ignored: the n-th call is
                answered with the next unread force of individual ``n % nb_individuals``,
                so callers must query the agents in a fixed order.
                Once an individual's recording is exhausted a zero force is
                returned instead of raising, so a simulation may run for more
                steps than were recorded.
                """
                def __init__(self, trajectories):
                        """Store ``trajectories``, a mapping individual -> list of forces."""
                        self.trajectories = trajectories
                        self.individuals = list(trajectories.keys())
                        self.nb_individuals = len(self.individuals)
                        self.idx = [0] * self.nb_individuals  # next step to read, per individual
                        self.current_ind = 0                  # individual served by the next call

                def __call__(self, state: "SimulatedObject") -> tuple:
                        """Return the next force of the individual whose turn it is."""
                        slot = self.current_ind
                        recorded = self.trajectories[self.individuals[slot]]
                        step = self.idx[slot]

                        if step < len(recorded):
                                force = recorded[step]
                        else:
                                # Recording exhausted: apply no force, matching
                                # the dimensionality of the recorded forces.
                                force = tuple(0.0 for _ in recorded[-1]) if recorded else (0.0, 0.0, 0.0)

                        self.idx[slot] += 1
                        self.current_ind = (slot + 1) % self.nb_individuals

                        return force
                
        # Single shared replay object: its internal round-robin counters are
        # advanced by every call made through ground_truth_agent.
        gt_force_attributor = GtForceAttributor(trajectories)
        def ground_truth_agent(state: SimulatedObject) -> tuple:
                """Return the next recorded force by delegating to ``gt_force_attributor``."""
                return gt_force_attributor(state)

In [None]:
# Set forces

import math

def bounceWall(object: "SimulatedObject", dimension: int, is_max: bool, wall_coordinate: int):
    """Return the force pushing ``object`` back inside one wall.

    Args:
        object: Simulated object; its ``coordinates``, ``velocity`` and
            ``mass`` are read.
        dimension: Axis the wall is perpendicular to (0, 1 or 2).
        is_max: True for an upper bound on that axis, False for a lower bound.
        wall_coordinate: Position of the wall along ``dimension``.

    Returns:
        A 3-element list that is zero everywhere except, when the object is
        beyond the wall, along ``dimension`` where it has magnitude
        ``2 * mass * |velocity|`` and points back inside.
    """
    # Three components so that a wall along any axis, including z, is supported.
    force = [0, 0, 0]
    position = object.coordinates[dimension]
    rebound = 2 * object.mass * abs(object.velocity[dimension])

    if is_max and position > wall_coordinate:
        force[dimension] = -rebound
    elif not is_max and position < wall_coordinate:
        force[dimension] = rebound
    return force

def gravity(object: SimulatedObject, gravity_field: float):
    """Return a force of ``gravity_field * mass`` along the second axis, zero elsewhere."""
    weight = gravity_field * object.mass
    return (0, weight, 0)

def radialGravity(object: SimulatedObject, gravity_field: float):
    """Return a force directed along the line joining the arena centre and ``object``.

    The direction is the unit vector from the centre of the [minX, maxX] x
    [minY, maxY] box to the object, scaled by ``gravity_field * mass``; a
    negative ``gravity_field`` therefore pulls towards the centre. The third
    component is always 0.
    """
    center_x = (maxX + minX) / 2
    center_y = (maxY + minY) / 2
    bearing = math.atan2(object.coordinates[1] - center_y, object.coordinates[0] - center_x)

    pull_x = math.cos(bearing) * gravity_field * object.mass
    pull_y = math.sin(bearing) * gravity_field * object.mass
    return (pull_x, pull_y, 0)


# Forces shared by every individual: one bounce rule per wall
# (upper/lower bound along x, then upper/lower bound along y).
# Every entry is called as f(object, dt) and returns a 3-component force;
# dt is accepted for a uniform signature but not used by these rules.
object_forces = [
    lambda object, dt: bounceWall(object, 0, True, maxX),
    lambda object, dt: bounceWall(object, 0, False, minX),
    lambda object, dt: bounceWall(object, 1, True, maxY),
    lambda object, dt: bounceWall(object, 1, False, minY)
]

# Passive objects that are additionally subject to the radial gravity field.
gravobject_forces = object_forces + [lambda object, dt: radialGravity(object, gravity_field)]

# Agents steered by the learned predictor.
agent_forces = object_forces + [lambda object, dt: dynamical_agent(object)]

# Agents replaying recorded forces; only defined when ground-truth data is enabled.
if ground_truth:
    gtagents_forces = object_forces + [lambda object, dt: ground_truth_agent(object)]

In [None]:
# Set initial states

# numpy is needed by the random-initialisation branch below. Import it here so
# the cell also runs on a fresh kernel when ground_truth is False (the other
# imports of numpy sit inside `if ground_truth:` or in a later cell).
import numpy as np


def _new_individual(coordinates, velocity, acceleration):
    """Build a SimulatedObject with the notebook-wide mass, history length and its own solver."""
    return SimulatedObject(
        mass=mass,
        coordinates=coordinates,
        velocity=velocity,
        acceleration=acceleration,
        solver=DynamicsSolver(mass=mass,dimensions=3),
        save_past=tau,
    )


def _from_ground_truth(i):
    """Build an individual from the initial state of the i-th authorised agent (wrapping around)."""
    x0, v0, a0 = initial_states[authorised_agents[i % len(authorised_agents)]]
    return _new_individual(x0, v0, a0)


if ground_truth:
    # Every population starts from recorded initial states; populations larger
    # than the number of authorised agents reuse them cyclically.
    objects = [_from_ground_truth(i) for i in range(nbObjects)]
    gravobjects = [_from_ground_truth(i) for i in range(nbGravObjects)]
    agents = [_from_ground_truth(i) for i in range(nbAgents)]
    # Ground-truth agents map one-to-one onto the authorised agents (no wrap-around).
    gtagents = [_new_individual(*initial_states[authorised_agents[i]]) for i in range(nbGtAgents)]
    individuals = objects + gravobjects + agents + gtagents

else:
    # Random start inside the arena, random planar velocity, at rest along z.
    individuals = [_new_individual(
        coordinates=(np.random.uniform(minX, maxX), np.random.uniform(minY, maxY), 0),
        velocity=(np.random.uniform(minV, maxV), np.random.uniform(minV, maxV), 0),
        acceleration=(0, 0, 0),
    ) for _ in range(nbObjects + nbGravObjects + nbAgents + nbGtAgents)]

    # Populations are consecutive slices of `individuals`, in this fixed order.
    objects = individuals[:nbObjects]
    gravobjects = individuals[nbObjects:nbObjects+nbGravObjects]
    agents = individuals[nbObjects+nbGravObjects:nbObjects+nbGravObjects+nbAgents]
    gtagents = individuals[nbObjects+nbGravObjects+nbAgents:]

In [None]:
%matplotlib notebook

# Run and display simulation

import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np

fig, ax = plt.subplots()
ax.axis([minX, maxX, minY, maxY])

# One marker-only artist per population, created empty and filled by animate():
# red = objects, blue = gravity objects, green = agents, yellow = ground-truth agents.
lo, lgo, la, lga = (ax.plot([], [], marker)[0] for marker in ("ro", "bo", "go", "yo"))

def animate(i):
    """Advance every individual by one unit time step and refresh the plotted points.

    Args:
        i: Frame index supplied by the animation; the simulation state lives
            in the individuals themselves, so the value is not used.
    """
    for index, individual in enumerate(individuals):
        # The position inside `individuals` determines the population, and
        # therefore the set of forces acting on the individual.
        if index < nbObjects:
            forces = object_forces
        elif index < nbObjects + nbGravObjects:
            forces = gravobject_forces
        elif index < nbObjects + nbGravObjects + nbAgents:
            forces = agent_forces
        elif index < nbObjects + nbGravObjects + nbAgents + nbGtAgents:
            forces = gtagents_forces
        else:
            raise ValueError("Too many individuals")

        # Sum all three components of every force. The z component used to be
        # unpacked and then dropped, so it never reached the solver.
        total = [0, 0, 0]
        for f in forces:
            dx, dy, dz = f(individual, 1)
            total[0] += dx
            total[1] += dy
            total[2] += dz
        individual.update(1, tuple(total))

    # Only the x/y projection of each population is displayed.
    lo.set_data([object.coordinates[0] for object in objects], [object.coordinates[1] for object in objects])
    lgo.set_data([gravobject.coordinates[0] for gravobject in gravobjects], [gravobject.coordinates[1] for gravobject in gravobjects])
    la.set_data([agent.coordinates[0] for agent in agents], [agent.coordinates[1] for agent in agents])
    lga.set_data([gtagent.coordinates[0] for gtagent in gtagents], [gtagent.coordinates[1] for gtagent in gtagents])
    
# Each of the maxT frames calls animate once, i.e. one simulation step per frame;
# `interval` is the delay between displayed frames in milliseconds.
ani = matplotlib.animation.FuncAnimation(fig, animate, frames=maxT, interval=10)

from IPython.display import HTML
# Render the animation as an embeddable JavaScript player shown as the cell output.
HTML(ani.to_jshtml())