# Synthetic Motion Data Generator for Digital Twin Camera Project

This notebook provides an interactive interface to generate synthetic JSON data representing people (actors) moving within a defined 2D environment. The output format matches the specified storage protocol for motion activity information.

**Features:**
- Configure environment dimensions and obstacles.
- Set simulation parameters like framerate, duration, and sampling rate.
- Define the number of actors and their properties.
- Simulate basic movement with boundary and obstacle avoidance.
- Output data in the required JSON format.

**Instructions:**
1. Ensure `ipywidgets` and `numpy` are installed (`pip install ipywidgets numpy`).
2. Run all cells.
3. Adjust parameters using the interactive widgets below.
4. Click the "Generate Data" button.
5. The generated JSON appears in the "Output JSON" text box; status and error messages are shown in the log area above it.

In [6]:
# %% Imports
import ipywidgets as widgets
from IPython.display import display, clear_output, Javascript
import json
import random
import numpy as np
import datetime
import time
import math
import uuid # Using UUIDs for more robust actor IDs, though the example uses integers

In [2]:
# %% Helper Functions

def is_within_bounds(x, y, env_dims):
    """Tell whether the point (x, y) lies inside the environment rectangle.

    The environment spans [0, width) horizontally and [0, height) vertically,
    so a point sitting exactly on the far edge counts as outside.
    """
    env_w, env_h = env_dims
    x_ok = 0 <= x < env_w
    y_ok = 0 <= y < env_h
    return x_ok and y_ok

def check_obstacle_collision(actor_x, actor_y, actor_w, actor_h, obstacles):
    """Report whether the actor's box overlaps at least one obstacle.

    Each obstacle is a (width, height, x, y) tuple. The comparison is a plain
    axis-aligned bounding-box test with strict inequalities, so boxes that
    merely share an edge are not treated as colliding.
    """
    actor_right = actor_x + actor_w
    actor_bottom = actor_y + actor_h
    return any(
        actor_x < ox + ow
        and actor_right > ox
        and actor_y < oy + oh
        and actor_bottom > oy
        for ow, oh, ox, oy in obstacles
    )

def generate_random_point_near_edge(env_dims, margin=5):
    """Pick a random integer (x, y) inside a strip of depth ``margin`` along one edge.

    One of the four edges is chosen uniformly at random, then a point is
    drawn uniformly from the strip hugging that edge. The result is clamped
    to [0, width - 1] x [0, height - 1] before being truncated to integers.
    """
    width, height = env_dims
    # For every edge: ((x_low, x_high), (y_low, y_high)) of its strip.
    strips = {
        'top': ((0, width), (0, margin)),
        'bottom': ((0, width), (height - margin, height)),
        'left': ((0, margin), (0, height)),
        'right': ((width - margin, width), (0, height)),
    }
    chosen = random.choice(['top', 'bottom', 'left', 'right'])
    (x_low, x_high), (y_low, y_high) = strips[chosen]

    raw_x = random.uniform(x_low, x_high)
    raw_y = random.uniform(y_low, y_high)

    # Clamp so the point is strictly inside the environment.
    clamped_x = max(0, min(raw_x, width - 1))
    clamped_y = max(0, min(raw_y, height - 1))
    return int(clamped_x), int(clamped_y)

def generate_random_point_anywhere(env_dims):
    """Draw a uniformly random point in the environment, truncated to integers.

    Returns an (x, y) tuple with x drawn from [0, width] and y from
    [0, height] before truncation.
    """
    span_x, span_y = env_dims
    col = int(random.uniform(0, span_x))
    row = int(random.uniform(0, span_y))
    return col, row

In [3]:
# %% Simulation Core Logic

