# 🤖 Robot Odometry & Event Analysis

Clean, minimal analysis of robot trajectory and experiment feedback events.

### Actual Data Sources
- **Odometry** (`/robomaster/odom`): position (x, y), orientation quaternion (converted to yaw), linear velocities (vx, vy), angular velocity (wz). Yaw (deg) and planar speed are derived here.
- **Experiment Events** (`/experiment/event`): JSON messages (via `std_msgs/String`) containing `event_type`, `action`, `details`, etc.
- **Fallback Events** (if topic missing): Matching `.jsonl` file in `experiment_logs/` whose filename corresponds to the bag directory name.

### Derived Columns & Structures
- `time_rel`: seconds since first odometry sample
- `yaw_deg`: yaw in degrees derived from odometry quaternion
- `speed`: √(vx² + vy²)
- `event_positions`: each relevant event mapped to nearest odometry pose (≤1 s difference)

### Outputs
- Static trajectory plots (with and without event overlays)
- Speed and heading statistics
- Filtered interactive trajectory explorer (dual-handle time slider) highlighting ONLY good / bad feedback events

### Feedback Event Filtering Logic (Interactive Plot)
Shown markers include only:
- `positive_feedback`
- `feedback_blink` with details containing *good* or *bad*
- Any action whose name contains *negative* or *bad*

### Next Extensions (optional)
- Smarter downsampling (RDP/event-aware)
- Export selected window stats

---
Run the cells below after setting `BAG_FILE` to your experiment bag directory.

In [1]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from datetime import datetime

# ROS2 bag reading with rosbags library (pure Python, no ROS required)
try:
    from rosbags.rosbag2 import Reader
    from rosbags.serde import deserialize_cdr
    print("rosbags library loaded (pure Python)")
except ImportError:
    print("rosbags library not found!")
    print("Install with: pip install rosbags")

# Configure plotting
plt.rcParams['figure.figsize'] = [15, 10]
plt.rcParams['font.size'] = 11
plt.style.use('default')

print("Ready for odometry analysis!")

rosbags library loaded (pure Python)
Ready for odometry analysis!


In [2]:
# Configuration - Update your bag file path
BAG_FILE = "experiment_bags/exp_pregassona_1_2025-09-13T07-16-09-907Z"
# BAG_FILE = "experiment_bags/exp_pregassona_2_2025-09-13T07-36-14-321Z"

# Verify bag exists
if os.path.exists(BAG_FILE):
    print(f"Bag found: {BAG_FILE}")
else:
    print(f"Bag not found: {BAG_FILE}")
    print("Update BAG_FILE path above")

print("\nExpected topics:")
print("   • /robomaster/odom (odometry data)")
print("   • /robomaster/imu (orientation)")

Bag found: experiment_bags/exp_pregassona_1_2025-09-13T07-16-09-907Z

Expected topics:
   • /robomaster/odom (odometry data)
   • /robomaster/imu (orientation)


In [3]:
# Read odometry data from bag
def read_odometry_data(bag_path):
    """Extract odometry data from ROS2 bag using rosbags library"""
    if not os.path.exists(bag_path):
        return []
    
    odom_data = []
    
    try:
        with Reader(bag_path) as reader:
            # Filter for odometry topic
            connections = [x for x in reader.connections if x.topic == '/robomaster/odom']
            
            if not connections:
                print("No /robomaster/odom topic found in bag")
                return []
            
            for connection, timestamp, rawdata in reader.messages(connections=connections):
                # Deserialize the message
                odom_msg = deserialize_cdr(rawdata, connection.msgtype)
                
                # Extract position
                x = odom_msg.pose.pose.position.x
                y = odom_msg.pose.pose.position.y
                
                # Extract orientation (quaternion to yaw)
                qx = odom_msg.pose.pose.orientation.x
                qy = odom_msg.pose.pose.orientation.y
                qz = odom_msg.pose.pose.orientation.z
                qw = odom_msg.pose.pose.orientation.w
                yaw = np.arctan2(2.0 * (qw * qz + qx * qy), 1.0 - 2.0 * (qy * qy + qz * qz))
                
                # Extract velocities
                vx = odom_msg.twist.twist.linear.x
                vy = odom_msg.twist.twist.linear.y
                wz = odom_msg.twist.twist.angular.z
                
                odom_data.append({
                    'timestamp': timestamp / 1e9,  # Convert nanoseconds to seconds
                    'x': x,
                    'y': y,
                    'yaw': yaw,
                    'yaw_deg': np.degrees(yaw),
                    'vx': vx,
                    'vy': vy,
                    'wz': wz,
                    'speed': np.sqrt(vx*vx + vy*vy)
                })
                
    except Exception as e:
        print(f"Error reading odometry: {e}")
        return []
    
    print(f"Read {len(odom_data)} odometry points")
    return odom_data

print("Odometry reader ready")

Odometry reader ready


In [4]:
# Read event data from bag
def read_event_data(bag_path):
    """Extract event data from ROS2 bag using rosbags library"""
    if not os.path.exists(bag_path):
        return []
    
    event_data = []
    
    try:
        with Reader(bag_path) as reader:
            # Filter for experiment event topic
            connections = [x for x in reader.connections if x.topic == '/experiment/event']
            
            if not connections:
                print("No /experiment/event topic found in bag")
                return []
            
            for connection, timestamp, rawdata in reader.messages(connections=connections):
                try:
                    # Deserialize the message
                    event_msg = deserialize_cdr(rawdata, connection.msgtype)
                    
                    # Parse event message (assuming it's a JSON string in std_msgs/String)
                    if hasattr(event_msg, 'data'):
                        import json
                        try:
                            event_json = json.loads(event_msg.data)
                            event_data.append({
                                'timestamp': timestamp / 1e9,  # Convert nanoseconds to seconds
                                'event_type': event_json.get('event_type', 'unknown'),
                                'action': event_json.get('action', 'unknown'),
                                'details': event_json.get('details', {}),
                                'session_id': event_json.get('session_id', '')
                            })
                        except json.JSONDecodeError:
                            # If not JSON, treat as plain string
                            event_data.append({
                                'timestamp': timestamp / 1e9,
                                'event_type': 'raw_message',
                                'action': 'message',
                                'details': {'message': event_msg.data},
                                'session_id': ''
                            })
                            
                except Exception as e:
                    print(f"Error parsing event message: {e}")
                    continue
                
    except Exception as e:
        print(f"Error reading events: {e}")
        return []
    
    print(f"Read {len(event_data)} event messages")
    return event_data

