## Test slam_manager

In [1]:
import os
from pathlib import Path
# Home directory of the current user, kept as a plain string.
home = os.path.expanduser("~")

# Export the localization data directory before any lovot_slam import below
# (presumably the package reads this variable at import time — TODO confirm).
os.environ.update(
    LOCALIZATION_DATA_PATH=f'{home}/My/lovot-locxxx/MLaN/runtime/data/localization'
)

localization_data_path = os.environ['LOCALIZATION_DATA_PATH']
print(f"Environment variable set: {localization_data_path}")

# Fail fast when the data directory is missing on this machine.
assert Path(localization_data_path).exists()

# from importlib import reload

Environment variable set: /home/gx-shu/My/lovot-locxxx/MLaN/runtime/data/localization


In [2]:
from lovot_slam.slam_manager import SlamManager
from contextlib import AsyncExitStack
import anyio

# Minimal SlamManager subclass used only for the smoke tests in this notebook
class TestSlamManager(SlamManager):
    """SlamManager stub whose overridden coroutines only print what was called.

    The four methods below are presumably the hooks SlamManager expects a
    concrete subclass to provide — confirm against the base class.
    Methods are listed in the order they would run during a session.
    """

    async def _setup_context(self, stack: AsyncExitStack) -> None:
        """Announce context setup; nothing is registered on ``stack``."""
        print("Setup context")

    async def _run_main(self) -> None:
        """Announce the main loop, then yield to the event loop for one second."""
        print("Main running...")
        await anyio.sleep(1)

    async def process_command(self, req: str) -> None:
        """Echo the raw request string instead of executing it."""
        print(f"Command: {req}")

    async def _stop(self) -> None:
        """Announce shutdown; there is nothing to clean up."""
        print("Stopping...")


In [3]:
# Smoke test 1: the stub subclass must be constructible in debug mode.
manager = TestSlamManager(debug=True)
print("✓ SlamManager instantiated successfully")
print(f"Ghost ID: {manager.ghost_id}")

# Smoke test 2: parse_request yields a (command, request id, arguments) triple.
cmd, req_id, args = manager.parse_request("test_cmd 123 arg1 arg2")
print(f"Parsed: cmd={cmd}, id={req_id}, args={args}")

# Pin the parsed fields so a regression fails the cell instead of merely
# changing the printed text. Values are compared through str()/list() because
# the concrete types returned by parse_request are not visible from this cell.
assert str(cmd) == "test_cmd", f"unexpected command: {cmd!r}"
assert str(req_id) == "123", f"unexpected request id: {req_id!r}"
assert list(args) == ["arg1", "arg2"], f"unexpected arguments: {args!r}"


✓ SlamManager instantiated successfully
Ghost ID: bobu0be0p7n5c1bjsnm0
Parsed: cmd=test_cmd, id=123, args=['arg1', 'arg2']


In [None]:
from lovot_slam.redis.clients import create_device_client, create_stm_client, create_ltm_client

# --- LTM Redis: debug flag --------------------------------------------------
# Written first so that the debug_params reload at the end of this cell can see it.
ltm_client = create_ltm_client()
ltm_client.set("slam:debug:disable_removing_files", "true")
print("✓ Debug flag set in Redis")

# --- Device Redis: robot model (Shaun/Lovot, lv101) -------------------------
device_client = create_device_client()
device_client.set('robot:model', 'lv101')

# --- STM Redis: state for map building, ghost status and repair flag --------
# Keys are written in the listed order.
stm_client = create_stm_client()
stm_state = {
    'neodm:physical_state': 'ON_NEST',
    'neodm:intention': 'INFINITE_EXPLORE',
    'cloud:ghost:status': 'identified',
    'under_repair': '0',
}
for redis_key, redis_value in stm_state.items():
    stm_client.set(redis_key, redis_value)

print("✓ Redis configured for Shaun/Lovot model with map building enabled")

# --- Re-evaluate debug_params now that the Redis value is in place ----------
import lovot_slam.flags.debug_params
from importlib import reload
reload(lovot_slam.flags.debug_params)

# Import the name only after the reload so it is bound to the fresh value.
# NOTE(review): the recorded output of this cell shows False although "true"
# is written above — confirm how debug_params parses the Redis value, and
# whether modules imported in earlier cells still hold the pre-reload binding.
from lovot_slam.flags.debug_params import PARAM_DISABLE_REMOVING_FILES
print(f"PARAM_DISABLE_REMOVING_FILES = {PARAM_DISABLE_REMOVING_FILES}")



