In [None]:
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import pedpy
import sqlite3
import pandas as pd
from shapely import GeometryCollection, Polygon

In [None]:
# Path of the SQLite trajectory file that is loaded further down.
filename = "example1_out.sqlite"
# Square with corners (0, 0) and (10, 10).
p1 = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
# Rectangle spanning x = 10..20 and y = 4..6; it touches the right edge of p1.
p2 = Polygon([(10, 4), (20, 4), (20, 6), (10, 6)])
# Union of both shapes, wrapped in a collection because the plotting code
# below accesses the polygon through `area.geoms[0]`.
area = GeometryCollection(p1.union(p2))
# pedpy WalkableArea built from the first geometry of the collection.
walkable_Area = pedpy.WalkableArea(area.geoms[0])

In [None]:
def sqlite_to_df(trajectory_file: str) -> tuple[pd.DataFrame, int]:
    """Load the trajectory table and the frame rate from a SQLite file.

    Args:
        trajectory_file: Path to a SQLite database that contains a
            ``trajectory_data`` table and a ``metadata`` table with an
            ``fps`` entry.

    Returns:
        A tuple ``(data, fps)``. ``data`` holds every row of
        ``trajectory_data`` with the columns named
        ``frame, id, x, y, ox, oy`` (in table column order); ``fps`` is the
        metadata value converted to ``int``.

    Raises:
        ValueError: If the ``metadata`` table has no ``fps`` entry.
    """
    # Using the connection as a context manager would only commit/rollback;
    # it does not close the connection, so close it explicitly.
    conn = sqlite3.connect(trajectory_file)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM trajectory_data")
        rows = cursor.fetchall()
        cursor.execute("SELECT * FROM metadata WHERE key = 'fps'")
        fps_row = cursor.fetchone()
    finally:
        conn.close()

    if fps_row is None:
        raise ValueError(
            f"No 'fps' entry found in the metadata table of {trajectory_file!r}"
        )

    data = pd.DataFrame(rows, columns=["frame", "id", "x", "y", "ox", "oy"])
    # The second column of the metadata row is the value; going through
    # float() first also accepts representations such as "25.0".
    return data, int(float(fps_row[1]))

In [None]:
# Load the raw trajectory rows together with the frame rate stored in the file.
data, fps = sqlite_to_df(filename)
trajectory_data = pedpy.TrajectoryData(data=data, frame_rate=fps)
# Per-agent speed computed by pedpy with frame_step=5.
speed = pedpy.compute_individual_speed(traj_data=trajectory_data, frame_step=5)
# Left-join the trajectory rows onto the speed rows by agent id and frame, so
# each speed row also carries the remaining trajectory columns (presumably the
# source of the x/y columns read by get_shapes_for_frame -- verify).
speed = speed.merge(trajectory_data.data, on=['id', 'frame'], how='left')
# Same radius for every row; get_shapes_for_frame uses it as the circle radius.
speed["radius"] = 0.2

In [None]:
def speed_to_color(speed, min_speed, max_speed, midpoint):
    """Map a speed onto a color of the reversed ``RdBu_r`` plotly colorscale.

    The speed is normalized piecewise-linearly so that ``min_speed`` maps to
    0, ``midpoint`` to 0.5 and ``max_speed`` to 1; values outside that range
    are clipped. The normalized value then selects an entry of the colorscale.

    Args:
        speed: Speed value to convert.
        min_speed: Speed mapped to the first color of the scale.
        max_speed: Speed mapped to the last color of the scale.
        midpoint: Speed mapped to the middle of the scale.

    Returns:
        The color string taken from the colorscale.
    """
    colorscale = px.colors.diverging.RdBu_r[::-1]

    # Normalize speed based on the midpoint. A zero-width half range (e.g. all
    # speeds identical) would otherwise divide by zero, so it is handled
    # explicitly.
    if speed >= midpoint:
        upper_span = max_speed - midpoint
        if upper_span > 0:
            normalized_speed = 0.5 + 0.5 * (speed - midpoint) / upper_span
        else:
            # No room above the midpoint: the midpoint itself gets the middle
            # color, anything faster the top color.
            normalized_speed = 0.5 if speed == midpoint else 1.0
    else:
        lower_span = midpoint - min_speed
        if lower_span > 0:
            normalized_speed = 0.5 * (speed - min_speed) / lower_span
        else:
            # No room below the midpoint: anything slower gets the bottom color.
            normalized_speed = 0.0

    # Clip to ensure the value is between 0 and 1
    normalized_speed = np.clip(normalized_speed, 0, 1)

    # Find the corresponding color in the colorscale
    color_idx = int(normalized_speed * (len(colorscale) - 1))
    return colorscale[color_idx]