# Alternative: Read from JSONL file if bag reading fails
def read_event_data_from_jsonl(bag_path):
    """Fallback: read events from JSONL file"""
    import json
    from pathlib import Path
    
    # Try to find matching JSONL file
    bag_name = Path(bag_path).name
    jsonl_path = Path(bag_path).parent.parent / "experiment_logs" / f"{bag_name}.jsonl"
    
    if not jsonl_path.exists():
        print(f"No JSONL file found at: {jsonl_path}")
        return []
    
    event_data = []
    try:
        with open(jsonl_path, 'r') as f:
            for line in f:
                if line.strip():
                    event_json = json.loads(line.strip())
                    # Convert ISO timestamp to Unix timestamp
                    from datetime import datetime
                    iso_time = event_json.get('timestamp', '')
                    unix_time = datetime.fromisoformat(iso_time.replace('Z', '+00:00')).timestamp()
                    
                    event_data.append({
                        'timestamp': unix_time,
                        'event_type': event_json.get('event_type', 'unknown'),
                        'action': event_json.get('action', 'unknown'),
                        'details': event_json.get('details', {}),
                        'session_id': event_json.get('session_id', '')
                    })
    except Exception as e:
        print(f"Error reading JSONL: {e}")
        return []
        
    print(f"Read {len(event_data)} events from JSONL file")
    return event_data

print("Event data readers ready")

Event data readers ready


In [5]:
import numpy as np

def normalize_odometry(df, method="start", rotate_to_initial_heading=True):
    """
    Returns a *new* DataFrame with x_norm, y_norm columns.
    method: "start" -> subtract first pose; "mean" -> subtract centroid.
    rotate_to_initial_heading: if True, rotate so initial yaw becomes 0.
    """
    d = df.copy()

    if method == "start":
        x0, y0 = float(d['x'].iloc[0]), float(d['y'].iloc[0])
    elif method == "mean":
        x0, y0 = float(d['x'].mean()), float(d['y'].mean())
    else:
        raise ValueError("method must be 'start' or 'mean'")

    # shift
    d['x'] = d['x'] - x0
    d['y'] = d['y'] - y0

    # optional rotate so the initial yaw aligns with +X
    # if rotate_to_initial_heading and len(d) > 0:
    #     th0 = float(d['yaw'].iloc[0])  # radians
    #     c, s = np.cos(-th0), np.sin(-th0)
    #     xn = c * d['x'] - s * d['y']
    #     yn = s * d['x'] + c * d['y']
    #     d['x'], d['y'] = xn, yn

    return d

In [6]:
# import numpy as np

# def normalize_odometry(df, method="start", rotate_to_initial_heading=True):
#     """
#     Returns a *new* DataFrame with x_norm, y_norm columns.
#     method: "start" -> subtract first pose; "mean" -> subtract centroid.
#     rotate_to_initial_heading: if True, rotate so initial yaw becomes 0.
#     """
#     d = df.copy()

#     if method == "start":
#         x0, y0 = float(d['x'].iloc[0]), float(d['y'].iloc[0])
#     elif method == "mean":
#         x0, y0 = float(d['x'].mean()), float(d['y'].mean())
#     else:
#         raise ValueError("method must be 'start' or 'mean'")

#     # shift
#     d['x_norm'] = d['x'] - x0
#     d['y_norm'] = d['y'] - y0

#     # optional rotate so the initial yaw aligns with +X
#     if rotate_to_initial_heading and len(d) > 0:
#         th0 = float(d['yaw'].iloc[0])  # radians
#         c, s = np.cos(-th0), np.sin(-th0)
#         xn = c * d['x_norm'] - s * d['y_norm']
#         yn = s * d['x_norm'] + c * d['y_norm']
#         d['x_norm'], d['y_norm'] = xn, yn

#     return d

In [7]:
# Load and analyze odometry data
print("Loading odometry data...")

odom_data = read_odometry_data(BAG_FILE)

if not odom_data:
    print("No odometry data found!")
    print("Make sure /robomaster/odom is in your bag file")
else:
    # Convert to DataFrame
    df = pd.DataFrame(odom_data)
    df["time"] = pd.to_datetime(df["timestamp"], unit="s")
    df["time_rel"] = df["timestamp"] - df["timestamp"].iloc[0]

    # # Normalize: start at (0,0) and align heading
    # df = normalize_odometry(df, method="start", rotate_to_initial_heading=True)

    # Calculate trajectory metrics
    duration = df["time_rel"].iloc[-1]

    # Calculate distance traveled
    distances = []
    for i in range(1, len(df)):
        dx = df["x"].iloc[i] - df["x"].iloc[i - 1]
        dy = df["y"].iloc[i] - df["y"].iloc[i - 1]
        distances.append(np.sqrt(dx * dx + dy * dy))
    total_distance = sum(distances)

    # Summary statistics
    print(f"\nTRAJECTORY SUMMARY:")
    print(f"   Duration: {duration:.1f} seconds")
    print(f"   Data points: {len(df)}")
    print(f"   Total distance: {total_distance:.2f} meters")
    print(f"   X range: {df['x'].min(): .2f} to {df['x'].max(): .2f} m")
    print(f"   Y range: {df['y'].min(): .2f} to {df['y'].max(): .2f} m")
    print(
        f"   Heading range: {df['yaw_deg'].min(): .0f}° to {df['yaw_deg'].max(): .0f}°"
    )
    print(f"   Max speed: {df['speed'].max(): .2f} m/s")
    print(f"   Avg speed: {df['speed'].mean(): .2f} m/s")

    print(f"Speed: {df['speed'].tolist()}")

    print(f"\nFirst few data points:")
    print(df[["time_rel", "x", "y", "yaw_deg", "speed"]].head())

Loading odometry data...


explicit typestores.

If you are deserializing messages from an AnyReader instance, simply
use its `.deserialize(data, typename)` method.

Otherwise instantiate a type store and use its methods:

from rosbags.typesys import Stores, get_typestore

typestore = get_typestore(Stores.ROS2_FOXY)
typestore.deserialize_cdr(data, typename)
  odom_msg = deserialize_cdr(rawdata, connection.msgtype)


Read 10358 odometry points

TRAJECTORY SUMMARY:
   Duration: 1055.6 seconds
   Data points: 10358
   Total distance: 73.68 meters
   X range: -4.53 to  0.22 m
   Y range:  1.23 to  4.92 m
   Heading range: -180° to  180°
   Max speed:  0.84 m/s
   Avg speed:  0.12 m/s