✓ Redis configured for Shaun/Lovot model with map building enabled
PARAM_DISABLE_REMOVING_FILES = False


In [5]:
from lovot_slam.nest_slam_manager import NestSlamManager
from lovot_slam.map_build.request_queue import BuildSingleMapOption, RequestTypes
import anyio

async def test_nest_slam_manager(
    *,
    timeout: float = 30,
    poll_interval: float = 0.5,
    poll_attempts: int = 20,
    run_duration: float = 5,
):
    """Start a NestSlamManager in the background and watch ``_can_build_map``.

    The manager's ``run`` coroutine is started inside an anyio task group.
    The flag is polled until it becomes truthy or the attempts are used up,
    the manager is then left running for ``run_duration`` seconds, and the
    task group is cancelled. Failures are printed rather than re-raised so
    the notebook cell always finishes.

    Args:
        timeout: Upper bound, in seconds, for the whole background run.
        poll_interval: Seconds to sleep between checks of ``_can_build_map``.
        poll_attempts: Maximum number of checks before giving up on the flag.
        run_duration: Seconds to keep the manager running after polling.

    Returns:
        None. All results are reported through ``print``.
    """
    import traceback

    print("Instantiating NestSlamManager...")
    try:
        manager = NestSlamManager(debug=True)
        print("✓ NestSlamManager instantiated successfully")
        print(f"Ghost ID: {manager.ghost_id}")
        print(f"Model: {manager._model}")
        print(f"Can build map (before run): {manager._can_build_map}")

        # Bound the whole run so a hung manager cannot block the notebook.
        with anyio.fail_after(timeout):
            async with anyio.create_task_group() as tg:
                tg.start_soon(manager.run)

                # Poll the flag; the for/else branch runs only if it never
                # became truthy within poll_attempts checks.
                print("Waiting for _can_build_map to become True...")
                for attempt in range(poll_attempts):
                    await anyio.sleep(poll_interval)
                    if manager._can_build_map:
                        elapsed = (attempt + 1) * poll_interval
                        print(f"✓ Can build map became True after {elapsed:.1f} seconds")
                        break
                else:
                    print("⚠ Timeout waiting for _can_build_map")

                print(f"Can build map (after run): {manager._can_build_map}")

                # Now you can push build requests
                # Example: manager.requests.push(RequestTypes.BuildMap, BuildSingleMapOption('20000101_000000'))

                # Let it run for a bit
                print(f"Running for {run_duration:g} more seconds...")
                await anyio.sleep(run_duration)

                # Cancelling the scope stops manager.run and lets the task
                # group exit.
                print("Cancelling task group...")
                tg.cancel_scope.cancel()

    except TimeoutError:
        print(f"✗ Test timed out after {timeout:g} seconds")
    except Exception as exc:
        # A task group may surface failures as an exception group (which has
        # an ``exceptions`` attribute) or as a single plain exception.
        # Handle both here: unconditionally reading ``exc.exceptions`` would
        # itself raise AttributeError for a plain exception.
        sub_exceptions = getattr(exc, "exceptions", None)
        if sub_exceptions:
            print(f"✗ Failed with exception group: {exc}")
            for sub_exc in sub_exceptions:
                traceback.print_exception(type(sub_exc), sub_exc, sub_exc.__traceback__)
        else:
            print(f"✗ Failed: {exc}")
            traceback.print_exc()
    finally:
        print("Test completed")

# Run the async test. The bare top-level `await` is not valid in a plain
# Python script; it relies on the notebook kernel running the cell inside an
# event loop. Outside a notebook, drive the coroutine with
# anyio.run(test_nest_slam_manager) instead.
await test_nest_slam_manager()



falling back to lovot model: LovotModel.LV101


Instantiating NestSlamManager...
✓ NestSlamManager instantiated successfully
Ghost ID: bobu0be0p7n5c1bjsnm0
Model: LovotModel.LV101
Can build map (before run): False
Waiting for _can_build_map to become True...
✓ Can build map became True after 0.5 seconds
Can build map (after run): True
Running for 5 more seconds...
Cancelling task group...
Test completed