In [None]:
def get_geometry_traces(area):
    """Build grey line traces outlining the first geometry of ``area``.

    Args:
        area: Object exposing ``geoms``; only ``area.geoms[0]`` is drawn. It
            must provide ``exterior.xy`` and an iterable ``interiors`` whose
            items have ``coords``.

    Returns:
        A list of ``go.Scatter`` line traces: the exterior ring first,
        followed by one trace per interior ring.
    """
    def as_grey_line(xs, ys):
        # One outline trace; coordinates are converted to numpy arrays.
        return go.Scatter(
            x=np.array(xs),
            y=np.array(ys),
            mode='lines',
            line={"color": "grey"},
        )

    outer_x, outer_y = area.geoms[0].exterior.xy
    traces = [as_grey_line(outer_x, outer_y)]

    for ring in area.geoms[0].interiors:
        # Transpose the (x, y) coordinate pairs into separate x and y sequences.
        ring_x, ring_y = zip(*ring.coords[:])
        traces.append(as_grey_line(ring_x, ring_y))

    return traces

In [None]:
def get_shapes_for_frame(frame_data, min_speed, max_speed, midpoint):
    """Create one plotly circle shape per row of ``frame_data``.

    Each row is drawn as a circle centred at (``x``, ``y``) with the row's
    ``radius``. Rows whose ``speed`` equals the sentinel ``-1000`` are drawn
    without outline and with a fully transparent fill; all other rows are
    filled and outlined with the color returned by ``speed_to_color``.

    Args:
        frame_data: DataFrame with the columns ``x``, ``y``, ``radius`` and
            ``speed``. May be empty.
        min_speed: Lower bound passed on to ``speed_to_color``.
        max_speed: Upper bound passed on to ``speed_to_color``.
        midpoint: Midpoint passed on to ``speed_to_color``.

    Returns:
        A list of ``go.layout.Shape`` objects in row order; an empty list for
        an empty frame.
    """
    # Sentinel speed that marks a placeholder row.
    dummy_speed = -1000

    def create_shape(x, y, radius, speed):
        if speed == dummy_speed:  # Check for dummy speed
            appearance = {
                "line": dict(width=0),
                "fillcolor": "rgba(255,255,255,0)",  # Transparent fill
            }
        else:
            color = speed_to_color(speed, min_speed, max_speed, midpoint)
            appearance = {"line_color": color, "fillcolor": color}
        return go.layout.Shape(
            type="circle",
            xref="x",
            yref="y",
            x0=x - radius,
            y0=y - radius,
            x1=x + radius,
            y1=y + radius,
            **appearance,
        )

    # DataFrame.apply(axis=1) returns a DataFrame (not a Series) for an empty
    # frame, so iterate over the needed columns directly instead.
    if frame_data.empty:
        return []
    columns = [frame_data[name] for name in ("x", "y", "radius", "speed")]
    return [create_shape(*values) for values in zip(*columns)]

