# Lilsim Python SDK Test Suite

This notebook exercises the communication features of the lilsim simulator through the Python SDK.

**Prerequisites:**
1. Start the lilsim application: `./build/debug/app/lilsim`
2. The tests will automatically enable/disable ZMQ and switch modes
3. Watch the GUI Status panel to see communication state updates

**What's Being Tested:**
- Connection and state streaming
- Admin commands (pause, run, step, reset)
- Asynchronous control mode
- Synchronous control mode
- Mode switching
- Client disconnection and timeout behavior
- Control period configuration (milliseconds → ticks)
- Marker visualization
- State update continuity
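
The "milliseconds → ticks" item refers to turning the `control_period_ms` argument into a whole number of simulation ticks. A minimal sketch of such a conversion follows, assuming a fixed simulation timestep. The `dt_s` values and the round-to-nearest rule are assumptions for illustration, not lilsim's documented behavior.

```python
def period_ms_to_ticks(period_ms: float, dt_s: float) -> int:
    """Convert a control period in milliseconds to a whole number of ticks.

    Assumption: the simulator advances in fixed steps of dt_s seconds and
    rounds the requested period to the nearest tick, with a minimum of one.
    """
    if period_ms <= 0 or dt_s <= 0:
        raise ValueError("period_ms and dt_s must be positive")
    ticks = round(period_ms / (dt_s * 1000.0))
    return max(1, ticks)


# Hypothetical timesteps, chosen only for illustration
print(period_ms_to_ticks(100, 0.001))
print(period_ms_to_ticks(100, 0.004))
```

A period shorter than one timestep is clamped to a single tick in this sketch, so the controller is never asked to run more often than the simulation steps.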

In [1]:
# Setup and imports
import sys
import time
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output

# Add the SDK to path
sys.path.insert(0, '.')

from lilsim import LilsimClient, AdminCommandType, MarkerType
from lilsim.utils import state_to_dict

print("✓ Imports successful!")

✓ Imports successful!




In [2]:
client = LilsimClient(host="localhost")
client.connect()

INFO:lilsim.client:Connecting to lilsim at localhost...
INFO:lilsim.client:Connected to state stream (port 5556)
INFO:lilsim.client:Connected to admin command endpoint (port 5558)
INFO:lilsim.client:Connected to async control stream (port 5559)
INFO:lilsim.client:Connected to marker stream (port 5560)


In [6]:
client.run()

INFO:lilsim.client:Admin command succeeded: Simulation running


True

In [13]:
client.set_mode(sync=False)
client.send_control_async(steer_angle=0.0, ax=0.0)

INFO:lilsim.client:Admin command succeeded: Asynchronous mode enabled


In [4]:
client.set_mode(sync=True, control_period_ms=100)

def tracking_controller(request):
    # Sinusoidal steering driven by the simulator tick, constant acceleration
    tick = request.header.tick
    steer_angle = 0.3 * np.sin(tick * 0.02)
    ax = 2.0
    return (steer_angle, ax)

client.register_sync_controller(tracking_controller)
client.start()

INFO:lilsim.client:Admin command succeeded: Synchronous mode enabled
INFO:lilsim.client:Registered sync controller: tracking_controller
INFO:lilsim.client:Connected to sync control endpoint (port 5557)
INFO:lilsim.client:State listener thread started
INFO:lilsim.client:Control responder thread started
INFO:lilsim.client:Client started


## Test 1: Basic Connection and State Subscription

This test verifies:
- Connection to all sockets (state, admin, async control, markers)
- State updates are received continuously
- Background threads work correctly

In [2]:
print("Test 1: Creating client and connecting...")

# Create and connect client
client = LilsimClient(host="localhost")
client.connect()

# Subscribe to state updates
state_history = []

def state_callback(state):
    state_dict = state_to_dict(state)
    state_history.append(state_dict)

client.subscribe_state(state_callback)
client.start()

# Wait and check state updates
time.sleep(2)
print(f"✓ Received {len(state_history)} state updates")
if len(state_history) > 0:
    latest = state_history[-1]
    print(f"  Latest state: tick={latest['tick']}, pos=({latest['x']:.2f}, {latest['y']:.2f}), v={latest['v']:.2f} m/s")
else:
    print("⚠ No state updates received - is the simulator running and unpaused?")

INFO:lilsim.client:Connecting to lilsim at localhost...
INFO:lilsim.client:Connected to state stream (port 5556)
INFO:lilsim.client:Connected to admin command endpoint (port 5558)
INFO:lilsim.client:Connected to async control stream (port 5559)
INFO:lilsim.client:Connected to marker stream (port 5560)


Test 1: Creating client and connecting...


INFO:lilsim.client:Registered state callback: state_callback
INFO:lilsim.client:State listener thread started
INFO:lilsim.client:Client started


✓ Received 503 state updates
  Latest state: tick=53350, pos=(0.00, 0.00), v=0.00 m/s
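
Counting updates shows that data arrived, but not that it arrived without gaps. The sketch below inspects the `tick` field of each recorded state, assuming `tick` increases with every published state while the simulation is running. The function name and the synthetic history are illustrative only.

```python
def check_tick_continuity(history):
    """Summarize tick ordering for a list of state dicts with a 'tick' key."""
    ticks = [s["tick"] for s in history]
    gaps = [b - a for a, b in zip(ticks, ticks[1:])]
    return {
        "count": len(ticks),
        "monotonic": all(g > 0 for g in gaps),
        "max_gap": max(gaps) if gaps else 0,
    }


# Synthetic history standing in for state_history: the update for tick 3 is missing
sample = [{"tick": t} for t in (0, 1, 2, 4, 5)]
print(check_tick_continuity(sample))
```

In the notebook, the same function could be applied to `state_history` after the sleep. A `max_gap` well above the usual tick spacing would point at dropped or delayed updates.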


## Test 2: Admin Commands

This test sends each admin command in turn: pause, run, step, and reset.

In [None]:
print("\nTest 2: Testing admin commands...")

# Pause
print("  - Pausing simulation...")
client.pause()
time.sleep(0.5)
print("    ✓ Pause command sent")

# Run
print("  - Resuming simulation...")
client.run()
time.sleep(0.5)
print("    ✓ Run command sent")

# Step
print("  - Stepping 10 ticks...")
client.step(10)
time.sleep(0.5)
print("    ✓ Step command sent")

# Resume after step
client.run()
time.sleep(0.5)

# Reset
print("  - Resetting simulation...")
client.reset()
time.sleep(0.5)
print("    ✓ Reset command sent")

print("✓ All admin commands completed successfully")
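
The cell above reports each command as sent without inspecting the result. Since `client.run()` returned `True` in the output shown earlier, one option is to record the return value of every call. This is a sketch under the assumption that the other admin methods also return a truthy value on success. `FakeClient` is a hypothetical stand-in so the example runs without a simulator.

```python
import time


def run_admin_sequence(client, settle_s=0.5):
    """Call each admin command in order and record whether it reported success."""
    steps = [
        ("pause", client.pause),
        ("run", client.run),
        ("step", lambda: client.step(10)),
        ("run", client.run),
        ("reset", client.reset),
    ]
    results = []
    for name, call in steps:
        results.append((name, bool(call())))
        time.sleep(settle_s)  # Give the simulator time to apply the command
    return results


class FakeClient:
    """Hypothetical stand-in for LilsimClient; every command succeeds."""

    def pause(self):
        return True

    def run(self):
        return True

    def step(self, n):
        return n > 0

    def reset(self):
        return True


results = run_admin_sequence(FakeClient(), settle_s=0.0)
failed = [name for name, ok in results if not ok]
print("failed commands:", failed)
```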

## Test 3: Asynchronous Control Mode

This test verifies:
- Async control commands are published correctly (port 5559)
- Simulator responds to controls without blocking
- Multiple rapid control updates work

In [None]:
print("\nTest 3: Testing asynchronous control mode...")

# Switch to async mode
print("  - Switching to async mode...")
client.set_mode(sync=False)
time.sleep(0.5)
print("    ✓ Async mode enabled")

# Sweep the steering through one sine period (50 commands)
print("  - Sending sinusoidal steering pattern (50 commands)...")
client.run()  # Make sure sim is running
time.sleep(0.2)

for i in range(50):
    angle = 2 * np.pi * i / 50
    steer_angle = 0.3 * np.sin(angle)
    ax = 3.0
    client.send_control_async(steer_angle=steer_angle, ax=ax)
    time.sleep(0.05)  # ~20 Hz control rate

print("✓ Async control test complete")

# Brake for one second, then zero the controls
client.send_control_async(steer_angle=0.0, ax=-5.0)
time.sleep(1.0)
client.send_control_async(steer_angle=0.0, ax=0.0)
print("  Brake applied, controls zeroed")
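
A fixed `time.sleep(0.05)` after each send gives a rate slightly below 20 Hz, because the time spent sending adds to every period. One common alternative is to sleep until a deadline computed from the loop's start time. The helper below is an illustrative sketch, not part of the SDK. Its `send` argument is any callable, and in this notebook it would wrap `client.send_control_async`.

```python
import time


def run_at_fixed_rate(send, n_commands, period_s):
    """Call send(i) n_commands times, one call per period_s seconds.

    Deadlines are computed from the start time, so time spent inside send()
    does not accumulate as drift the way a plain sleep(period_s) would.
    """
    start = time.monotonic()
    for i in range(n_commands):
        send(i)
        deadline = start + (i + 1) * period_s
        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
    return time.monotonic() - start


sent = []
elapsed = run_at_fixed_rate(sent.append, n_commands=5, period_s=0.01)
print(f"sent {len(sent)} commands in {elapsed:.3f} s")
```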

In [14]:
client.set_mode(sync=False)
# client.send_control_async(steer_angle=0.0, ax=2.0)

AttributeError: 'AdminCommand' object has no attribute 'control_period_ms'

## Test 4: Synchronous Control Mode

This test verifies:
- Sync mode control request/response works (port 5557)
- Control period in milliseconds is respected
- Controller callback is called at the expected rate

In [None]:
print("\nTest 4: Testing synchronous control mode...")

# Switch to sync mode with 100ms control period
print("  - Switching to sync mode (100ms period)...")
client.set_mode(sync=True, control_period_ms=100)
time.sleep(0.5)
print("    ✓ Sync mode enabled")

# Track control requests
control_requests = []

def tracking_controller(request):
    tick = request.header.tick
    sim_time = request.header.sim_time
    control_requests.append({'tick': tick, 'time': sim_time})
    
    # Sinusoidal steering
    steer_angle = 0.3 * np.sin(tick * 0.02)
    ax = 2.0
    return (steer_angle, ax)

print("  - Registering controller and running for 5 seconds...")
client.register_sync_controller(tracking_controller)
time.sleep(5)

print("✓ Sync control test complete")
print(f"  Received {len(control_requests)} control requests")
if len(control_requests) > 1:
    time_diffs = [control_requests[i+1]['time'] - control_requests[i]['time'] 
                  for i in range(len(control_requests)-1)]
    avg_period = np.mean(time_diffs) * 1000  # Convert to ms
    print(f"  Average control period: {avg_period:.1f} ms (expected: ~100 ms)")
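
An average close to 100 ms can hide individual intervals that are far off. The sketch below also reports the worst single deviation and a pass flag. The tolerance and the sample timestamps are made up for illustration. In the notebook, the input would be the `time` values collected in `control_requests`.

```python
def summarize_periods(times_s, expected_ms, tolerance_ms):
    """Return the mean period, worst deviation, and a pass flag (all in ms)."""
    periods_ms = [(b - a) * 1000.0 for a, b in zip(times_s, times_s[1:])]
    if not periods_ms:
        raise ValueError("need at least two timestamps")
    mean_ms = sum(periods_ms) / len(periods_ms)
    worst_ms = max(abs(p - expected_ms) for p in periods_ms)
    return {
        "mean_ms": mean_ms,
        "worst_dev_ms": worst_ms,
        "ok": worst_ms <= tolerance_ms,
    }


# Synthetic sim_time values in seconds; the third request arrives 20 ms late
times = [0.0, 0.1, 0.22, 0.3, 0.4]
summary = summarize_periods(times, expected_ms=100, tolerance_ms=5)
print(summary)
```

Here the mean is still about 100 ms, yet the check fails because one interval is about 20 ms off.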

## Test 5: Mode Switching

This test verifies switching between async and sync modes works seamlessly.

In [None]:
print("\nTest 5: Testing mode switching...")

# Start in async mode
print("  - Starting in async mode...")
client.set_mode(sync=False)
time.sleep(0.3)
for i in range(10):
    client.send_control_async(steer_angle=0.2, ax=2.0)
    time.sleep(0.05)
print("    ✓ Async mode working")

# Switch to sync mode
print("  - Switching to sync mode...")
client.set_mode(sync=True, control_period_ms=100)
time.sleep(0.3)

def switch_test_controller(request):
    return (0.1, 1.5)

client.register_sync_controller(switch_test_controller)
time.sleep(1.0)
print("    ✓ Sync mode working")

# Switch back to async
print("  - Switching back to async mode...")
client.set_mode(sync=False)
time.sleep(0.3)
for i in range(10):
    client.send_control_async(steer_angle=-0.2, ax=2.0)
    time.sleep(0.05)
print("    ✓ Async mode working again")

# Zero the controls
client.send_control_async(steer_angle=0.0, ax=0.0)

print("✓ Mode switching test complete")

## Test 6: Visualization Markers

This test verifies that marker publishing works correctly (port 5560). Its code cell appears at the end of this notebook, after the summary cell.

## Test 7: Client Disconnection in Sync Mode

This test verifies that the simulator handles a client that stops responding in sync mode.
**Note:** This test intentionally triggers a timeout. The simulation should pause, and the GUI should show "NO CLIENT" in red.


In [None]:
print("\nTest 7: Testing client disconnection behavior...")
print("  This test will intentionally stop responding to control requests.")
print("  Watch the GUI - it should show 'NO CLIENT' in red after ~500ms timeout.")

# Switch to sync mode
client.set_mode(sync=True, control_period_ms=100)
time.sleep(0.3)

# Register a controller that stops responding
responded = [0]

def disconnect_test_controller(request):
    responded[0] += 1
    # Only respond to first 5 requests, then "disconnect"
    if responded[0] <= 5:
        print(f"    Responding to request {responded[0]}...")
        return (0.0, 0.0)
    else:
        # Simulate a disconnect by blocking far longer than the timeout
        print(f"    Ignoring request {responded[0]} (simulating disconnect)...")
        time.sleep(10)  # Well past the ~500 ms timeout
        return (0.0, 0.0)

client.register_sync_controller(disconnect_test_controller)

print("\n  Waiting for timeout (this will take ~2 seconds)...")
time.sleep(2)

print("\n  ✓ Timeout test complete")
print("  Check GUI - simulation should be paused with 'NO CLIENT' status")
print("  The Resume button should be disabled")
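
Outside of a deliberate timeout test, it can help to know when a controller comes close to the timeout. The wrapper below is an illustrative sketch that records every call slower than a chosen budget. The names and the budget value are hypothetical. The ~500 ms figure mentioned in the cell above would be the natural upper bound for a real budget.

```python
import time


def with_latency_log(controller, budget_s, slow_calls):
    """Wrap a sync controller and record calls that take longer than budget_s."""
    def wrapped(request):
        start = time.monotonic()
        result = controller(request)
        elapsed = time.monotonic() - start
        if elapsed > budget_s:
            slow_calls.append(elapsed)
        return result
    return wrapped


def slow_controller(request):
    time.sleep(0.02)  # Stand-in for a controller that overruns its budget
    return (0.0, 0.0)


slow = []
controller = with_latency_log(slow_controller, budget_s=0.005, slow_calls=slow)
print(controller(None), len(slow))
```

The wrapped function has the same signature as the original, so it could be passed to `client.register_sync_controller` in its place.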


## Test Summary and Cleanup

Clean up and display test results.


In [None]:
print("\n" + "="*60)
print("TEST SUMMARY")
print("="*60)

print("\n✓ All tests completed!")
print(f"\nTotal state updates received: {len(state_history)}")

if len(state_history) > 0:
    final_state = state_history[-1]
    print("Final state:")
    print(f"  Tick: {final_state['tick']}")
    print(f"  Position: ({final_state['x']:.2f}, {final_state['y']:.2f})")
    print(f"  Velocity: {final_state['v']:.2f} m/s")
    print(f"  Yaw: {final_state['yaw']:.2f} rad")

print("\nTests verified:")
print("  ✓ Connection and state subscription")
print("  ✓ Admin commands (pause, run, step, reset)")
print("  ✓ Asynchronous control mode (port 5559)")
print("  ✓ Synchronous control mode (port 5557)")
print("  ✓ Mode switching")
print("  ✓ Client disconnection/timeout handling")
print("  ✓ Visualization markers (port 5560)")

print("\nCleaning up...")
# Switch to async mode before closing (to unpause if stuck)
client.set_mode(sync=False)
time.sleep(0.3)
client.send_control_async(steer_angle=0.0, ax=0.0)  # Zero the controls
time.sleep(0.5)
client.stop()
client.close()
print("✓ Client disconnected and cleaned up")

print("\n" + "="*60)
print("All tests complete! You can now disable ZMQ in the GUI.")
print("="*60)
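
If an earlier cell raises, the cleanup calls above never run and the sockets stay open. A context manager is one way to guarantee `stop()` and `close()` are called. This is a sketch that assumes only those two methods, as used in the cell above. `FakeClient` is a hypothetical stand-in so the example runs without a simulator.

```python
from contextlib import contextmanager


@contextmanager
def managed(client):
    """Yield the client, then always stop and close it, even after an error."""
    try:
        yield client
    finally:
        client.stop()
        client.close()


class FakeClient:
    """Hypothetical stand-in that records which cleanup calls were made."""

    def __init__(self):
        self.calls = []

    def stop(self):
        self.calls.append("stop")

    def close(self):
        self.calls.append("close")


fake = FakeClient()
try:
    with managed(fake):
        raise RuntimeError("simulated test failure")
except RuntimeError:
    pass
print(fake.calls)
```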


In [None]:
# Test 6 cell. Run it before the cleanup cell, which closes the client.
print("\nTest 6: Testing visualization markers...")

# Circle marker
print("  - Publishing red circle at (5, 5)...")
client.publish_marker(
    ns="test", id=1, marker_type=MarkerType.CIRCLE,
    x=5.0, y=5.0, scale_x=2.0, scale_y=2.0,
    r=255, g=0, b=0, a=200
)

# Line strip forming a square
print("  - Publishing green square at (10, 10)...")
square = [(10,10,0), (12,10,0), (12,12,0), (10,12,0), (10,10,0)]
client.publish_marker(
    ns="test", id=2, marker_type=MarkerType.LINE_STRIP,
    points=square, r=0, g=255, b=0, a=255
)

# Circle at origin
print("  - Publishing blue circle at origin...")
client.publish_marker(
    ns="test", id=3, marker_type=MarkerType.CIRCLE,
    x=0.0, y=0.0, scale_x=1.5, scale_y=1.5,
    r=0, g=0, b=255, a=200
)

print("✓ Markers published - check viewport for red circle, green square, and blue circle")
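
The `square` list above closes its outline by repeating the first point at the end. The sketch below generalizes that to any regular polygon, using the same `(x, y, z)` tuple layout. Whether `publish_marker` accepts these float tuples unchanged is an assumption. The helper name is illustrative.

```python
import math


def closed_polygon(cx, cy, radius, n_sides):
    """Return n_sides + 1 (x, y, z) points; the last point repeats the first."""
    if n_sides < 3:
        raise ValueError("a polygon needs at least 3 sides")
    points = [
        (cx + radius * math.cos(2 * math.pi * k / n_sides),
         cy + radius * math.sin(2 * math.pi * k / n_sides),
         0.0)
        for k in range(n_sides)
    ]
    points.append(points[0])  # Close the outline for a line strip
    return points


hexagon = closed_polygon(cx=10.0, cy=10.0, radius=2.0, n_sides=6)
print(len(hexagon), hexagon[0] == hexagon[-1])
```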