Speed: [0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.0, 0.01414213562373095, 0.01414213562373095, 0.0, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01414213562373095, 0.01, 0.01414213562373095, 0.01414213562373095, 0.01, 0.01414213562373095, 0.01414213562373095, 0.01, 0.01, 0.01414213562373095, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01414213562373095, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.

In [8]:
# Load and analyze event data
print("Loading event data...")

# Try reading from ROS2 bag first, then fallback to JSONL
event_data = read_event_data(BAG_FILE)
if not event_data:
    print("No events from bag, trying JSONL file...")
    event_data = read_event_data_from_jsonl(BAG_FILE)

if not event_data:
    print("No event data found!")
    events_df = pd.DataFrame()  # Empty dataframe
else:
    # Convert to DataFrame
    events_df = pd.DataFrame(event_data)
    
    # Align timestamps with odometry (relative to start)
    if 'df' in locals() and not df.empty:
        odom_start_time = df['timestamp'].iloc[0]
        events_df['time_rel'] = events_df['timestamp'] - odom_start_time
        
        # Filter events to trajectory timespan
        trajectory_end = df['timestamp'].iloc[-1]
        events_df = events_df[
            (events_df['timestamp'] >= odom_start_time) & 
            (events_df['timestamp'] <= trajectory_end)
        ].copy()
    else:
        events_df['time_rel'] = events_df['timestamp'] - events_df['timestamp'].iloc[0]
    
    # Categorize events by type for visualization
    feedback_events = events_df[events_df['event_type'] == 'system'].copy()
    led_events = events_df[events_df['event_type'] == 'led_control'].copy()
    sound_events = events_df[events_df['event_type'] == 'sound_control'].copy()
    movement_events = events_df[events_df['event_type'] == 'movement'].copy()

    # Arm events: either arm_control or movement events with action_name '/robomaster/move_arm_pose'
    # arm events could be in 'movement' since it was missclassified by the ros-server publisher
    arm_events = events_df[(events_df['event_type'] == 'arm_control') | 
                           ((events_df['event_type'] == 'movement') & 
                            (events_df['details'].apply(lambda d: isinstance(d, dict) and d.get('action_name') == '/robomaster/move_arm_pose')))
                          ].copy()
    
    # Identify box open (pose_type=4) and box close (pose_type=2) events
    def _pose_type(ev_details):
        if isinstance(ev_details, dict):
            return ev_details.get('goal', {}).get('pose_type')
        return None
    arm_events['pose_type'] = arm_events['details'].apply(_pose_type)
    open_box_events = arm_events[arm_events['pose_type'] == 4].copy()
    close_box_events = arm_events[arm_events['pose_type'] == 2].copy()

    print(f"   Box opening events (pose_type=4): {len(open_box_events)}")
    print(f"   Box closing events (pose_type=2): {len(close_box_events)}")

    # Summary statistics
    print(f"\nEVENT SUMMARY:")
    print(f"   Total events: {len(events_df)}")
    print(f"   System feedback: {len(feedback_events)}")
    print(f"   LED controls: {len(led_events)}")
    print(f"   Sound events: {len(sound_events)}")
    print(f"   Movement events: {len(movement_events)}")
    print(f"   Arm events: {len(arm_events)}")

    # Show interesting events
    positive_feedback = events_df[events_df['action'] == 'positive_feedback']
    if len(positive_feedback) > 0:
        print(f"   Positive feedback events: {len(positive_feedback)}")
        print(f"      Times: {positive_feedback['time_rel'].round(1).tolist()[:5]} seconds")
    
    negative_feedback = events_df[events_df['action'].str.contains('negative|bad', na=False)]
    if len(negative_feedback) > 0:
        print(f"   Negative feedback events: {len(negative_feedback)}")
    
    print(f"\n📋 Event types found:")
    print(events_df['action'].value_counts().head(10))

Loading event data...
Read 1783 event messages
   Box opening events (pose_type=4): 60
   Box closing events (pose_type=2): 58

EVENT SUMMARY:
   Total events: 1783
   System feedback: 0
   LED controls: 220
   Sound events: 0
   Movement events: 1563
   Arm events: 118

📋 Event types found:
action
joystick_move               745
joystick_start              350
joystick_end                350
set_color                   200
generic_action               59
generic_action_completed     59
feedback_blink               20
Name: count, dtype: int64


explicit typestores.

If you are deserializing messages from an AnyReader instance, simply
use its `.deserialize(data, typename)` method.

Otherwise instantiate a type store and use its methods:

from rosbags.typesys import Stores, get_typestore

typestore = get_typestore(Stores.ROS2_FOXY)
typestore.deserialize_cdr(data, typename)
  event_msg = deserialize_cdr(rawdata, connection.msgtype)


In [9]:
# Function to find robot position at event times
def get_robot_position_at_events(odom_df, events_df):
    """Find robot position when each event occurred"""
    event_positions = []
    
    for _, event in events_df.iterrows():
        event_time = event['timestamp']
        
        # Find closest odometry point
        time_diffs = np.abs(odom_df['timestamp'] - event_time)
        closest_idx = time_diffs.idxmin()
        
        if time_diffs.iloc[closest_idx] < 1.0:  # Within 1 second
            robot_pos = {
                'event_time': event['time_rel'],
                'event_type': event['event_type'],
                'action': event['action'],
                'x': odom_df.loc[closest_idx, 'x'],
                'y': odom_df.loc[closest_idx, 'y'],
                'yaw': odom_df.loc[closest_idx, 'yaw'],
                'details': event.get('details', {})
            }
            event_positions.append(robot_pos)
    
    return pd.DataFrame(event_positions)

# Get event positions
if 'df' in locals() and 'events_df' in locals() and not events_df.empty:
    event_positions = get_robot_position_at_events(df, events_df)
    print(f"Mapped {len(event_positions)} events to robot positions")
    
    if len(event_positions) > 0:
        print(f"\nEvents on trajectory:")
        for event_type in event_positions['event_type'].unique():
            count = len(event_positions[event_positions['event_type'] == event_type])
            print(f"   • {event_type}: {count} events")
else:
    event_positions = pd.DataFrame()
    print("No events to map to positions")

Mapped 1783 events to robot positions

Events on trajectory:
   • movement: 1563 events
   • led_control: 220 events


In [10]:
# Detailed Speed Analysis
def analyze_robot_speed_patterns(odom_df, events_df=None, event_positions=None):
    """Analyze robot speed patterns and correlate with events"""
    
    if odom_df.empty:
        print("No odometry data available for speed analysis")
        return
    
    # Calculate speed statistics
    speed_stats = {
        'mean_speed': odom_df['speed'].mean(),
        'max_speed': odom_df['speed'].max(),
        'min_speed': odom_df['speed'].min(),
        'std_speed': odom_df['speed'].std()
    }
    
    # Define speed thresholds
    high_speed_threshold = speed_stats['mean_speed'] + speed_stats['std_speed']
    low_speed_threshold = speed_stats['mean_speed'] - speed_stats['std_speed']
    
    # Identify speed periods
    high_speed_periods = odom_df[odom_df['speed'] > high_speed_threshold]
    low_speed_periods = odom_df[odom_df['speed'] < low_speed_threshold]
    
    print("Speed Analysis Summary:")
    print(f"   • Average speed: {speed_stats['mean_speed']:.3f} m/s")
    print(f"   • Max speed: {speed_stats['max_speed']:.3f} m/s")
    print(f"   • Min speed: {speed_stats['min_speed']:.3f} m/s")
    print(f"   • Speed std dev: {speed_stats['std_speed']:.3f} m/s")
    print(f"   • High speed periods (>{high_speed_threshold:.3f} m/s): {len(high_speed_periods)} points")
    print(f"   • Low speed periods (<{low_speed_threshold:.3f} m/s): {len(low_speed_periods)} points")
    
    # Analyze speed near events
    if event_positions is not None and not event_positions.empty:
        print(f"\nSpeed near events:")
        
        # Analyze speed around each event type
        for event_type in event_positions['event_type'].unique():
            type_events = event_positions[event_positions['event_type'] == event_type]
            speeds_near_events = []
            
            for _, event in type_events.iterrows():
                # Find robot speed within 2 seconds of event
                time_window = 2.0  # seconds
                event_time = event['event_time']
                
                nearby_data = odom_df[
                    (odom_df['time_rel'] >= event_time - time_window) & 
                    (odom_df['time_rel'] <= event_time + time_window)
                ]
                
                if not nearby_data.empty:
                    speeds_near_events.extend(nearby_data['speed'].tolist())
            
            if speeds_near_events:
                avg_speed_near = np.mean(speeds_near_events)
                print(f"   • {event_type}: avg speed {avg_speed_near:.3f} m/s (vs overall {speed_stats['mean_speed']:.3f})")
                
                # Check if significantly slower
                if avg_speed_near < speed_stats['mean_speed'] * 0.8:
                    print(f"     → Robot slows down significantly near {event_type} events!")
    
    return speed_stats, high_speed_threshold, low_speed_threshold

# Run speed analysis
if 'df' in locals() and not df.empty:
    speed_stats, high_thresh, low_thresh = analyze_robot_speed_patterns(
        df, 
        events_df if 'events_df' in locals() else None,
        event_positions if 'event_positions' in locals() else None
    )
else:
    print("No trajectory data available for speed analysis")

Speed Analysis Summary:
   • Average speed: 0.125 m/s
   • Max speed: 0.840 m/s
   • Min speed: 0.000 m/s
   • Speed std dev: 0.141 m/s
   • High speed periods (>0.265 m/s): 1577 points
   • Low speed periods (<-0.016 m/s): 0 points

Speed near events:
   • movement: avg speed 0.147 m/s (vs overall 0.125)
   • led_control: avg speed 0.087 m/s (vs overall 0.125)
     → Robot slows down significantly near led_control events!


In [11]:
# Global speed range calculation for consistent scaling across multiple trajectories
# Run this cell FIRST, then load each trajectory and run the visualization

def calculate_global_speed_range(bag_files):
    """Calculate min/max speed across multiple bag files for consistent color scaling"""
    global_speeds = []
    
    for bag_file in bag_files:
        print(f"Analyzing speeds in: {bag_file}")
        odom_data = read_odometry_data(bag_file)
        if odom_data:
            df_temp = pd.DataFrame(odom_data)
            global_speeds.extend(df_temp['speed'].tolist())
    
    if global_speeds:
        vmin_global = min(global_speeds)
        vmax_global = max(global_speeds)
        
        # Add 5% padding
        speed_range = vmax_global - vmin_global
        if speed_range < 0.001:
            vmin_global = max(0.0, vmin_global - 0.1)
            vmax_global = vmin_global + 0.2
        else:
            padding = speed_range * 0.05
            vmin_global = max(0.0, vmin_global - padding)
            vmax_global = vmax_global + padding
        
        print(f"Global speed range: {vmin_global:.3f} - {vmax_global:.3f} m/s")
        return vmin_global, vmax_global
    else:
        print("No speed data found!")
        return 0.0, 1.0

# Define your bag files
BAG_FILES = [
    "experiment_bags/exp_pregassona_1_2025-09-13T07-16-09-907Z",
    "experiment_bags/exp_pregassona_2_2025-09-13T07-36-14-321Z"
]

# Calculate global range (run this once)
GLOBAL_VMIN, GLOBAL_VMAX = calculate_global_speed_range(BAG_FILES)
print(f"Use these values for consistent scaling: vmin={GLOBAL_VMIN:.3f}, vmax={GLOBAL_VMAX:.3f}")

Analyzing speeds in: experiment_bags/exp_pregassona_1_2025-09-13T07-16-09-907Z
Read 10358 odometry points
Analyzing speeds in: experiment_bags/exp_pregassona_2_2025-09-13T07-36-14-321Z


explicit typestores.

If you are deserializing messages from an AnyReader instance, simply
use its `.deserialize(data, typename)` method.

Otherwise instantiate a type store and use its methods:

from rosbags.typesys import Stores, get_typestore

typestore = get_typestore(Stores.ROS2_FOXY)
typestore.deserialize_cdr(data, typename)
  odom_msg = deserialize_cdr(rawdata, connection.msgtype)


Read 14378 odometry points
Global speed range: 0.000 - 1.386 m/s
Use these values for consistent scaling: vmin=0.000, vmax=1.386


In [None]:
# Hardcoded timing configuration
# Set these values to automatically focus on specific time windows
# Leave as None to use full trajectory range

# HARDCODED_START_TIME = 385
# HARDCODED_END_TIME = 480

HARDCODED_START_TIME = 195
HARDCODED_END_TIME = 240

# Examples:
# HARDCODED_START_TIME = 5.0   # Start at 5 seconds
# HARDCODED_END_TIME = 15.0    # End at 15 seconds

print(f"Hardcoded timing: Start={HARDCODED_START_TIME}, End={HARDCODED_END_TIME}")
if HARDCODED_START_TIME is not None or HARDCODED_END_TIME is not None:
    print("   → Visualization will automatically focus on specified time window")
else:
    print("   → Visualization will show full trajectory range")

Hardcoded timing: Start=197, End=240
   → Visualization will automatically focus on specified time window


In [13]:
# Interactive trajectory visualization with a SIMPLE dual-handle time slider
# Shows GOOD/BAD feedback + BOX OPEN/CLOSE events with clearer symbols.
# Added precision controls: step size dropdown + manual start/end inputs.
# Slider width increased for finer control.
# Line-only speed coloring implemented via tiny segments (no markers); separate dummy colorbar trace.

try:
    import plotly.io as pio
    import plotly.graph_objects as go
    import numpy as np
    import pandas as pd
    from ipywidgets import (
        FloatRangeSlider,
        VBox,
        HTML,
        HBox,
        Button,
        Layout,
        Dropdown,
        FloatText,
        Checkbox,
    )
    from IPython.display import display
    import pytz
    import tzlocal
    from matplotlib import cm
    import matplotlib.colors as mcolors

    PLOTLY_OK = True
except ImportError as e:
    print("Required libraries missing.")
    print("Install with: pip install plotly ipywidgets pytz tzlocal kaleido matplotlib")
    PLOTLY_OK = False


def create_time_range_slider_widget(df, event_positions=None, max_points=15000, global_vmin=None, global_vmax=None, hardcoded_start=None, hardcoded_end=None):
    if df.empty:
        return HTML("<b>No data</b>")

    # Downsample
    if len(df) > max_points:
        step = int(np.ceil(len(df) / max_points))
        base_df = df.iloc[::step].copy()
        print(f"⚡ Downsampled from {len(df)} to {len(base_df)} points for interaction speed")
    else:
        base_df = df.copy()

    # Cols
    required_cols = {"time_rel", "speed", "x", "y", "yaw_deg", "timestamp"}
    missing = required_cols - set(base_df.columns)
    if missing:
        return HTML(f"<b>Missing columns:</b> {sorted(missing)}")

    time_rel = base_df["time_rel"].to_numpy()
    speed = base_df["speed"].to_numpy()
    x_vals = base_df["x"].to_numpy()
    y_vals = base_df["y"].to_numpy()
    yaw_deg_vals = base_df["yaw_deg"].to_numpy()
    timestamp_vals = base_df["timestamp"].to_numpy()

    local_tz = tzlocal.get_localzone()
    time_dt = pd.to_datetime(timestamp_vals, unit="s").tz_localize("UTC").tz_convert(local_tz)

    # Layout
    plot_width, plot_height = 1200, 800
    fig = go.FigureWidget(
        layout=dict(
            width=plot_width,
            height=plot_height,
            plot_bgcolor="white",
            paper_bgcolor="white",
            margin=dict(l=60, r=40, t=90, b=60),
            xaxis=dict(
                title=dict(text="Lateral motion (m)", font=dict(color="black", size=16)),
                tickfont=dict(size=18, color="black"),
                showgrid=True,
                gridcolor="lightgray",
                zeroline=True,
                zerolinecolor="lightgray",
                zerolinewidth=1,
            ),
            yaxis=dict(
                title=dict(text="Forward motion (m)", font=dict(color="black", size=16)),
                tickfont=dict(size=18, color="black"),
                showgrid=True,
                gridcolor="lightgray",
                zeroline=True,
                zerolinecolor="lightgray",
                zerolinewidth=1,
                scaleanchor="x",
                scaleratio=1,
            ),
            hovermode="closest",
            legend=dict(orientation="v", font=dict(size=14)),
            shapes=[
                dict(
                    type="rect", xref="paper", yref="paper",
                    x0=0, y0=0, x1=1, y1=1,
                    line=dict(color="black", width=1),
                    fillcolor="rgba(0,0,0,0)",
                )
            ],
        )
    )

    # Full path
    full_path = go.Scatter(
        # x=x_vals, y=y_vals,
        x=[], y=[],
        mode="lines",
        line=dict(color="rgba(150,150,150,0.30)", width=3),
        name="Full Path",
        hoverinfo="skip",
    )

    # Segments (line-only, colored by speed)
    if global_vmin is not None and global_vmax is not None:
        # Use provided global range for consistent scaling across multiple trajectories
        vmin, vmax = float(global_vmin), float(global_vmax)
        print(f"Using global speed range: {vmin:.3f} - {vmax:.3f} m/s")
    else:
        # Calculate range from current data
        vmin, vmax = float(speed.min()), float(speed.max())
        # Add small padding to avoid edge cases
        speed_range = vmax - vmin
        if speed_range < 0.001:  # If all speeds are nearly the same
            vmin = max(0.0, vmin - 0.1)
            vmax = vmin + 0.2
        else:
            padding = speed_range * 0.05  # 5% padding
            vmin = max(0.0, vmin - padding)
            vmax = vmax + padding
        print(f"Using local speed range: {vmin:.3f} - {vmax:.3f} m/s")
    
    cmap = cm.get_cmap("plasma")
    norm = mcolors.Normalize(vmin=vmin, vmax=vmax)

    segment_traces = []
    n = len(x_vals)
    for i in range(n - 1):
        seg_speed = float(speed[i])
        hex_color = mcolors.to_hex(cmap(norm(seg_speed)))
        segment_traces.append(
            go.Scatter(
                x=[x_vals[i], x_vals[i + 1]],
                y=[y_vals[i], y_vals[i + 1]],
                mode="lines",
                line=dict(color=hex_color, width=4),
                hovertemplate=(
                    f"t: {time_rel[i]:.2f}s → {time_rel[i+1]:.2f}s<br>"
                    f"X: {x_vals[i]:.2f} → {x_vals[i+1]:.2f} m<br>"
                    f"Y: {y_vals[i]:.2f} → {y_vals[i+1]:.2f} m<br>"
                    f"Speed@i: {seg_speed:.3f} m/s<extra></extra>"
                ),
                showlegend=False,
                name="",
            )
        )

    # Dummy colorbar
    colorbar_trace = go.Scatter(
        x=[None], y=[None], mode="markers",
        marker=dict(
            colorscale="Plasma", cmin=vmin, cmax=vmax, color=[vmin, vmax],
            colorbar=dict(
                title=dict(text="Speed (m/s)", font=dict(color="black", size=12)),
                tickfont=dict(color="black", size=14),
                len=0.6, thickness=15, orientation="v",
                x=0.95, xanchor="right", y=0.95, yanchor="top",
                outlinewidth=1, outlinecolor="black",
                bgcolor="rgba(255,255,255,0.85)", ticklen=5,
            ),
        ),
        hoverinfo="none", showlegend=False,
    )

    # Events
    good_x = good_y = bad_x = bad_y = open_x = open_y = close_x = close_y = np.array([])
    good_times = bad_times = open_times = close_times = None

    if event_positions is not None and not event_positions.empty:
        ev = event_positions.copy()
        ev["action_str"] = ev["action"].astype(str).str.lower()
        details_str = ev.get("details", pd.Series([{}] * len(ev))).astype(str).str.lower()

        good_mask = (ev["action_str"] == "positive_feedback") | (
            (ev["action_str"] == "feedback_blink") & details_str.str.contains("good")
        )
        bad_mask = ev["action_str"].str.contains("negative|bad") | (
            (ev["action_str"] == "feedback_blink") & details_str.str.contains("bad")
        )

        global open_box_events, close_box_events
        if "open_box_events" in globals() and len(open_box_events) > 0:
            open_times = (
                open_box_events["time_rel"].to_numpy(dtype=float)
                if "time_rel" in open_box_events.columns
                else (open_box_events["timestamp"] - df["timestamp"].iloc[0]).to_numpy(dtype=float)
            )
        if "close_box_events" in globals() and len(close_box_events) > 0:
            close_times = (
                close_box_events["time_rel"].to_numpy(dtype=float)
                if "time_rel" in close_box_events.columns
                else (close_box_events["timestamp"] - df["timestamp"].iloc[0]).to_numpy(dtype=float)
            )

        def map_box_positions(box_times):
            if box_times is None:
                return np.array([]), np.array([])
            xs, ys = [], []
            for bt in box_times:
                diffs = np.abs(ev["event_time"] - bt)
                if len(diffs):
                    idx = diffs.idxmin()
                    if diffs.loc[idx] < 0.6:
                        xs.append(ev.loc[idx, "x"])
                        ys.append(ev.loc[idx, "y"])
            return np.array(xs), np.array(ys)

        if good_mask.any():
            good_subset = ev[good_mask]
            good_times = good_subset["event_time"].to_numpy(dtype=float)
            good_x = good_subset["x"].to_numpy()
            good_y = good_subset["y"].to_numpy()
        if bad_mask.any():
            bad_subset = ev[bad_mask]
            bad_times = bad_subset["event_time"].to_numpy(dtype=float)
            bad_x = bad_subset["x"].to_numpy()
            bad_y = bad_subset["y"].to_numpy()

        open_x, open_y = map_box_positions(open_times)
        close_x, close_y = map_box_positions(close_times)

    # Event markers (created with empty data; will be filled by update_range)
    good_trace = go.Scatter(
        x=[], y=[], mode="markers",
        marker=dict(size=14, color="gold", symbol="star", line=dict(width=2, color="orange")),
        name="Good Feedback",
    )
    bad_trace = go.Scatter(
        x=[], y=[], mode="markers",
        marker=dict(size=12, color="lightcoral", symbol="x", line=dict(width=2, color="red")),
        name="Bad / Negative",
    )
    open_trace = go.Scatter(
        x=[], y=[], mode="markers",
        marker=dict(size=20, color="white", symbol="triangle-up", line=dict(width=3, color="darkgreen")),
        name="Box Open",
    )
    close_trace = go.Scatter(
        x=[], y=[], mode="markers",
        marker=dict(size=20, color="darkred", symbol="triangle-down", line=dict(width=2, color="white")),
        name="Box Close",
    )

    # Start/End markers (will be updated dynamically based on time window)
    start_marker = go.Scatter(
        x=[], y=[],
        mode="markers",
        marker=dict(size=16, color="lime", line=dict(width=3, color="darkgreen")),
        name="Start",
    )
    end_marker = go.Scatter(
        x=[], y=[],
        mode="markers",
        marker=dict(size=16, color="red", symbol="square", line=dict(width=3, color="darkred")),
        name="End",
    )

    # ---------- Add traces to fig ----------
    fig.add_traces(
        [full_path] +
        segment_traces +                    # many tiny segments
        [colorbar_trace, start_marker, end_marker, good_trace, bad_trace, open_trace, close_trace]
    )

    # ---------- IMPORTANT: Build references from fig.data ----------
    idx_full = 0
    idx_seg_start = 1
    idx_seg_end = idx_seg_start + (len(segment_traces) - 1) if len(segment_traces) else idx_seg_start - 1
    idx_colorbar = idx_seg_end + 1
    idx_start_marker = idx_colorbar + 1
    idx_end_marker = idx_colorbar + 2
    idx_good = idx_colorbar + 3
    idx_bad = idx_colorbar + 4
    idx_open = idx_colorbar + 5
    idx_close = idx_colorbar + 6

    # segment references taken from fig.data, not the originals
    seg_refs = [fig.data[idx_seg_start + i] for i in range(max(0, idx_seg_end - idx_seg_start + 1))]
    colorbar_ref = fig.data[idx_colorbar]
    start_marker_ref = fig.data[idx_start_marker]
    end_marker_ref = fig.data[idx_end_marker]
    good_ref = fig.data[idx_good]
    bad_ref = fig.data[idx_bad]
    open_ref = fig.data[idx_open]
    close_ref = fig.data[idx_close]

    # Controls
    t_min = float(time_rel.min())
    t_max = float(time_rel.max())
    
    # Use hardcoded values if provided, but clamp them to data range
    if hardcoded_start is not None:
        initial_start = float(np.clip(hardcoded_start, t_min, t_max))
    else:
        initial_start = t_min
        
    if hardcoded_end is not None:
        initial_end = float(np.clip(hardcoded_end, t_min, t_max))
    else:
        initial_end = t_max
    
    # Ensure end > start
    if initial_end <= initial_start:
        initial_end = min(t_max, initial_start + 0.1)

    step_selector = Dropdown(
        options=[("1.0 s", "1.0"), ("0.5 s", "0.5"), ("0.2 s", "0.2"),
                 ("0.1 s", "0.1"), ("0.05 s", "0.05"), ("0.02 s", "0.02"), ("0.01 s", "0.01")],
        value="0.1", description="Step", layout=Layout(width="130px"),
    )
    slider = FloatRangeSlider(
        value=(initial_start, initial_end), min=t_min, max=t_max, step=0.1,
        description="Time (s)", readout=False, continuous_update=True,
        layout=Layout(width="1000px"),
    )
    start_input = FloatText(value=initial_start, description="Start", layout=Layout(width="180px"))
    end_input = FloatText(value=initial_end, description="End", layout=Layout(width="180px"))
    stats_label = HTML()
    warn_label = HTML()
    legend_checkbox = Checkbox(value=True, description="Show Legend", layout=Layout(width="150px"))

    MIN_DELTA = 0.0005

    def clamp(v): return float(np.clip(v, t_min, t_max))

    def apply_inputs_to_slider():
        s0 = clamp(start_input.value)
        s1 = clamp(end_input.value)
        if s1 - s0 < MIN_DELTA:
            s1 = min(t_max, s0 + MIN_DELTA)
        slider.value = (s0, s1)

    step_selector.observe(lambda ch: setattr(slider, "step", float(ch["new"])), names="value")
    start_input.observe(lambda ch: apply_inputs_to_slider(), names="value")
    end_input.observe(lambda ch: apply_inputs_to_slider(), names="value")

    def on_legend_toggle(change):
        # Only control legend visibility, not the speed colorbar
        fig.layout.showlegend = change["new"]

    legend_checkbox.observe(on_legend_toggle, names="value")
    # fig.layout.showlegend = legend_checkbox.value
    fig.layout.showlegend = False

    def update_range(_=None):
        t0, t1 = slider.value
        start_input.value = t0
        end_input.value = t1

        mask = (time_rel >= t0) & (time_rel <= t1)

        # segments: show only if both endpoints are in the window
        if seg_refs:
            up_to = min(len(seg_refs), len(mask) - 1)
            for i in range(up_to):
                if mask[i] and mask[i + 1]:
                    seg_refs[i].x = [x_vals[i], x_vals[i + 1]]
                    seg_refs[i].y = [y_vals[i], y_vals[i + 1]]
                    seg_refs[i].visible = True
                else:
                    seg_refs[i].x = []
                    seg_refs[i].y = []
                    seg_refs[i].visible = False

        # stats
        if mask.any():
            # Update start and end markers to show beginning and end of selected window
            first_idx = np.where(mask)[0][0]
            last_idx = np.where(mask)[0][-1]
            
            start_marker_ref.x = [x_vals[first_idx]]
            start_marker_ref.y = [y_vals[first_idx]]
            end_marker_ref.x = [x_vals[last_idx]]
            end_marker_ref.y = [y_vals[last_idx]]
            
            dx = np.diff(x_vals[mask]); dy = np.diff(y_vals[mask])
            seg_len = float(np.sqrt(dx * dx + dy * dy).sum())
            abs_start = time_dt[mask][0]; abs_end = time_dt[mask][-1]
            stats_label.value = (
                f"<span style='color:gray; font-size:14px'>"
                f"Window: {t0:.4f}s – {t1:.4f}s (Δ={(t1-t0):.4f}s) | "
                f"Absolute: {abs_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} – "
                f"{abs_end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}<br>"
                f"Points: {mask.sum()} | Path length: {seg_len:.2f} m"
                f"</span>"
            )
            warn_label.value = ""
        else:
            # Hide start/end markers when no data in selection
            start_marker_ref.x = []
            start_marker_ref.y = []
            end_marker_ref.x = []
            end_marker_ref.y = []
            stats_label.value = "<span style='color:gray'>Empty selection</span>"
            warn_label.value = "<span style='color:red'>Empty selection</span>"

        # events
        if good_times is not None:
            gmask = (good_times >= t0) & (good_times <= t1)
            good_ref.x = good_x[gmask] if gmask.any() else []
            good_ref.y = good_y[gmask] if gmask.any() else []
        # if bad_times is not None:
        #     bmask = (bad_times >= t0) & (bad_times <= t1)
        #     bad_ref.x = bad_x[bmask] if bmask.any() else []
        #     bad_ref.y = bad_y[bmask] if bmask.any() else []
        # if open_times is not None:
        #     omask = (open_times >= t0) & (open_times <= t1)
        #     open_ref.x = open_x[omask] if omask.any() else []
        #     open_ref.y = open_y[omask] if omask.any() else []
        # if close_times is not None:
        #     cmask = (close_times >= t0) & (close_times <= t1)
        #     close_ref.x = close_x[cmask] if cmask.any() else []
        #     close_ref.y = close_y[cmask] if cmask.any() else []

    slider.observe(update_range, names="value")
    update_range()  # populate initial state

    # Export helpers (unchanged)
    EXPORT_W, EXPORT_H = 1200, 800

    def snapshot_current_figure(fig_widget):
        snap = go.Figure()
        for tr in fig_widget.data:
            # Skip traces that are not visible or have empty data
            if (getattr(tr, "visible", True) == False or 
                getattr(tr, "visible", True) == "legendonly" or 
                (hasattr(tr, 'x') and (tr.x is None or len(tr.x) == 0)) or
                (hasattr(tr, 'y') and (tr.y is None or len(tr.y) == 0))):
                continue
                
            tr_dict = {
                "x": list(tr.x) if tr.x is not None else [],
                "y": list(tr.y) if tr.y is not None else [],
                "mode": tr.mode,
                "name": tr.name,
                "hoverinfo": tr.hoverinfo,
                "hovertemplate": getattr(tr, "hovertemplate", None),
                "showlegend": tr.showlegend,
                "visible": tr.visible,
            }
            if getattr(tr, "line", None):
                tr_dict["line"] = {}
                if getattr(tr.line, "color", None): tr_dict["line"]["color"] = tr.line.color
                if getattr(tr.line, "width", None): tr_dict["line"]["width"] = tr.line.width
            if getattr(tr, "marker", None):
                tr_dict["marker"] = {}
                if getattr(tr.marker, "color", None) is not None: tr_dict["marker"]["color"] = tr.marker.color
                if getattr(tr.marker, "size", None): tr_dict["marker"]["size"] = tr.marker.size
                if getattr(tr.marker, "symbol", None): tr_dict["marker"]["symbol"] = tr.marker.symbol
                if getattr(tr.marker, "line", None):
                    tr_dict["marker"]["line"] = {}
                    if getattr(tr.marker.line, "color", None): tr_dict["marker"]["line"]["color"] = tr.marker.line.color
                    if getattr(tr.marker.line, "width", None): tr_dict["marker"]["line"]["width"] = tr.marker.line.width
                if getattr(tr.marker, "colorbar", None):
                    cb = tr.marker.colorbar
                    cbd = {}
                    if getattr(cb, "title", None): cbd["title"] = cb.title
                    if getattr(cb, "tickfont", None): cbd["tickfont"] = cb.tickfont
                    if getattr(cb, "len", None) is not None: cbd["len"] = cb.len
                    if getattr(cb, "thickness", None): cbd["thickness"] = cb.thickness
                    if getattr(cb, "orientation", None): cbd["orientation"] = cb.orientation
                    if getattr(cb, "x", None) is not None: cbd["x"] = cb.x
                    if getattr(cb, "xanchor", None): cbd["xanchor"] = cb.xanchor
                    if getattr(cb, "y", None) is not None: cbd["y"] = cb.y
                    if getattr(cb, "yanchor", None): cbd["yanchor"] = cb.yanchor
                    if getattr(cb, "outlinewidth", None): cbd["outlinewidth"] = cb.outlinewidth
                    if getattr(cb, "outlinecolor", None): cbd["outlinecolor"] = cb.outlinecolor
                    if getattr(cb, "bgcolor", None): cbd["bgcolor"] = cb.bgcolor
                    if getattr(cb, "ticklen", None): cbd["ticklen"] = cb.ticklen
                    tr_dict["marker"]["colorbar"] = cbd
                if getattr(tr.marker, "colorscale", None): tr_dict["marker"]["colorscale"] = tr.marker.colorscale
                if getattr(tr.marker, "cmin", None) is not None: tr_dict["marker"]["cmin"] = tr.marker.cmin
                if getattr(tr.marker, "cmax", None) is not None: tr_dict["marker"]["cmax"] = tr.marker.cmax
            snap.add_trace(go.Scatter(**{k: v for k, v in tr_dict.items() if v is not None}))
        snap.update_layout(fig_widget.layout)
        return snap

    reset_btn = Button(description="Reset", layout=Layout(width="70px"))
    export_btn = Button(description="Export PDF", layout=Layout(width="100px"))
    export_png_btn = Button(description="Export PNG", layout=Layout(width="110px"))

    def do_reset(_):
        # Reset to hardcoded values if provided, otherwise to full range
        reset_start = initial_start
        reset_end = initial_end
        slider.value = (reset_start, reset_end)
        step_selector.value = "0.1"
        slider.step = 0.1

    def export_current(fig_widget, fmt="pdf", filename="trajectory"):
        snap = snapshot_current_figure(fig_widget)
        snap.update_layout(width=EXPORT_W, height=EXPORT_H)
        snap.write_image(f"{filename}.{fmt}", format=fmt, engine="kaleido", scale=1)

    def do_export(_):
        try:
            from datetime import datetime
            t0, t1 = slider.value
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            base = f"robot_trajectory_{ts}_t{t0:.2f}-{t1:.2f}s"
            export_current(fig, fmt="pdf", filename=base)
            warn_label.value = f"<span style='color:green; font-weight:bold'>✓ Exported: {base}.pdf</span>"
        except Exception as e:
            warn_label.value = f"<span style='color:red'>Export failed: {e}<br>Install with: pip install kaleido</span>"

    def do_export_png(_):
        try:
            from datetime import datetime
            t0, t1 = slider.value
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            base = f"robot_trajectory_{ts}_t{t0:.2f}-{t1:.2f}s"
            export_current(fig, fmt="png", filename=base)
            warn_label.value = f"<span style='color:green; font-weight:bold'>✓ Exported: {base}.png</span>"
        except Exception as e:
            warn_label.value = f"<span style='color:red'>PNG export failed: {e}</span>"

    reset_btn.on_click(do_reset)
    export_btn.on_click(do_export)
    do_export(None)
    export_png_btn.on_click(do_export_png)

    header = HTML(
        "<b style='font-size:15px'>Select time window — Feedback + Box Open (▲) / Close (▼).<br>"
        "Use Step dropdown or manual Start/End for fine precision. Set HARDCODED_START_TIME/END_TIME above for auto-focus.<br>"
        "Export PDF/PNG saves current view with identical size & filters.</b>"
    )

    controls_row1 = HBox([slider])
    controls_row2 = HBox([step_selector, reset_btn, export_btn, export_png_btn, start_input, end_input, legend_checkbox])

    return VBox([header, controls_row1, controls_row2, stats_label, warn_label, fig])


# ---- Run (expects df in scope; optionally event_positions, open_box_events, close_box_events) ----
if PLOTLY_OK and "df" in locals() and not df.empty:
    print("Building interactive range slider visualization (feedback + box events) with precision controls ...")
    try:
        widget = create_time_range_slider_widget(
            df, 
            event_positions if "event_positions" in locals() else None,
            global_vmin=GLOBAL_VMIN if "GLOBAL_VMIN" in locals() else None,
            global_vmax=GLOBAL_VMAX if "GLOBAL_VMAX" in locals() else None,
            hardcoded_start=HARDCODED_START_TIME if "HARDCODED_START_TIME" in locals() else None,
            hardcoded_end=HARDCODED_END_TIME if "HARDCODED_END_TIME" in locals() else None
        )
        # widget = create_time_range_slider_widget(df)
        display(widget)
        print("Ready. Adjust the slider or use Start/End boxes for precise selection.")
    except Exception as e:
        print(f"Failed to build widget: {e}")
        print("If ipywidgets is not enabled, run: pip install plotly ipywidgets pytz tzlocal kaleido matplotlib")
else:
    if not PLOTLY_OK:
        print("Plotly/ipywidgets unavailable.")
    else:
        print("No trajectory data available for interactive visualization")

Building interactive range slider visualization (feedback + box events) with precision controls ...
Using global speed range: 0.000 - 1.386 m/s



The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.



Support for the 'engine' argument is deprecated and will be removed after September 2025.
Kaleido will be the only supported engine at that time.




VBox(children=(HTML(value="<b style='font-size:15px'>Select time window — Feedback + Box Open (▲) / Close (▼).…

Ready. Adjust the slider or use Start/End boxes for precise selection.