In [None]:
def create_fig(geometry_traces, frames, steps, width=800, height=800):
    """Assemble the animated plotly figure.

    Args:
        geometry_traces: Traces used as the figure's data.
        frames: Animation frames of the figure.
        steps: Slider steps, one per frame, placed in the frame slider.
        width: Figure width passed to the layout.
        height: Figure height passed to the layout.

    Returns:
        A ``go.Figure`` with a "Play" button, a frame slider, a fixed size and
        a y axis anchored to the x axis with ratio 1.
    """
    fig = go.Figure(data=geometry_traces, frames=frames)

    play_button = {
        'args': [None, {'frame': {'duration': 100, 'redraw': True}, 'fromcurrent': True}],
        'label': 'Play',
        'method': 'animate'
    }
    fig.update_layout(
        updatemenus=[{
            'buttons': [play_button],
            'direction': 'left',
            'pad': {'r': 10, 't': 87},
            'showactive': False,
            'type': 'buttons',
            'x': 0.1,
            'xanchor': 'right',
            'y': 0,
            'yanchor': 'top'
        }],
        sliders=[{
            'active': 0,
            'yanchor': 'top',
            'xanchor': 'left',
            'currentvalue': {
                'font': {'size': 20},
                'prefix': 'Frame:',
                'visible': True,
                'xanchor': 'right'
            },
            'transition': {'duration': 100, 'easing': 'cubic-in-out'},
            'pad': {'b': 10, 't': 50},
            'len': 0.9,
            'x': 0.1,
            'y': 0,
            'steps': steps
        }],
        autosize=False,
        # Honor the caller-supplied size instead of a hard-coded 800x800.
        width=width,
        height=height,
        # Anchor y to x with ratio 1 so the geometry keeps its aspect ratio.
        yaxis=dict(
            scaleanchor="x",
            scaleratio=1
        )
    )
    return fig

In [None]:
def animate(data_df, area, frame_stride=50):
    """Build an animated figure of agents colored by speed.

    Every ``frame_stride``-th distinct frame of ``data_df`` becomes one
    animation frame in which each row is drawn as a circle. Frames with fewer
    rows than the busiest frame are padded with invisible placeholder rows so
    that every animation frame carries the same number of shapes
    (NOTE(review): presumably needed so shapes of departed agents do not
    linger between frames -- confirm).

    Args:
        data_df: DataFrame with at least the columns ``frame``, ``x``, ``y``,
            ``radius`` and ``speed``.
        area: Geometry container handed to ``get_geometry_traces``.
        frame_stride: Use every n-th distinct frame (in order of first
            appearance) for the animation. Defaults to 50.

    Returns:
        The figure produced by ``create_fig``.

    Raises:
        ValueError: If ``frame_stride`` is smaller than 1.
    """
    if frame_stride < 1:
        raise ValueError("frame_stride must be a positive integer")

    # The color range is taken over the whole data set; the mean speed is the
    # midpoint of the color scale.
    min_speed = data_df["speed"].min()
    max_speed = data_df["speed"].max()
    midpoint = np.mean(data_df["speed"])
    max_agents = data_df.groupby('frame').size().max()
    # Placeholder row; speed -1000 is the sentinel get_shapes_for_frame checks.
    dummy_agent_data = {
        'x': 0,
        'y': 0,
        'radius': 0,
        'speed': -1000
    }
    frames = []
    steps = []
    selected_frames = data_df['frame'].unique()[::frame_stride]
    for frame_num in selected_frames:
        frame_data = data_df[data_df['frame'] == frame_num]
        # Pad with all missing placeholder rows in one concat instead of
        # growing the frame row by row.
        n_missing = int(max_agents) - len(frame_data)
        if n_missing > 0:
            padding = pd.DataFrame([dummy_agent_data] * n_missing)
            frame_data = pd.concat([frame_data, padding], ignore_index=True)

        shapes = get_shapes_for_frame(frame_data, min_speed, max_speed, midpoint)
        frames.append(
            go.Frame(
                name=str(frame_num),
                layout=go.Layout(shapes=shapes)
            )
        )
        # Slider step that jumps to the frame with the matching name.
        steps.append({
            'args': [
                [str(frame_num)],
                {
                    'frame': {'duration': 100, 'redraw': True},
                    'mode': 'immediate',
                    'transition': {'duration': 500}
                }
            ],
            'label': str(frame_num),
            'method': 'animate'
        })

    geometry_traces = get_geometry_traces(area)
    return create_fig(geometry_traces, frames, steps, width=800, height=800)

In [None]:
# Build the animated figure from the speed table and the geometry; as the
# cell's last expression the returned figure is displayed as the cell output.
animate(speed, area)