class ActorSimulator:
    """Simulate one actor wandering through a rectangular 2D environment.

    The actor is an axis-aligned box of size ``actor_dims``; ``x``/``y`` hold
    the corner with the smallest coordinates. The actor is placed near an
    edge at construction time, becomes active at ``start_frame``, steers
    towards randomly chosen targets, and appends one ``(width, height, x, y)``
    tuple to ``frames_data`` for every simulated frame in which it is active.

    Obstacles are ``(width, height, x, y)`` tuples, the layout consumed by
    ``check_obstacle_collision``.
    """

    # Cap on spawn retries so an environment whose edge positions are all
    # blocked by obstacles cannot hang the constructor in an endless loop.
    MAX_ENTRY_ATTEMPTS = 1000

    def __init__(self, actor_id, descriptor, actor_dims, env_dims, obstacles, start_frame, max_frames, framerate):
        """Store the configuration and pick the entry position, first target and planned exit.

        Args:
            actor_id: Identifier kept on ``self.id``.
            descriptor: Descriptor value copied verbatim into the output.
            actor_dims: ``(width, height)`` of the actor's bounding box.
            env_dims: ``(width, height)`` of the environment.
            obstacles: Iterable of ``(width, height, x, y)`` obstacle tuples.
            start_frame: First simulation frame at which the actor is active.
            max_frames: Total number of simulation frames (stored only).
            framerate: Frames per second; used to convert per-second speeds
                and intervals into per-frame values.
        """
        self.id = actor_id
        self.descriptor = descriptor
        self.width, self.height = actor_dims
        self.env_width, self.env_height = env_dims
        self.obstacles = obstacles
        self.framerate = framerate

        # Movement parameters
        self.max_speed = random.uniform(10, 50) / self.framerate  # units per frame (scaled by framerate)
        self.target_change_interval = random.uniform(2, 5) * self.framerate  # change target every few seconds
        self.frames_since_target_change = 0

        # State
        self.active = False
        self.exited = False  # Latched once the actor leaves the environment; it never re-enters
        self.start_frame = start_frame
        self.current_frame_index = 0  # Relative to simulation start
        self.max_frames = max_frames
        self.x, self.y = -1, -1  # Placeholder until entry_strategy() runs
        self.vx, self.vy = 0, 0  # Velocity in units per frame
        self.target_x, self.target_y = -1, -1  # -1 means "no target yet"
        self.entry_point = None  # Planned entry [x, y]
        self.exit_point = None  # Planned exit [x, y]

        # Output data
        self.frames_data = []  # (width, height, x, y) tuples, one per recorded frame
        self.postures_data = []  # Posture label per recorded frame

        # Choose the entry position / first target, then the planned exit.
        self.entry_strategy()
        self.exit_strategy()

    def entry_strategy(self):
        """Place the actor near an edge, clear of obstacles, and pick its first target.

        The edge point is clamped so the whole bounding box fits inside the
        environment. Without the clamp, an actor spawned near the right or
        bottom edge starts outside the range accepted by the boundary check
        in ``step()`` and can never move or be recorded.
        """
        env_dims = (self.env_width, self.env_height)
        max_x = max(0, int(self.env_width - self.width))
        max_y = max(0, int(self.env_height - self.height))

        for _ in range(self.MAX_ENTRY_ATTEMPTS):
            edge_x, edge_y = generate_random_point_near_edge(env_dims)
            self.x, self.y = min(edge_x, max_x), min(edge_y, max_y)
            if not check_obstacle_collision(self.x, self.y, self.width, self.height, self.obstacles):
                break
        # If every attempt collided, the last candidate is kept rather than
        # looping forever; the actor then simply stays blocked in step().
        self.entry_point = [int(self.x), int(self.y)]  # Stored as a list per the output format

        # Set initial target: either around the centre or near some edge.
        center_x, center_y = self.env_width / 2, self.env_height / 2
        if random.random() < 0.5:
            self.target_x = random.uniform(center_x - self.env_width / 4, center_x + self.env_width / 4)
            self.target_y = random.uniform(center_y - self.env_height / 4, center_y + self.env_height / 4)
        else:
            self.target_x, self.target_y = generate_random_point_near_edge(env_dims)

        self.update_velocity()

    def exit_strategy(self):
        """Choose a random point near an edge as the planned exit.

        The point is only stored (``final_target_x``/``final_target_y`` and
        ``exit_point``); the movement logic does not steer towards it.
        """
        self.final_target_x, self.final_target_y = generate_random_point_near_edge((self.env_width, self.env_height))
        self.exit_point = [int(self.final_target_x), int(self.final_target_y)]

    def update_velocity(self):
        """Point the velocity at the current target and handle target changes.

        A new random target is chosen when the actor is within 10 units of
        the current one. A target chosen because ``target_change_interval``
        elapsed only affects the velocity on the next call.
        """
        if self.target_x == -1:  # No target set yet
            return
        dx = self.target_x - self.x
        dy = self.target_y - self.y
        dist = math.sqrt(dx**2 + dy**2)

        if dist < 10:  # Close enough: treat the target as reached
            self.target_x, self.target_y = generate_random_point_anywhere((self.env_width, self.env_height))
            dx = self.target_x - self.x
            dy = self.target_y - self.y
            dist = math.sqrt(dx**2 + dy**2)
            self.frames_since_target_change = 0

        if dist > 0:
            # Unit direction scaled to max speed, plus up to 10% jitter per axis.
            self.vx = (dx / dist) * self.max_speed
            self.vy = (dy / dist) * self.max_speed
            self.vx += random.uniform(-0.1, 0.1) * self.max_speed
            self.vy += random.uniform(-0.1, 0.1) * self.max_speed
            # Jitter may push the magnitude above max_speed; rescale if so.
            current_speed = math.sqrt(self.vx**2 + self.vy**2)
            if current_speed > self.max_speed:
                scale = self.max_speed / current_speed
                self.vx *= scale
                self.vy *= scale

        self.frames_since_target_change += 1
        if self.frames_since_target_change >= self.target_change_interval:
            # Time to wander somewhere else.
            self.target_x, self.target_y = generate_random_point_anywhere((self.env_width, self.env_height))
            self.frames_since_target_change = 0

    def step(self, current_frame_index):
        """Advance the actor by one frame and record its position.

        Nothing happens before ``start_frame`` or after the actor has exited.
        A move that would hit an obstacle or leave the environment triggers
        one retry towards a fresh random target; if that also collides, the
        actor stands still for this frame.
        """
        self.current_frame_index = current_frame_index

        if self.exited:
            # An actor that left the environment must not be re-activated by
            # the start-frame check below.
            return

        if not self.active and self.current_frame_index >= self.start_frame:
            self.active = True  # Position was already chosen in entry_strategy()

        if not self.active:
            return  # Not yet in the simulation

        self.update_velocity()

        next_x = self.x + self.vx
        next_y = self.y + self.vy

        if self._move_is_blocked(next_x, next_y):
            # Simple response: pick a new random target and retry once.
            self.target_x, self.target_y = generate_random_point_anywhere((self.env_width, self.env_height))
            self.update_velocity()

            next_x = self.x + self.vx
            next_y = self.y + self.vy

            if self._move_is_blocked(next_x, next_y):
                next_x, next_y = self.x, self.y  # Stay in the current position
                self.vx, self.vy = 0, 0

        self.x = next_x
        self.y = next_y

        # The actor has exited once the centre of its box is outside the environment.
        if not is_within_bounds(self.x + self.width / 2, self.y + self.height / 2, (self.env_width, self.env_height)):
            self.active = False
            self.exited = True
            return

        self.frames_data.append((int(self.width), int(self.height), int(self.x), int(self.y)))
        self.postures_data.append(0)  # Placeholder for posture

    def _move_is_blocked(self, next_x, next_y):
        """Return True if the box at (next_x, next_y) hits an obstacle or leaves the environment."""
        return (
            check_obstacle_collision(next_x, next_y, self.width, self.height, self.obstacles)
            or not (0 <= next_x <= self.env_width - self.width)
            or not (0 <= next_y <= self.env_height - self.height)
        )

    def get_data(self):
        """Return the actor's record as a dict, or None if no frame was recorded.

        The dict has the keys 'descriptor', 'entry', 'exit', 'frames' and
        'postures'; 'entry' and 'exit' are the planned points.
        """
        if not self.frames_data:  # Actor never became active or exited immediately
            return None

        return {
            'descriptor': self.descriptor,
            'entry': self.entry_point,
            'exit': self.exit_point,
            'frames': self.frames_data,
            'postures': self.postures_data
        }


def generate_synthetic_data(env_dims, num_obstacles, max_obstacle_size, num_actors, actor_min_max_size, duration_seconds, framerate, sampling_rate):
    """Generate the full synthetic dataset.

    Args:
        env_dims: ``(width, height)`` of the environment.
        num_obstacles: Number of random rectangular obstacles to place.
        max_obstacle_size: ``(max_width, max_height)``; each obstacle gets
            20%-100% of these per axis.
        num_actors: Number of actors to simulate.
        actor_min_max_size: ``(min_width, max_width)`` for actors; height is
            1.5-2.5 times the chosen width.
        duration_seconds: Length of the simulation in seconds.
        framerate: Simulation frames per second.
        sampling_rate: Recorded samples per second; every
            ``framerate // sampling_rate``-th simulation frame is kept.

    Returns:
        ``(output_json, actor_id_map)``. ``output_json`` has the keys
        'environment' and 'actor'; 'actor' maps consecutive integer indices
        to per-actor dicts. ``actor_id_map`` maps each included actor's id
        string to its integer index.

    Raises:
        ValueError: If ``framerate`` or ``sampling_rate`` is not positive, or
            if ``sampling_rate`` exceeds ``framerate``.
    """
    # Validate up front: otherwise framerate // sampling_rate becomes 0 and
    # the sampling step below fails with an opaque ZeroDivisionError.
    if framerate <= 0 or sampling_rate <= 0:
        raise ValueError("Framerate and Sampling Rate must be positive.")
    if sampling_rate > framerate:
        raise ValueError("Sampling Rate cannot be greater than Framerate.")

    start_time = datetime.datetime.now(datetime.timezone.utc)
    end_time = start_time + datetime.timedelta(seconds=duration_seconds)
    total_frames = int(duration_seconds * framerate)
    sampling_interval = framerate // sampling_rate  # Keep one frame out of this many

    # Generate Environment
    env_width, env_height = env_dims
    margin = 10  # Keep obstacles away from the edges where possible
    obstacles = []
    for _ in range(num_obstacles):
        obs_w = random.uniform(max_obstacle_size[0] * 0.2, max_obstacle_size[0])
        obs_h = random.uniform(max_obstacle_size[1] * 0.2, max_obstacle_size[1])
        obs_x = random.uniform(margin, env_width - obs_w - margin)
        obs_y = random.uniform(margin, env_height - obs_h - margin)
        obstacles.append((int(obs_w), int(obs_h), int(obs_x), int(obs_y)))

    environment_data = {
        'dims': env_dims,
        'obs': obstacles,
        'framerate': framerate,
        'sampling': sampling_rate,
        'start_time': start_time.isoformat(),
        'end_time': end_time.isoformat(),
        'chunk_time': 600  # Example value, chunking logic not implemented here
    }

    # Generate Actors
    descriptor_pool = ['person', 'male', 'female', 'staff', 'visitor', 'red_shirt', 'blue_jeans']
    actors = []
    for _ in range(num_actors):
        actor_w = random.uniform(actor_min_max_size[0], actor_min_max_size[1])
        actor_h = actor_w * random.uniform(1.5, 2.5)  # Assume people are taller than wide
        descriptor_count = random.randint(1, 3)
        descriptors = random.sample(descriptor_pool, k=descriptor_count)
        start_frame = random.randint(0, total_frames // 2)  # Enter within the first half

        actors.append(ActorSimulator(
            actor_id=str(uuid.uuid4()),  # UUID keeps ids unique across runs
            descriptor=descriptors,
            actor_dims=(int(actor_w), int(actor_h)),
            env_dims=env_dims,
            obstacles=obstacles,
            start_frame=start_frame,
            max_frames=total_frames,
            framerate=framerate
        ))

    # Run the simulation frame by frame; each actor records its own frames.
    for frame_idx in range(total_frames):
        for actor in actors:
            actor.step(frame_idx)

    # Collect and down-sample the recorded data.
    actor_output_data = {}
    actor_id_map = {}
    final_actor_index = 0  # Integer keys in the JSON output

    for actor in actors:
        actor_data = actor.get_data()
        if not actor_data or not actor_data['frames']:
            continue  # Actor never appeared

        # The actor's recorded frame at offset i corresponds to simulation
        # frame start_frame + i; keep those on the sampling grid.
        sampled = [
            (frame, posture)
            for offset, (frame, posture) in enumerate(zip(actor_data['frames'], actor_data['postures']))
            if (actor.start_frame + offset) % sampling_interval == 0
        ]
        if not sampled:
            continue  # Nothing left after sampling

        actor_data['frames'] = [frame for frame, _ in sampled]
        actor_data['postures'] = [posture for _, posture in sampled]

        actor_output_data[final_actor_index] = actor_data
        actor_id_map[str(actor.id)] = final_actor_index
        final_actor_index += 1

    output_json = {
        'environment': environment_data,
        'actor': actor_output_data
    }

    return output_json, actor_id_map

In [None]:
# %% Interactive Widgets Setup

style = {'description_width': 'initial'}  # Let long labels render in full


def _make_slider(label, default, lowest, highest, increment=1):
    """Build an IntSlider that uses the shared label style."""
    return widgets.IntSlider(
        description=label,
        value=default,
        min=lowest,
        max=highest,
        step=increment,
        style=style,
    )


# Environment controls
env_width_widget = _make_slider('Env Width (px):', 1920, 100, 4000, increment=10)
env_height_widget = _make_slider('Env Height (px):', 1080, 100, 4000, increment=10)
num_obstacles_widget = _make_slider('Num Obstacles:', 5, 0, 50)
max_obstacle_width_widget = _make_slider('Max Obstacle Width:', 200, 10, 1000, increment=5)
max_obstacle_height_widget = _make_slider('Max Obstacle Height:', 200, 10, 1000, increment=5)

# Simulation controls
duration_widget = _make_slider('Duration (s):', 60, 5, 1200, increment=5)
framerate_widget = _make_slider('Framerate (fps):', 30, 1, 120)
sampling_rate_widget = _make_slider('Sampling Rate (Hz):', 10, 1, 120)

# Actor controls
num_actors_widget = _make_slider('Num Actors:', 10, 1, 100)
actor_min_width_widget = _make_slider('Actor Min Width:', 20, 5, 100)
actor_max_width_widget = _make_slider('Actor Max Width:', 40, 5, 150)


# Action buttons, log area and the text box that receives the JSON
generate_button = widgets.Button(button_style='success', description="Generate Data")
copy_button = widgets.Button(
    button_style='info',
    description="Copy JSON",
    tooltip='Copy JSON to clipboard',
)
button_row = widgets.HBox([generate_button, copy_button])
output_area = widgets.Output()  # Receives log messages
json_output_widget = widgets.Textarea(
    description='Output JSON:',
    placeholder='Generated JSON will appear here...',
    value='',
    disabled=False,
    layout={'height': '400px', 'width': '95%'},
    style=style,
)


# Function to run on button click
def on_generate_button_clicked(b):
    """Validate the widget values, run the generator and show the JSON result.

    Log lines go to ``output_area``; the formatted JSON (or an error message)
    is written to ``json_output_widget``. ``b`` is the clicked button and is
    not used.
    """
    with output_area:
        clear_output(wait=True)  # Drop logs from the previous run
        print("Starting data generation...")

        fps = framerate_widget.value
        sample_hz = sampling_rate_widget.value

        # --- Input validation ---
        if sample_hz > fps:
            message = "Error: Sampling Rate cannot be greater than Framerate."
            print(message)
            json_output_widget.value = message
            return
        if fps % sample_hz != 0:
            # Not fatal: generation proceeds with the integer-divided interval.
            print(f"Warning: Framerate ({fps}) is not perfectly divisible by Sampling Rate ({sample_hz}). Effective sampling interval might vary slightly.")

        narrowest = actor_min_width_widget.value
        widest = actor_max_width_widget.value
        if narrowest > widest:
            message = "Error: Actor Min Width cannot be greater than Actor Max Width."
            print(message)
            json_output_widget.value = message
            return

        try:
            generated_data, actor_id_map = generate_synthetic_data(
                env_dims=(env_width_widget.value, env_height_widget.value),
                num_obstacles=num_obstacles_widget.value,
                max_obstacle_size=(max_obstacle_width_widget.value, max_obstacle_height_widget.value),
                num_actors=num_actors_widget.value,
                actor_min_max_size=(narrowest, widest),
                duration_seconds=duration_widget.value,
                framerate=fps,
                sampling_rate=sample_hz
            )

            # Pretty-print the result into the text box
            json_output_widget.value = json.dumps(generated_data, indent=4)
            actor_count = len(generated_data['actor'])
            print(f"Data generation complete. {actor_count} actors included in output.")

        except Exception as e:
            print(f"An error occurred during generation: {e}")
            import traceback
            traceback.print_exc()
            json_output_widget.value = f"Error during generation: {e}"

def on_copy_button_clicked(b):
    """Ask the browser to copy the contents of the JSON output box to the clipboard.

    The text is embedded directly in the emitted JavaScript instead of being
    looked up in the page with a CSS selector: a selector lookup can return
    nothing, or a different textarea, depending on the notebook frontend.

    The copy runs asynchronously in the browser, so this function can only
    report that it was requested; on failure the script raises a browser
    alert. ``b`` is the clicked button and is not used.
    """
    with output_area:
        clear_output(wait=True)
        json_text = json_output_widget.value
        if not json_text:
            print("No JSON data to copy.")
            return

        # json.dumps yields a valid JavaScript string literal. "</" is escaped
        # in case a frontend injects the code through a <script> element.
        js_literal = json.dumps(json_text).replace("</", "<\\/")
        js_body = """
    try {
        // Prefer the modern asynchronous Clipboard API
        if (navigator.clipboard && navigator.clipboard.writeText) {
            await navigator.clipboard.writeText(jsonText);
            console.log("Copied using Clipboard API");
            return;
        }

        // Fallback: copy from a temporary off-screen textarea
        const textArea = document.createElement('textarea');
        textArea.value = jsonText;
        textArea.style.position = 'fixed';
        textArea.style.left = '0';
        textArea.style.top = '0';
        textArea.style.opacity = '0';
        document.body.appendChild(textArea);
        textArea.focus();
        textArea.select();

        const successful = document.execCommand('copy');
        document.body.removeChild(textArea);

        if (!successful) {
            throw new Error("Copy command was unsuccessful");
        }
        console.log("Copied using execCommand");
    } catch (err) {
        console.error("Failed to copy text: ", err);
        // Provide an alternative method for the user
        alert("Copy failed. Please select all text in the JSON output box (Ctrl+A) and copy manually (Ctrl+C)");
    }
})();
"""
        js_code = "(async function() {\n    const jsonText = " + js_literal + ";" + js_body
        display(Javascript(js_code))
        print("Copy to clipboard requested; a browser alert will appear if it fails.")

# Attach each button to its click handler
for _button, _handler in (
    (copy_button, on_copy_button_clicked),
    (generate_button, on_generate_button_clicked),
):
    _button.on_click(_handler)

# %% Display Interface

# Group the controls by topic
env_box = widgets.VBox([
    env_width_widget,
    env_height_widget,
    num_obstacles_widget,
    max_obstacle_width_widget,
    max_obstacle_height_widget,
])
sim_box = widgets.VBox([duration_widget, framerate_widget, sampling_rate_widget])
actor_box = widgets.VBox([num_actors_widget, actor_min_width_widget, actor_max_width_widget])

# One collapsible accordion section per group
accordion = widgets.Accordion(children=[env_box, sim_box, actor_box])
for _section, _title in enumerate((
    'Environment Configuration',
    'Simulation Parameters',
    'Actor Configuration',
)):
    accordion.set_title(_section, _title)

# Render top to bottom: controls, buttons, log area, JSON text box
for _part in (accordion, button_row, output_area, json_output_widget):
    display(_part)

Accordion(children=(VBox(children=(IntSlider(value=1920, description='Env Width (px):', max=4000, min=100, ste…

HBox(children=(Button(button_style='success', description='Generate Data', style=ButtonStyle()), Button(button…

Output()

Textarea(value='', description='Output JSON:', layout=Layout(height='400px', width='95%'), placeholder='Genera…