From 2a9793fce7333650a2c8e3276eda4a6a624d4041 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 2 Jun 2026 15:53:59 +0800 Subject: [PATCH 1/5] otimize Signed-off-by: kerthcet --- server/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index 45e3abb..4841fb8 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -129,8 +129,6 @@ impl Server { let request_id = Uuid::new_v4().to_string(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - conn.register_shell_session(request_id.clone(), tx); - let msg = Message::StartShell { request_id: request_id.clone(), rows, @@ -141,6 +139,8 @@ impl Server { conn.send_message(msg) .map_err(|e| PyRuntimeError::new_err(format!("Failed to start shell: {}", e)))?; + conn.register_shell_session(request_id.clone(), tx); + Ok(ShellSession { session_id: request_id, daemon_id, From 92e35f10cc57eb771bad1e9aa58fadde87cdd547 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 2 Jun 2026 16:01:01 +0800 Subject: [PATCH 2/5] optimize the apis Signed-off-by: kerthcet --- .github/workflows/rust-ci.yaml | 5 +- Makefile | 7 +- README.md | 46 ++--- docs/ARCHITECTURE.md | 2 +- docs/DEVELOP.md | 16 +- docs/PROTOCOL.md | 81 ++++---- docs/QUICKSTART.md | 27 +-- examples/agent_example.py | 190 ------------------ ...imple_command_test.py => exec_commands.py} | 8 +- examples/install_htop.py | 17 +- examples/interactive_session.py | 50 +++++ pyproject.toml | 2 +- python/sandd/__init__.py | 124 +++++++++--- python/tests/test_e2e.py | 146 +++++++++++++- python/tests/test_integration.py | 126 ++++++------ python/tests/test_unit.py | 8 +- sandd/src/main.rs | 97 +++++---- sandd/src/protocol.rs | 27 +-- sandd/src/{shell.rs => session.rs} | 109 ++++++---- server/src/lib.rs | 66 +++--- server/src/protocol.rs | 77 ++++--- server/src/registry.rs | 24 +-- server/src/server.rs | 14 +- 23 files changed, 713 insertions(+), 556 deletions(-) delete mode 100644 examples/agent_example.py rename examples/{simple_command_test.py => exec_commands.py} (89%) create mode 100755 examples/interactive_session.py rename sandd/src/{shell.rs => session.rs} (58%) diff --git a/.github/workflows/rust-ci.yaml b/.github/workflows/rust-ci.yaml index b29342d..736f2f6 100644 --- a/.github/workflows/rust-ci.yaml +++ b/.github/workflows/rust-ci.yaml @@ -36,8 +36,5 @@ jobs: with: toolchain: stable - - name: Run Unit tests + - name: Run tests run: make test - - - name: Run E2E tests - run: make test-e2e diff --git a/Makefile b/Makefile index 3dddf03..b98872e 100644 --- a/Makefile +++ b/Makefile @@ -61,15 +61,12 @@ test-e2e: $(PYTEST) dev @echo "Cleaning up containers..." docker compose -f docker-compose.e2e.yml down -docker-build: - docker compose -f docker-compose.e2e.yml build +docker-up: + docker compose -f docker-compose.e2e.yml up -d docker-down: docker compose -f docker-compose.e2e.yml down -test-all: test test-e2e - @echo "All tests completed successfully" - .PHONY: lint lint: $(RUFF) $(RUFF) check . diff --git a/README.md b/README.md index 0f5c6bd..8bf5673 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # SandD -**A Lightweight Sandbox Daemon for Secure Agent Execution in Isolated Environments.** +**A Lightweight Sandbox Daemon that Provides Secure, Isolated Execution Environments for Agents.** [![Rust](https://img.shields.io/badge/rust-1.70+-orange.svg)](https://www.rust-lang.org/) [![Python](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/) @@ -18,8 +18,8 @@ Rust-powered WebSocket server with Python API for secure command execution in is ## Features -- ✅ **Command Execution**: Execute shell commands remotely with timeout support -- ✅ **Interactive Shell (PTY)**: Full terminal sessions for debugging and manual work +- ✅ **Command Execution**: Execute commands remotely with timeout support +- ✅ **Interactive Sessions (PTY)**: Full terminal sessions for debugging and manual work - ✅ **File Transfer**: Upload/download files between agent and daemons - ✅ **High Performance**: Rust-powered WebSocket server handles 200+ concurrent connections - ✅ **Auto Reconnection**: Daemons automatically reconnect if connection drops @@ -29,25 +29,25 @@ Rust-powered WebSocket server with Python API for secure command execution in is ## Architecture ``` -┌─────────────────────────────────────────┐ -│ Python Agent Application │ -│ ┌────────────────────────────────────┐ │ -│ │ from sandd import Server │ │ -│ │ │ │ -│ │ server = Server("0.0.0.0", 8765) │ │ -│ │ result = server.execute_command( │ │ -│ │ "daemon-1", "ls -la" │ │ -│ │ ) │ │ -│ └────────────────────────────────────┘ │ -│ ▲ │ -│ │ Python bindings (PyO3) │ -│ ▼ │ -│ ┌────────────────────────────────────┐ │ -│ │ Rust WebSocket Server (tokio) │ │ -│ │ • Command routing │ │ -│ │ • Session management │ │ -│ └────────────────────────────────────┘ │ -└─────────────────────────────────────────┘ +┌──────────────────────────────────────────┐ +│ Python Agent Application │ +│ ┌────────────────────────────────────┐ │ +│ │ from sandd import Server │ │ +│ │ │ │ +│ │ server = Server("0.0.0.0", 8765) │ │ +│ │ result = server.exec( │ │ +│ │ "daemon-1", "ls -la" │ │ +│ │ ) │ │ +│ └────────────────────────────────────┘ │ +│ ▲ │ +│ │ Python bindings (PyO3) │ +│ ▼ │ +│ ┌────────────────────────────────────┐ │ +│ │ Rust WebSocket Server (tokio) │ │ +│ │ • Command routing │ │ +│ │ • Session management │ │ +│ └────────────────────────────────────┘ │ +└──────────────────────────────────────────┘ ▲ │ WebSocket (WSS) │ (Daemon initiates connection) @@ -90,7 +90,7 @@ print(f"Server listening on {server.address}") server.wait_for_daemon("worker-1", timeout=30) # Execute command -result = server.execute_command("worker-1", "hostname") +result = server.exec("worker-1", "hostname") print(f"Output: {result.stdout}") ``` diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 2514104..b8855a6 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -29,7 +29,7 @@ At 200+ connections, Python asyncio: ### Why WebSocket? - Persistent bidirectional connection -- Efficient for streaming (shell output) +- Efficient for streaming (session output) - Well-supported libraries - Can multiplex multiple sessions over one connection diff --git a/docs/DEVELOP.md b/docs/DEVELOP.md index d9d52ea..ed47616 100644 --- a/docs/DEVELOP.md +++ b/docs/DEVELOP.md @@ -24,7 +24,7 @@ SandD/ │ ├── src/ │ │ ├── main.rs # Daemon entry point │ │ ├── executor.rs # Command execution -│ │ ├── shell.rs # Shell (not implemented) +│ │ ├── session.rs # Interactive sessions (PTY) │ │ └── protocol.rs # Message protocol │ └── Cargo.toml │ @@ -124,7 +124,7 @@ command_tx: mpsc::UnboundedSender // Stored in registry **Incoming (Daemon → Python):** ```rust pending_commands: oneshot::Sender // Request/Response -shell_sessions: mpsc::Sender> // Streaming +sessions: mpsc::Sender> // Streaming file_transfers: Vec> // Chunked buffering ``` @@ -211,11 +211,7 @@ RUST_LOG=server=debug python3 examples/simple_test.py ### Not Implemented -1. **Interactive Shell**: Infrastructure exists, daemon returns "not implemented" - - Reason: `PtySystem` Sync issues - - Fix: Refactor shell manager to avoid Sync constraints - -2. **File Transfer**: Protocol defined, daemon just logs +1. **File Transfer**: Protocol defined, daemon just logs - Reason: Deferred for MVP - Fix: Implement actual file I/O in daemon @@ -278,14 +274,14 @@ Include motivation and context. - Check daemon logs: `RUST_LOG=info ./target/release/sandd ...` **Commands timing out:** -- Increase `timeout` parameter in `execute_command()` (in seconds) +- Increase `timeout` parameter in `exec()` (in seconds) - Check daemon system resources: `top`, `free -h` - Verify command actually completes when run manually - Check daemon logs for errors **High memory usage:** -- Monitor active shell sessions (they hold state) -- Close unused shell sessions +- Monitor active sessions (they hold state) +- Close unused sessions with `session.close()` - Check number of connected daemons: `server.daemon_count()` ### Development Issues diff --git a/docs/PROTOCOL.md b/docs/PROTOCOL.md index 6c031d8..559f7ee 100644 --- a/docs/PROTOCOL.md +++ b/docs/PROTOCOL.md @@ -56,8 +56,8 @@ All messages are JSON with a `type` field indicating the message type: ```json { - "type": "execute_command", - "command_id": "uuid-here", + "type": "exec", + "request_id": "uuid-here", "command": "ls -la", "timeout_secs": 300, "env": {}, @@ -130,8 +130,8 @@ All messages are JSON with a `type` field indicating the message type: ```json { - "type": "execute_command", - "command_id": "550e8400-e29b-41d4-a716-446655440000", + "type": "exec", + "request_id": "550e8400-e29b-41d4-a716-446655440000", "command": "python script.py", "timeout_secs": 300, "env": { @@ -142,7 +142,7 @@ All messages are JSON with a `type` field indicating the message type: ``` **Fields**: -- `command_id`: Unique identifier for tracking this command +- `request_id`: Unique identifier for tracking this request - `command`: Shell command to execute - `timeout_secs`: Maximum execution time (default: 300) - `env`: Environment variables (optional) @@ -155,7 +155,7 @@ All messages are JSON with a `type` field indicating the message type: ```json { "type": "command_output", - "command_id": "550e8400-e29b-41d4-a716-446655440000", + "request_id": "550e8400-e29b-41d4-a716-446655440000", "stdout": "output text...", "stderr": "", "exit_code": 0, @@ -170,20 +170,20 @@ All messages are JSON with a `type` field indicating the message type: ```json { "type": "command_error", - "command_id": "550e8400-e29b-41d4-a716-446655440000", + "request_id": "550e8400-e29b-41d4-a716-446655440000", "error": "command not found" } ``` -### Interactive Shell (PTY) +### Interactive Session (PTY) -#### StartShell +#### StartSession **Direction**: Agent → Daemon -**Purpose**: Start an interactive shell session +**Purpose**: Start an interactive session ```json { - "type": "start_shell", + "type": "start_session", "session_id": "550e8400-e29b-41d4-a716-446655440001", "rows": 24, "cols": 80, @@ -191,26 +191,26 @@ All messages are JSON with a `type` field indicating the message type: } ``` -#### ShellStarted +#### SessionStarted **Direction**: Daemon → Agent -**Purpose**: Acknowledge shell started +**Purpose**: Acknowledge session started ```json { - "type": "shell_started", + "type": "session_started", "session_id": "550e8400-e29b-41d4-a716-446655440001", "success": true, "error": null } ``` -#### ShellInput +#### SessionInput **Direction**: Agent → Daemon -**Purpose**: Send user input to shell +**Purpose**: Send user input to session ```json { - "type": "shell_input", + "type": "session_input", "session_id": "550e8400-e29b-41d4-a716-446655440001", "data": "bHMgLWxhCg==" } @@ -218,13 +218,13 @@ All messages are JSON with a `type` field indicating the message type: **Note**: `data` is base64-encoded bytes -#### ShellOutput +#### SessionOutput **Direction**: Daemon → Agent -**Purpose**: Stream shell output back to agent +**Purpose**: Stream session output back to agent ```json { - "type": "shell_output", + "type": "session_output", "session_id": "550e8400-e29b-41d4-a716-446655440001", "data": "ZmlsZTEgIGZpbGUyICBmaWxlMwo=" } @@ -232,26 +232,37 @@ All messages are JSON with a `type` field indicating the message type: **Note**: `data` is base64-encoded bytes -#### ShellResize +#### SessionResize **Direction**: Agent → Daemon **Purpose**: Resize terminal window ```json { - "type": "shell_resize", + "type": "session_resize", "session_id": "550e8400-e29b-41d4-a716-446655440001", "rows": 50, "cols": 120 } ``` -#### ShellExit +#### SessionClose +**Direction**: Agent → Daemon +**Purpose**: Close session + +```json +{ + "type": "session_close", + "session_id": "550e8400-e29b-41d4-a716-446655440001" +} +``` + +#### SessionExit **Direction**: Daemon → Agent -**Purpose**: Shell session terminated +**Purpose**: Session terminated ```json { - "type": "shell_exit", + "type": "session_exit", "session_id": "550e8400-e29b-41d4-a716-446655440001", "exit_code": 0 } @@ -367,25 +378,25 @@ All messages are JSON with a `type` field indicating the message type: ### Request/Response (Command Execution) -1. Agent generates unique `command_id` -2. Agent registers oneshot channel for this command +1. Agent generates unique `request_id` +2. Agent registers oneshot channel for this request 3. Agent sends `ExecuteCommand` message 4. Daemon executes and sends back `CommandOutput` 5. Agent resolves channel, Python receives result **Concurrency**: Multiple commands can execute in parallel -### Streaming (Shell Sessions) +### Streaming (Interactive Sessions) 1. Agent generates unique `session_id` 2. Agent registers mpsc channel for this session -3. Agent sends `StartShell` message +3. Agent sends `StartSession` message 4. Daemon starts PTY and begins streaming output -5. Agent sends `ShellInput` as user types -6. Daemon sends `ShellOutput` continuously -7. Session ends with `ShellExit` +5. Agent sends `SessionInput` as user types +6. Daemon sends `SessionOutput` continuously +7. Session ends with `SessionExit` -**Concurrency**: Multiple shell sessions per daemon supported +**Concurrency**: Multiple sessions per daemon supported ### Chunked Transfer (File Download) @@ -413,7 +424,7 @@ All messages are JSON with a `type` field indicating the message type: - Agent detects closed connection - Registry removes daemon - All pending commands fail - - Shell sessions terminate + - Terminate ``` ## Heartbeat & Connection Monitoring @@ -434,4 +445,4 @@ All messages are JSON with a `type` field indicating the message type: See `server/src/protocol.rs` for the complete Rust implementation using serde for JSON serialization. -Binary data (shell I/O, file chunks) is base64-encoded for JSON compatibility. +Binary data (session I/O, file chunks) is base64-encoded for JSON compatibility. diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md index eb2fc3f..cf074cb 100644 --- a/docs/QUICKSTART.md +++ b/docs/QUICKSTART.md @@ -26,7 +26,7 @@ while True: if count > 0: print(f"\n{count} daemon(s) connected:") for daemon_id in server.list_daemons(): - result = server.execute_command(daemon_id, "hostname") + result = server.exec(daemon_id, "hostname") print(f" {daemon_id}: {result.stdout.strip()}") time.sleep(5) ``` @@ -51,7 +51,7 @@ from sandd import Server server = Server() server.wait_for_daemon("my-daemon-1", timeout=30) -result = server.execute_command("my-daemon-1", "uname -a") +result = server.exec("my-daemon-1", "uname -a") print(result.stdout) ``` @@ -69,21 +69,21 @@ from sandd import Server server = Server("0.0.0.0", 8765) # Simple command -result = server.execute_command("worker-1", "ls -la /tmp") +result = server.exec("worker-1", "ls -la /tmp") if result.success: print(result.stdout) else: print(f"Failed: {result.stderr}") # With environment variables -result = server.execute_command( +result = server.exec( "worker-1", "echo $MY_VAR", env={"MY_VAR": "custom_value"} ) # With timeout and working directory -result = server.execute_command( +result = server.exec( "worker-1", "python long_script.py", timeout=600, @@ -91,25 +91,28 @@ result = server.execute_command( ) ``` -### Interactive Shell +### Interactive Session ```python -# Start shell session -shell = server.start_shell("worker-1", rows=24, cols=80) +# Start session +session = server.new_session("worker-1", rows=24, cols=80) # Send commands -shell.write(b"cd /tmp\n") -shell.write(b"ls -la\n") +session.write(b"cd /tmp\n") +session.write(b"ls -la\n") # Read output import time time.sleep(0.5) -output = shell.read(timeout=1.0) +output = session.read(timeout=1.0) if output: print(output.decode()) # Resize terminal -shell.resize(rows=50, cols=120) +session.resize(rows=50, cols=120) + +# Close session +session.close() ``` ### File Transfer diff --git a/examples/agent_example.py b/examples/agent_example.py deleted file mode 100644 index d0a2f07..0000000 --- a/examples/agent_example.py +++ /dev/null @@ -1,190 +0,0 @@ -#!/usr/bin/env python3 -""" -Example: Python agent using SandD (Sandbox Daemon) - -This demonstrates how to use the server from a Python application -to execute commands on remote daemons. -""" - -import time -from sandd import Server - - -def main(): - # Start server - print("Starting sandbox server on 0.0.0.0:8765...") - server = Server(host="0.0.0.0", port=8765) - - print(f"Server started: {server.address}") - print("Waiting for daemons to connect...") - print("(Start a daemon with: ./target/release/sandd --server-url ws://localhost:8765/ws)") - print() - - # Wait for at least one daemon - while server.daemon_count() == 0: - time.sleep(1) - print(".", end="", flush=True) - - print("\n") - - # List connected daemons - daemons = server.list_daemons() - print(f"✓ Connected daemons: {len(daemons)}") - for daemon_id in daemons: - print(f" - {daemon_id}") - print() - - # Get server stats - stats = server.get_stats() - print("Server stats:") - print(f" Total daemons: {stats.total_daemons}") - print(f" By platform: {stats.by_platform}") - print(f" Oldest connection: {stats.oldest_connection_secs}s") - print() - - # Pick first daemon for demos - daemon_id = daemons[0] - print(f"Using daemon: {daemon_id}") - print("=" * 60) - print() - - # Example 1: Simple command execution - print("Example 1: Execute simple command") - print("-" * 60) - result = server.execute_command(daemon_id, "echo 'Hello from daemon!'") - print(f"Exit code: {result.exit_code}") - print(f"Duration: {result.duration_ms}ms") - print(f"Output: {result.stdout.strip()}") - print() - - # Example 2: Command with environment variables - print("Example 2: Command with environment") - print("-" * 60) - result = server.execute_command( - daemon_id, - "echo $MY_VAR", - env={"MY_VAR": "custom_value"} - ) - print(f"Output: {result.stdout.strip()}") - print() - - # Example 3: List files - print("Example 3: List files in /tmp") - print("-" * 60) - result = server.execute_command(daemon_id, "ls -lh /tmp | head -10") - if result.success: - print(result.stdout) - else: - print(f"Error: {result.stderr}") - print() - - # Example 4: System information - print("Example 4: Get system information") - print("-" * 60) - result = server.execute_command(daemon_id, "uname -a") - print(f"System: {result.stdout.strip()}") - - result = server.execute_command(daemon_id, "uptime") - print(f"Uptime: {result.stdout.strip()}") - print() - - # Example 5: Interactive shell (basic demo) - print("Example 5: Interactive shell session") - print("-" * 60) - try: - shell = server.start_shell(daemon_id) - print(f"Shell session started: {shell.session_id}") - - # Send commands - shell.write(b"pwd\n") - time.sleep(0.5) - - # Read output - output = shell.read(timeout=1.0) - if output: - print(f"Output: {output.decode().strip()}") - - shell.write(b"echo 'Interactive shell works!'\n") - time.sleep(0.5) - - output = shell.read(timeout=1.0) - if output: - print(f"Output: {output.decode().strip()}") - - print("✓ Shell session working") - print() - - except Exception as e: - print(f"Shell error: {e}") - print() - - # Example 6: File operations (mock, since we need actual files) - print("Example 6: File transfer") - print("-" * 60) - try: - # Create test file - test_data = b"Hello from agent!\nThis is a test file.\n" - server.upload_file(daemon_id, "/tmp/test_upload.txt", test_data) - print("✓ File uploaded to /tmp/test_upload.txt") - - # Verify with cat - result = server.execute_command(daemon_id, "cat /tmp/test_upload.txt") - print(f"File contents:\n{result.stdout}") - - # Download it back - downloaded = server.download_file(daemon_id, "/tmp/test_upload.txt") - print(f"✓ Downloaded {len(downloaded)} bytes") - print(f"Match: {downloaded == test_data}") - print() - - except Exception as e: - print(f"File transfer error: {e}") - print() - - # Example 7: Parallel execution (multiple commands) - print("Example 7: Parallel command execution") - print("-" * 60) - import concurrent.futures - - commands = [ - "date", - "whoami", - "hostname", - "pwd", - ] - - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - futures = { - executor.submit(server.execute_command, daemon_id, cmd): cmd - for cmd in commands - } - - for future in concurrent.futures.as_completed(futures): - cmd = futures[future] - result = future.result() - print(f"{cmd:15} -> {result.stdout.strip()}") - print() - - # Example 8: Error handling - print("Example 8: Error handling") - print("-" * 60) - result = server.execute_command(daemon_id, "ls /nonexistent/directory") - if not result.success: - print(f"Command failed with exit code: {result.exit_code}") - print(f"Error: {result.stderr.strip()}") - print() - - print("=" * 60) - print("All examples completed!") - print(f"Final daemon count: {server.daemon_count()}") - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\n\nInterrupted by user") - except Exception as e: - print(f"\nError: {e}") - import traceback - traceback.print_exc() diff --git a/examples/simple_command_test.py b/examples/exec_commands.py similarity index 89% rename from examples/simple_command_test.py rename to examples/exec_commands.py index 2d36215..9555067 100755 --- a/examples/simple_command_test.py +++ b/examples/exec_commands.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Minimal test script for SandD""" +"""Minimal script for executing simple commands on daemons""" from sandd import Server import time @@ -23,7 +23,7 @@ for daemon_id in daemons: try: # Test 1: Python script - result = server.execute_command( + result = server.exec( daemon_id, "python3 -c 'import sys; print(f\"Python {sys.version_info.major}.{sys.version_info.minor}\")'", timeout=5 @@ -34,7 +34,7 @@ print(f"\n✗ Python test failed on {daemon_id}: exit_code={result.exit_code}") # Test 2: Wrong Python script (intentional error) - result = server.execute_command( + result = server.exec( daemon_id, "python3 -c 'undefined_variable'", timeout=5 @@ -45,7 +45,7 @@ print(f"✗ Error handling test failed on {daemon_id}: expected error but got success") # Test 3: Echo command - result = server.execute_command(daemon_id, "echo 'Hello from daemon!'", timeout=5) + result = server.exec(daemon_id, "echo 'Hello from daemon!'", timeout=5) if result.success: print(f"✓ Echo test passed on {daemon_id}: {result.stdout.strip()}") diff --git a/examples/install_htop.py b/examples/install_htop.py index e1162bb..f4b5df0 100755 --- a/examples/install_htop.py +++ b/examples/install_htop.py @@ -16,7 +16,7 @@ def check_htop_available(server, daemon_id): """Check if htop is available on a daemon""" - result = server.execute_command(daemon_id, "which htop", timeout=5) + result = server.exec(daemon_id, "which htop", timeout=5) return result.success @@ -25,7 +25,7 @@ def install_htop(server, daemon_id): print(f"Installing htop on {daemon_id}...") # Detect platform (macOS vs Linux) - platform_result = server.execute_command(daemon_id, "uname -s", timeout=5) + platform_result = server.exec(daemon_id, "uname -s", timeout=5) if not platform_result.success: print("❌ Could not detect platform") return False @@ -38,7 +38,7 @@ def install_htop(server, daemon_id): cmd = "brew install htop" else: # Linux - detect distribution - distro_result = server.execute_command( + distro_result = server.exec( daemon_id, "cat /etc/os-release 2>/dev/null || echo 'unknown'", timeout=5 @@ -64,7 +64,7 @@ def install_htop(server, daemon_id): return False # Execute installation - result = server.execute_command(daemon_id, cmd, timeout=120) + result = server.exec(daemon_id, cmd, timeout=120) if result.success: print("✓ htop installed successfully") @@ -84,6 +84,7 @@ def main(): # Wait for at least one daemon print("Waiting for daemons to connect...") + print("(Start a daemon with: ./target/release/sandd --server-url ws://127.0.0.1:8765/ws)") daemons = server.list_daemons() while not daemons: time.sleep(1) @@ -98,7 +99,7 @@ def main(): print("✓ htop is already installed") # Get htop version - result = server.execute_command(daemon_id, "htop --version", timeout=5) + result = server.exec(daemon_id, "htop --version", timeout=5) if result.success: # htop version is usually first line version_line = result.stdout.split('\n')[0] @@ -110,7 +111,7 @@ def main(): # Install htop if install_htop(server, daemon_id): # Verify installation - result = server.execute_command(daemon_id, "htop --version", timeout=5) + result = server.exec(daemon_id, "htop --version", timeout=5) if result.success: version_line = result.stdout.split('\n')[0] print(f" {version_line}") @@ -133,7 +134,7 @@ def main(): ] for cmd, description in test_commands: - result = server.execute_command(daemon_id, cmd, timeout=10) + result = server.exec(daemon_id, cmd, timeout=10) if result.success: print(f"✓ {description}") # Show first few lines of output @@ -150,7 +151,7 @@ def main(): print("Example complete!") print() print("Note: htop is an interactive tool. To use it interactively,") - print(" use server.start_shell() instead of execute_command().") + print(" use server.session() instead of exec().") if __name__ == "__main__": diff --git a/examples/interactive_session.py b/examples/interactive_session.py new file mode 100755 index 0000000..ccee3a1 --- /dev/null +++ b/examples/interactive_session.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +"""Example: Interactive session + +This example demonstrates the interactive mode where you can +type commands directly in a live terminal session. + +Usage: + python examples/interactive_session.py + make docker-up to start the server +""" + +from sandd import Server +import sys +import time + + +def main(): + print("Interactive Session Example") + print("=" * 50) + + # Connect to server + server = Server("127.0.0.1", 8765) + print(f"✓ Server started on {server.address}\n") + + # Wait for at least one daemon + print("Waiting for daemons to connect...") + daemons = server.list_daemons() + while not daemons: + time.sleep(1) + daemons = server.list_daemons() + + daemon_id = daemons[0] + print(f"✓ Found daemon: {daemon_id}\n") + + print("Starting interactive terminal...") + print() + + # Start session in interactive mode - this blocks until user exits + server.new_session(daemon_id, interactive=True) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nInterrupted by user") + sys.exit(0) + except Exception as e: + print(f"\n❌ Error: {e}") + sys.exit(1) diff --git a/pyproject.toml b/pyproject.toml index fa61d4e..a633325 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "sandd" version = "0.1.0" -description = "SandD - High-performance sandbox command execution system with 200+ concurrent agent support" +description = "SandD - A lightweight sandbox daemon that provides secure, isolated execution environments for agents." requires-python = ">=3.8" license = {text = "MIT"} classifiers = [ diff --git a/python/sandd/__init__.py b/python/sandd/__init__.py index 72edc7d..16e6f67 100644 --- a/python/sandd/__init__.py +++ b/python/sandd/__init__.py @@ -4,7 +4,7 @@ This package provides a Rust-powered WebSocket server for managing 200+ concurrent daemon connections with support for: - Command execution -- Interactive shell (PTY) +- Interactive session (PTY) - File transfer Example: @@ -12,13 +12,13 @@ >>> server = Server(host="0.0.0.0", port=8765) >>> >>> # Execute command - >>> result = server.execute_command("daemon-1", "ls -la") + >>> result = server.exec("daemon-1", "ls -la") >>> print(result.stdout) >>> - >>> # Start interactive shell - >>> shell = server.start_shell("daemon-1") - >>> shell.write(b"ls\\n") - >>> output = shell.read(timeout=1.0) + >>> # Start interactive session + >>> session = server.new_session("daemon-1") + >>> session.write(b"ls\\n") + >>> output = session.read(timeout=1.0) >>> >>> # File transfer >>> server.upload_file("daemon-1", "/remote/path", data) @@ -27,9 +27,11 @@ from typing import Optional, Dict, List import time +import sys +import select try: - from ._core import Server as _RustServer, ShellSession, PyCommandResult, PyStats + from ._core import Server as _RustServer, Session, PyCommandResult, PyStats except ImportError as e: raise ImportError( "Failed to import Rust extension. " @@ -39,7 +41,7 @@ __version__ = "0.0.0" __all__ = [ "Server", - "ShellSession", + "Session", "CommandResult", "ServerStats", ] @@ -139,7 +141,7 @@ class Server: Example: >>> server = Server("0.0.0.0", 8765) >>> server.wait_for_daemon("daemon-1", timeout=30) - >>> result = server.execute_command("daemon-1", "hostname") + >>> result = server.exec("daemon-1", "hostname") >>> print(result.stdout) """ @@ -148,7 +150,7 @@ def __init__(self, host: str = "0.0.0.0", port: int = 8765): self._host = host self._port = port - def execute_command( + def exec( self, daemon_id: str, command: str, @@ -160,7 +162,7 @@ def execute_command( Args: daemon_id: Target daemon ID - command: Command to execute (shell string) + command: Command to execute (session string) timeout: Execution timeout in seconds (default: 300) env: Environment variables to set cwd: Working directory @@ -174,45 +176,57 @@ def execute_command( RuntimeError: If command fails to execute Example: - >>> result = server.execute_command("daemon-1", "ls -la /tmp") + >>> result = server.exec("daemon-1", "ls -la /tmp") >>> if result.success: ... print(result.stdout) """ - result = self._server.execute_command( + result = self._server.exec( daemon_id, command, timeout, env, cwd ) return CommandResult(result) - def start_shell( + def new_session( self, daemon_id: str, rows: int = 24, cols: int = 80, term: str = "xterm-256color", - ) -> ShellSession: - """Start an interactive shell session + interactive: bool = False, + ) -> Session: + """Create a new interactive session Args: daemon_id: Target daemon ID rows: Terminal rows (default: 24) cols: Terminal columns (default: 80) term: TERM environment variable (default: "xterm-256color") + interactive: If True, enters interactive mode with live terminal (default: False) Returns: - ShellSession for interactive I/O + Session for interactive I/O (or None if interactive=True, runs in foreground) Raises: ValueError: If daemon not found - RuntimeError: If shell fails to start + RuntimeError: If session fails to start - Example: - >>> shell = server.start_shell("daemon-1") - >>> shell.write(b"ls -la\\n") - >>> output = shell.read(timeout=1.0) + Example (Programmatic): + >>> session = server.new_session("daemon-1") + >>> session.write(b"ls -la\\n") + >>> output = session.read(timeout=1.0) >>> if output: ... print(output.decode()) + + Example (Interactive): + >>> server.new_session("daemon-1", interactive=True) + # Enters interactive terminal session - type commands directly """ - return self._server.start_shell(daemon_id, rows, cols, term) + session = self._server.new_session(daemon_id, rows, cols, term) + + if interactive: + self._run_interactive(session) + return None + + return session def upload_file( self, @@ -310,6 +324,68 @@ def get_stats(self) -> ServerStats: """ return ServerStats(self._server.get_stats()) + def _run_interactive(self, session: Session) -> None: + """Run session in interactive mode with live terminal + + Args: + session: Session to make interactive + """ + print("Entering interactive session session. Press Ctrl+D to exit.") + print("-" * 60) + + # Set terminal to raw mode on Unix systems + if sys.platform != "win32": + import tty + import termios + old_settings = termios.tcgetattr(sys.stdin) + try: + tty.setraw(sys.stdin.fileno()) + self._interactive_loop(session) + finally: + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) + else: + # Windows - just run without raw mode + self._interactive_loop(session) + + print("\n" + "-" * 60) + print("Interactive session ended.") + session.close() + + def _interactive_loop(self, session: Session) -> None: + """Main interactive I/O loop + + Args: + session: Session for I/O + """ + try: + while True: + # Check for input from stdin + if sys.platform != "win32": + rlist, _, _ = select.select([sys.stdin], [], [], 0.1) + if rlist: + data = sys.stdin.read(1) + if not data or data == '\x04': # Ctrl+D + break + session.write(data.encode()) + else: + # Windows - simple blocking read + import msvcrt + if msvcrt.kbhit(): + data = msvcrt.getch() + if data == b'\x04': # Ctrl+D + break + session.write(data) + + # Read output from session + output = session.read(timeout=0.05) + if output: + sys.stdout.buffer.write(output) + sys.stdout.buffer.flush() + + except KeyboardInterrupt: + # Ctrl+C - exit gracefully + pass + def wait_for_daemon( self, daemon_id: str, @@ -329,7 +405,7 @@ def wait_for_daemon( Example: >>> if server.wait_for_daemon("daemon-1", timeout=60): ... print("Daemon connected!") - ... result = server.execute_command("daemon-1", "hostname") + ... result = server.exec("daemon-1", "hostname") ... else: ... print("Timeout waiting for daemon") """ diff --git a/python/tests/test_e2e.py b/python/tests/test_e2e.py index d98c20d..78d1c7f 100644 --- a/python/tests/test_e2e.py +++ b/python/tests/test_e2e.py @@ -77,7 +77,7 @@ def test_execute_on_each_daemon(self, server): "daemon-rocky-1", "daemon-rocky-2" ] for daemon_id in daemon_ids: - result = server.execute_command( + result = server.exec( daemon_id, "echo 'Hello from container'", timeout=5 @@ -94,7 +94,7 @@ def test_concurrent_execution(self, server): ] def run_cmd(daemon_id): - return server.execute_command( + return server.exec( daemon_id, f"echo 'Response from {daemon_id}'", timeout=5 @@ -145,7 +145,7 @@ class TestE2EResilience: def test_daemon_restart(self, server): """Test daemon reconnection after container restart""" # Execute command before restart - result = server.execute_command("daemon-debian-1", "echo 'before'", timeout=5) + result = server.exec("daemon-debian-1", "echo 'before'", timeout=5) assert result.success # Restart container @@ -161,7 +161,7 @@ def test_daemon_restart(self, server): assert reconnected # Execute command after restart - result = server.execute_command("daemon-debian-1", "echo 'after'", timeout=5) + result = server.exec("daemon-debian-1", "echo 'after'", timeout=5) assert result.success assert "after" in result.stdout @@ -181,40 +181,40 @@ class TestE2EDistributionSpecific: def test_package_manager_debian(self, server): """Test apt package manager on Debian daemons""" - result = server.execute_command( + result = server.exec( "daemon-debian-1", "apt-get update && apt-get install -y curl", timeout=60 ) assert result.success - result = server.execute_command("daemon-debian-1", "curl --version", timeout=5) + result = server.exec("daemon-debian-1", "curl --version", timeout=5) assert result.success assert "curl" in result.stdout def test_package_manager_alpine(self, server): """Test apk package manager on Alpine daemons""" - result = server.execute_command( + result = server.exec( "daemon-alpine-1", "apk update && apk add curl", timeout=60 ) assert result.success - result = server.execute_command("daemon-alpine-1", "curl --version", timeout=5) + result = server.exec("daemon-alpine-1", "curl --version", timeout=5) assert result.success assert "curl" in result.stdout def test_package_manager_rocky(self, server): """Test dnf package manager on Rocky daemons""" - result = server.execute_command( + result = server.exec( "daemon-rocky-1", "microdnf install -y curl", timeout=60 ) assert result.success - result = server.execute_command("daemon-rocky-1", "curl --version", timeout=5) + result = server.exec("daemon-rocky-1", "curl --version", timeout=5) assert result.success assert "curl" in result.stdout @@ -226,9 +226,133 @@ def test_all_distros_run_same_command(self, server): "daemon-rocky-1" ] for daemon_id in daemon_ids: - result = server.execute_command(daemon_id, "uname -s", timeout=5) + result = server.exec(daemon_id, "uname -s", timeout=5) assert result.success assert result.stdout.strip() == "Linux" + +class TestE2ESessionSessions: + """Test interactive sessions across distributions""" + + def test_session_basic_commands(self, server): + """Test basic session interaction""" + daemon_id = "daemon-debian-1" + + session = server.new_session(daemon_id) + assert session is not None + + try: + # Send a command + session.write(b"echo 'Hello from session'\n") + time.sleep(0.5) + + # Read output + output = session.read(timeout=2.0) + assert output is not None + output_str = output.decode('utf-8', errors='ignore') + assert 'Hello from session' in output_str + + finally: + session.close() + + def test_session_across_distributions(self, server): + """Test session works on all distributions""" + daemon_ids = [ + "daemon-debian-1", + "daemon-alpine-1", + "daemon-rocky-1" + ] + + for daemon_id in daemon_ids: + session = server.new_session(daemon_id) + assert session is not None + + try: + # Test command execution + session.write(b"whoami\n") + time.sleep(0.3) + + output = session.read(timeout=2.0) + assert output is not None + # Should see some output (username) + assert len(output) > 0 + + finally: + session.close() + + def test_session_multiline_commands(self, server): + """Test multi-line commands in session""" + daemon_id = "daemon-alpine-1" + + session = server.new_session(daemon_id) + assert session is not None + + try: + # Send multi-line command + session.write(b"for i in 1 2 3; do\n") + time.sleep(0.2) + session.write(b"echo $i\n") + time.sleep(0.2) + session.write(b"done\n") + time.sleep(0.5) + + output = session.read(timeout=2.0) + assert output is not None + output_str = output.decode('utf-8', errors='ignore') + # Should see the numbers + assert '1' in output_str and '2' in output_str + + finally: + session.close() + + def test_session_environment_variables(self, server): + """Test setting and reading environment variables""" + daemon_id = "daemon-rocky-1" + + session = server.new_session(daemon_id) + assert session is not None + + try: + # Set environment variable + session.write(b"export TEST_VAR='test123'\n") + time.sleep(0.3) + + # Read it back + session.write(b"echo $TEST_VAR\n") + time.sleep(0.3) + + output = session.read(timeout=2.0) + assert output is not None + output_str = output.decode('utf-8', errors='ignore') + assert 'test123' in output_str + + finally: + session.close() + + def test_session_cd_persistence(self, server): + """Test that cd persists within a session session""" + daemon_id = "daemon-debian-1" + + session = server.new_session(daemon_id) + assert session is not None + + try: + # Change directory + session.write(b"cd /tmp\n") + time.sleep(0.3) + + # Verify we're in /tmp + session.write(b"pwd\n") + time.sleep(0.3) + + output = session.read(timeout=2.0) + assert output is not None + output_str = output.decode('utf-8', errors='ignore') + assert '/tmp' in output_str + + finally: + session.close() + + if __name__ == "__main__": pytest.main([__file__, "-v", "-s"]) diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index 4910889..ad2c4ac 100644 --- a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -196,23 +196,23 @@ def test_execute_simple_command(self, server, daemon_process): """Test executing a simple command""" daemon_id, _ = daemon_process - result = server.execute_command(daemon_id, "echo 'Hello World'", timeout=5) + result = server.exec(daemon_id, "echo 'Hello World'", timeout=5) assert result.success assert result.exit_code == 0 assert "Hello World" in result.stdout assert result.duration_ms > 0 - def test_execute_command_with_failure(self, server, daemon_process): + def test_exec_with_failure(self, server, daemon_process): """Test executing a command that fails""" daemon_id, _ = daemon_process - result = server.execute_command(daemon_id, "exit 42", timeout=5) + result = server.exec(daemon_id, "exit 42", timeout=5) assert not result.success assert result.exit_code == 42 - def test_execute_command_with_env(self, server, daemon_process): + def test_exec_with_env(self, server, daemon_process): """Test executing command with environment variables""" daemon_id, _ = daemon_process @@ -222,16 +222,16 @@ def test_execute_command_with_env(self, server, daemon_process): else: # Unix cmd = "echo $TEST_VAR" - result = server.execute_command(daemon_id, cmd, timeout=5, env=env) + result = server.exec(daemon_id, cmd, timeout=5, env=env) assert result.success assert "test_value_123" in result.stdout - def test_execute_command_with_cwd(self, server, daemon_process): + def test_exec_with_cwd(self, server, daemon_process): """Test executing command with custom working directory""" daemon_id, _ = daemon_process - result = server.execute_command(daemon_id, "pwd", timeout=5, cwd="/tmp") + result = server.exec(daemon_id, "pwd", timeout=5, cwd="/tmp") assert result.success # On some systems /tmp might be a symlink, so check both @@ -243,7 +243,7 @@ def test_execute_long_output(self, server, daemon_process): # Generate 1000 lines of output cmd = "for i in {1..1000}; do echo 'Line $i'; done" - result = server.execute_command(daemon_id, cmd, timeout=10) + result = server.exec(daemon_id, cmd, timeout=10) assert result.success assert result.stdout.count('\n') >= 1000 @@ -254,14 +254,14 @@ def test_command_timeout(self, server, daemon_process): # Command that sleeps longer than timeout with pytest.raises(Exception): # Should raise timeout or runtime error - server.execute_command(daemon_id, "sleep 10", timeout=1) + server.exec(daemon_id, "sleep 10", timeout=1) def test_execute_python_script(self, server, daemon_process): """Test executing Python code""" daemon_id, _ = daemon_process cmd = "python3 -c 'import sys; print(f\"Python {sys.version_info.major}.{sys.version_info.minor}\")'" - result = server.execute_command(daemon_id, cmd, timeout=5) + result = server.exec(daemon_id, cmd, timeout=5) assert result.success assert "Python" in result.stdout @@ -291,62 +291,62 @@ def test_stats_platform_reporting(self, server, daemon_process): platforms = list(stats.by_platform.keys()) assert len(platforms) > 0 - # Common platform names - assert any(p in ["linux", "darwin", "windows", "Linux", "Darwin", "Windows"] + # Common platform names (Rust's std::env::consts::OS values) + assert any(p in ["linux", "macos", "windows", "Linux", "Darwin", "Windows"] for p in platforms) -class TestFileTransfer: - """Test file upload/download with real daemons""" +# class TestFileTransfer: +# """Test file upload/download with real daemons""" - def test_upload_and_download_file(self, server, daemon_process): - """Test uploading and downloading a file""" - daemon_id, _ = daemon_process +# def test_upload_and_download_file(self, server, daemon_process): +# """Test uploading and downloading a file""" +# daemon_id, _ = daemon_process - # Create test data - test_data = b"Hello from SandD test!\nLine 2\nLine 3" - remote_path = f"/tmp/sandd-test-{os.getpid()}.txt" +# # Create test data +# test_data = b"Hello from SandD test!\nLine 2\nLine 3" +# remote_path = f"/tmp/sandd-test-{os.getpid()}.txt" - try: - # Upload file - server.upload_file(daemon_id, remote_path, test_data) +# try: +# # Upload file +# server.upload_file(daemon_id, remote_path, test_data) - # Verify file exists - result = server.execute_command(daemon_id, f"cat {remote_path}", timeout=5) - assert result.success - assert test_data.decode() in result.stdout +# # Verify file exists +# result = server.exec(daemon_id, f"cat {remote_path}", timeout=5) +# assert result.success +# assert test_data.decode() in result.stdout - # Download file - downloaded_data = server.download_file(daemon_id, remote_path) - assert downloaded_data == test_data +# # Download file +# downloaded_data = server.download_file(daemon_id, remote_path) +# assert downloaded_data == test_data - finally: - # Cleanup - server.execute_command(daemon_id, f"rm -f {remote_path}", timeout=5) +# finally: +# # Cleanup +# server.exec(daemon_id, f"rm -f {remote_path}", timeout=5) - def test_upload_large_file(self, server, daemon_process): - """Test uploading a larger file (1MB)""" - daemon_id, _ = daemon_process +# def test_upload_large_file(self, server, daemon_process): +# """Test uploading a larger file (1MB)""" +# daemon_id, _ = daemon_process - # Create 1MB of test data - test_data = b"x" * (1024 * 1024) - remote_path = f"/tmp/sandd-large-test-{os.getpid()}.bin" +# # Create 1MB of test data +# test_data = b"x" * (1024 * 1024) +# remote_path = f"/tmp/sandd-large-test-{os.getpid()}.bin" - try: - server.upload_file(daemon_id, remote_path, test_data) - - # Verify size - result = server.execute_command( - daemon_id, - f"wc -c < {remote_path}", - timeout=5 - ) - assert result.success - size = int(result.stdout.strip()) - assert size == len(test_data) +# try: +# server.upload_file(daemon_id, remote_path, test_data) - finally: - server.execute_command(daemon_id, f"rm -f {remote_path}", timeout=5) +# # Verify size +# result = server.exec( +# daemon_id, +# f"wc -c < {remote_path}", +# timeout=5 +# ) +# assert result.success +# size = int(result.stdout.strip()) +# assert size == len(test_data) + +# finally: +# server.exec(daemon_id, f"rm -f {remote_path}", timeout=5) class TestWaitForDaemon: @@ -395,29 +395,29 @@ def wait_thread(): not DAEMON_BINARY.exists(), reason="Requires compiled daemon binary" ) -class TestShellSession: - """Test interactive shell sessions""" +class TestSession: + """Test interactive sessions""" - def test_start_shell_session(self, server, daemon_process): - """Test starting an interactive shell""" + def test_session(self, server, daemon_process): + """Test starting an interactive session""" daemon_id, _ = daemon_process - shell = server.start_shell(daemon_id) - assert shell is not None + session = server.new_session(daemon_id) + assert session is not None # Write a command - shell.write(b"echo 'test123'\n") + session.write(b"echo 'test123'\n") # Read output with timeout - output = shell.read(timeout=2.0) + output = session.read(timeout=2.0) # Should contain our echo if output: output_str = output.decode('utf-8', errors='ignore') assert 'test123' in output_str or 'echo' in output_str - # Close shell - shell.close() + # Close session + session.close() if __name__ == "__main__": diff --git a/python/tests/test_unit.py b/python/tests/test_unit.py index d6331d4..528984c 100644 --- a/python/tests/test_unit.py +++ b/python/tests/test_unit.py @@ -102,15 +102,15 @@ def test_stats_repr(self): class TestErrorHandling: """Test error handling""" - def test_execute_command_invalid_daemon(self): + def test_exec_invalid_daemon(self): server = Server() with pytest.raises(ValueError, match="not found"): - server.execute_command("invalid", "echo test") + server.exec("invalid", "echo test") - def test_start_shell_invalid_daemon(self): + def test_session_invalid_daemon(self): server = Server() with pytest.raises(ValueError, match="not found"): - server.start_shell("invalid") + server.new_session("invalid") def test_upload_file_invalid_daemon(self): server = Server() diff --git a/sandd/src/main.rs b/sandd/src/main.rs index 225af00..5ce7f00 100644 --- a/sandd/src/main.rs +++ b/sandd/src/main.rs @@ -1,10 +1,6 @@ -// Shell functionality is disabled in MVP, allow dead code warnings -#![allow(dead_code)] -#![allow(unused_imports)] - mod executor; mod protocol; -mod shell; +mod session; use anyhow::Result; use clap::Parser; @@ -20,7 +16,9 @@ use tracing::{debug, error, info, warn}; #[derive(Parser, Debug)] #[command(name = "sandd")] -#[command(about = "SandD - Sandbox Daemon for remote command execution")] +#[command( + about = "SandD - A lightweight sandbox daemon that provides secure, isolated execution environments for agents." +)] struct Args { /// Server URL (e.g., ws://localhost:8765/ws) #[arg(short, long, env = "SERVER_URL")] @@ -77,7 +75,14 @@ async fn main() -> Result<()> { // Main connection loop with reconnection loop { - match connect_and_serve(&args.server_url, &daemon_id, args.heartbeat_interval, labels.clone()).await { + match connect_and_serve( + &args.server_url, + &daemon_id, + args.heartbeat_interval, + labels.clone(), + ) + .await + { Ok(_) => info!("Connection closed gracefully"), Err(e) => error!("Connection error: {}", e), } @@ -161,6 +166,7 @@ async fn connect_and_serve( // Initialize executors let executor = Arc::new(CommandExecutor::new()); + let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new())); // Spawn heartbeat task let ws_tx_clone = Arc::new(tokio::sync::Mutex::new(ws_tx)); @@ -203,7 +209,7 @@ async fn connect_and_serve( }; // Handle message inline - if let Err(e) = handle_message(message, ws_tx_clone.clone(), executor.clone()).await { + if let Err(e) = handle_message(message, ws_tx_clone.clone(), executor.clone(), session_manager.clone()).await { error!("Error handling message: {}", e); } } @@ -218,9 +224,10 @@ async fn handle_message( message: Message, ws_tx: Arc>, executor: Arc, + session_manager: Arc>, ) -> Result<()> where - T: SinkExt + Unpin, + T: SinkExt + Unpin + Send + 'static, T::Error: std::error::Error + Send + Sync + 'static, { match message { @@ -286,22 +293,30 @@ where } } - Message::StartShell { - request_id, - rows: _, - cols: _, - term: _, + Message::StartSession { + session_id, + rows, + cols, + term, } => { - debug!( - "Starting shell session: {} (not implemented in MVP)", - request_id - ); - - // TODO: Shell functionality disabled for MVP due to PtySystem Sync issues - let response = Message::ShellStarted { - request_id, - success: false, - error: Some("Shell functionality not implemented in MVP".to_string()), + debug!("Starting session: {}", session_id); + + let mut manager = session_manager.lock().await; + let result = manager + .start_session(session_id.clone(), rows, cols, &term, ws_tx.clone()) + .await; + + let response = match result { + Ok(()) => Message::SessionStarted { + session_id, + success: true, + error: None, + }, + Err(e) => Message::SessionStarted { + session_id, + success: false, + error: Some(e.to_string()), + }, }; let json = serde_json::to_string(&response)?; @@ -311,21 +326,33 @@ where .map_err(|e| anyhow::anyhow!("{}", e))?; } - Message::ShellInput { - request_id: _, - data: _, + Message::SessionInput { + session_id, + data, } => { - debug!("Shell input (not implemented)"); - // TODO: Shell functionality disabled for MVP + debug!("Session input: {} bytes for session {}", data.len(), session_id); + let manager = session_manager.lock().await; + if let Err(e) = manager.send_input(&session_id, &data).await { + error!("Failed to send input to session {}: {}", session_id, e); + } } - Message::ShellResize { - request_id: _, - rows: _, - cols: _, + Message::SessionResize { + session_id, + rows, + cols, } => { - debug!("Shell resize (not implemented)"); - // TODO: Shell functionality disabled for MVP + debug!("Session resize: {} to {}x{}", session_id, rows, cols); + let manager = session_manager.lock().await; + if let Err(e) = manager.resize(&session_id, rows, cols).await { + error!("Failed to resize session {}: {}", session_id, e); + } + } + + Message::SessionClose { session_id } => { + debug!("Closing session: {}", session_id); + let mut manager = session_manager.lock().await; + manager.close_session(&session_id); } Message::FileUploadStart { diff --git a/sandd/src/protocol.rs b/sandd/src/protocol.rs index abe18b6..6366da0 100644 --- a/sandd/src/protocol.rs +++ b/sandd/src/protocol.rs @@ -37,35 +37,38 @@ pub enum Message { request_id: String, error: String, }, - StartShell { - request_id: String, + StartSession { + session_id: String, rows: u16, cols: u16, #[serde(default = "default_term")] term: String, }, - ShellStarted { - request_id: String, + SessionStarted { + session_id: String, success: bool, error: Option, }, - ShellInput { - request_id: String, + SessionInput { + session_id: String, #[serde(with = "base64_bytes")] data: Vec, }, - ShellOutput { - request_id: String, + SessionOutput { + session_id: String, #[serde(with = "base64_bytes")] data: Vec, }, - ShellResize { - request_id: String, + SessionResize { + session_id: String, rows: u16, cols: u16, }, - ShellExit { - request_id: String, + SessionClose { + session_id: String, + }, + SessionExit { + session_id: String, exit_code: i32, }, FileUploadStart { diff --git a/sandd/src/shell.rs b/sandd/src/session.rs similarity index 58% rename from sandd/src/shell.rs rename to sandd/src/session.rs index f4f5c11..45a531b 100644 --- a/sandd/src/shell.rs +++ b/sandd/src/session.rs @@ -3,24 +3,25 @@ use anyhow::{anyhow, Result}; use futures_util::SinkExt; use portable_pty::{native_pty_system, CommandBuilder, PtySize, PtySystem}; use std::collections::HashMap; +use std::io::Write; use std::sync::Arc; -use tokio::io::AsyncReadExt; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::protocol::Message as WsMessage; use tracing::{debug, error}; -pub struct ShellSession { - #[allow(dead_code)] +pub struct SessionHandle { session_id: String, + writer: Arc>>, + master: Arc>>, _reader_handle: tokio::task::JoinHandle<()>, } -pub struct ShellManager { - sessions: HashMap, +pub struct SessionManager { + sessions: HashMap, pty_system: Box, } -impl ShellManager { +impl SessionManager { pub fn new() -> Self { Self { sessions: HashMap::new(), @@ -28,7 +29,7 @@ impl ShellManager { } } - pub async fn start_shell( + pub async fn start_session( &mut self, session_id: String, rows: u16, @@ -41,7 +42,7 @@ impl ShellManager { T::Error: std::error::Error + Send + Sync + 'static, { debug!( - "Starting shell session {} ({}x{}, term={})", + "Starting session {} ({}x{}, term={})", session_id, rows, cols, term ); @@ -57,32 +58,35 @@ impl ShellManager { .openpty(pty_size) .map_err(|e| anyhow!("Failed to open PTY: {}", e))?; - // Spawn shell - let shell = if cfg!(target_os = "windows") { + // Spawn session + let session = if cfg!(target_os = "windows") { "cmd.exe".to_string() } else { std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string()) }; - let mut cmd = CommandBuilder::new(&shell); + let mut cmd = CommandBuilder::new(&session); cmd.env("TERM", term); - let _child = pair - .slave - .spawn_command(cmd) - .map_err(|e| anyhow!("Failed to spawn shell: {}", e))?; - - drop(pair.slave); + let mut reader = pair + .master + .try_clone_reader() + .map_err(|e| anyhow!("Failed to get PTY reader: {}", e))?; - let _writer = pair + // Take the writer before spawning + let writer = pair .master .take_writer() .map_err(|e| anyhow!("Failed to get PTY writer: {}", e))?; - let mut reader = pair - .master - .try_clone_reader() - .map_err(|e| anyhow!("Failed to get PTY reader: {}", e))?; + let _child = pair + .slave + .spawn_command(cmd) + .map_err(|e| anyhow!("Failed to spawn session: {}", e))?; + + // Store writer and master separately + let writer = Arc::new(Mutex::new(writer)); + let master = Arc::new(Mutex::new(pair.master)); // Spawn task to read from PTY and send to WebSocket let session_id_clone = session_id.clone(); @@ -92,11 +96,11 @@ impl ShellManager { loop { match reader.read(&mut buffer) { Ok(0) => { - debug!("Shell session {} ended", session_id_clone); + debug!("Session {} ended", session_id_clone); // Send exit message - let exit_msg = Message::ShellExit { - request_id: session_id_clone.clone(), + let exit_msg = Message::SessionExit { + session_id: session_id_clone.clone(), exit_code: 0, }; @@ -110,15 +114,15 @@ impl ShellManager { Ok(n) => { let data = buffer[..n].to_vec(); - let output_msg = Message::ShellOutput { - request_id: session_id_clone.clone(), + let output_msg = Message::SessionOutput { + session_id: session_id_clone.clone(), data, }; if let Ok(json) = serde_json::to_string(&output_msg) { let mut tx = ws_tx.lock().await; if tx.send(WsMessage::Text(json)).await.is_err() { - error!("Failed to send shell output, connection closed"); + error!("Failed to send session output, connection closed"); break; } } @@ -133,8 +137,10 @@ impl ShellManager { self.sessions.insert( session_id.clone(), - ShellSession { + SessionHandle { session_id, + writer, + master, _reader_handle: reader_handle, }, ); @@ -142,26 +148,53 @@ impl ShellManager { Ok(()) } - pub async fn send_input(&self, _session_id: &str, data: &[u8]) -> Result<()> { - // Note: In a production implementation, you'd want interior mutability here - // For now, this is a simplified version - debug!("Sending {} bytes to shell session {}", data.len(), _session_id); + pub async fn send_input(&self, session_id: &str, data: &[u8]) -> Result<()> { + debug!("Sending {} bytes to session {}", data.len(), session_id); + + let session = self + .sessions + .get(session_id) + .ok_or_else(|| anyhow!("Session not found: {}", session_id))?; + + let mut writer = session.writer.lock().await; + + writer + .write_all(data) + .map_err(|e| anyhow!("Failed to write to PTY: {}", e))?; + + writer + .flush() + .map_err(|e| anyhow!("Failed to flush PTY writer: {}", e))?; Ok(()) } - pub fn resize(&self, session_id: &str, rows: u16, cols: u16) -> Result<()> { - debug!("Resizing shell session {} to {}x{}", session_id, rows, cols); + pub async fn resize(&self, session_id: &str, rows: u16, cols: u16) -> Result<()> { + debug!("Resizing session {} to {}x{}", session_id, rows, cols); + + let session = self + .sessions + .get(session_id) + .ok_or_else(|| anyhow!("Session not found: {}", session_id))?; + + let master = session.master.lock().await; + let new_size = PtySize { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + }; - // Note: portable-pty doesn't expose resize after creation easily - // In production, you'd store the PtyPair and call resize on it + master + .resize(new_size) + .map_err(|e| anyhow!("Failed to resize PTY: {}", e))?; Ok(()) } pub fn close_session(&mut self, session_id: &str) { if let Some(session) = self.sessions.remove(session_id) { - debug!("Closing shell session {}", session.session_id); + debug!("Closing session {}", session.session_id); } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 4841fb8..8c41f85 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ mod server; use pyo3::exceptions::{PyRuntimeError, PyTimeoutError, PyValueError}; use pyo3::prelude::*; +use pyo3::types::PyBytes; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -67,7 +68,7 @@ impl Server { /// Execute a command on a daemon #[pyo3(signature = (daemon_id, command, timeout=300, env=None, cwd=None))] - fn execute_command( + fn exec( &self, daemon_id: String, command: String, @@ -112,37 +113,37 @@ impl Server { }) } - /// Start an interactive shell session + /// Create a new interactive session #[pyo3(signature = (daemon_id, rows=24, cols=80, term="xterm-256color".to_string()))] - fn start_shell( + fn new_session( &self, daemon_id: String, rows: u16, cols: u16, term: String, - ) -> PyResult { + ) -> PyResult { let conn = self .registry .get(&daemon_id) .ok_or_else(|| PyValueError::new_err(format!("Daemon {} not found", daemon_id)))?; - let request_id = Uuid::new_v4().to_string(); + let session_id = Uuid::new_v4().to_string(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let msg = Message::StartShell { - request_id: request_id.clone(), + let msg = Message::StartSession { + session_id: session_id.clone(), rows, cols, term, }; conn.send_message(msg) - .map_err(|e| PyRuntimeError::new_err(format!("Failed to start shell: {}", e)))?; + .map_err(|e| PyRuntimeError::new_err(format!("Failed to start session: {}", e)))?; - conn.register_shell_session(request_id.clone(), tx); + conn.register_session(session_id.clone(), tx); - Ok(ShellSession { - session_id: request_id, + Ok(Session { + session_id, daemon_id, registry: self.registry.clone(), runtime_handle: self.runtime.handle().clone(), @@ -242,9 +243,9 @@ impl Server { } } -/// Shell session handle -#[pyclass] -pub struct ShellSession { +/// Session handle +#[pyclass(name = "Session")] +pub struct Session { session_id: String, daemon_id: String, registry: Arc, @@ -253,16 +254,16 @@ pub struct ShellSession { } #[pymethods] -impl ShellSession { - /// Write data to the shell +impl Session { + /// Write data to the session fn write(&self, data: Vec) -> PyResult<()> { let conn = self .registry .get(&self.daemon_id) .ok_or_else(|| PyRuntimeError::new_err("Daemon disconnected"))?; - let msg = Message::ShellInput { - request_id: self.session_id.clone(), + let msg = Message::SessionInput { + session_id: self.session_id.clone(), data, }; @@ -270,28 +271,28 @@ impl ShellSession { .map_err(|e| PyRuntimeError::new_err(format!("Failed to write: {}", e))) } - /// Read output from the shell (non-blocking) + /// Read output from the session (non-blocking) #[pyo3(signature = (timeout=1.0))] - fn read(&self, timeout: f64) -> PyResult>> { + fn read(&self, timeout: f64) -> PyResult>> { self.runtime_handle.block_on(async { let mut rx = self.output_rx.lock().await; match tokio::time::timeout(Duration::from_secs_f64(timeout), rx.recv()).await { - Ok(Some(data)) => Ok(Some(data)), + Ok(Some(data)) => Python::with_gil(|py| Ok(Some(PyBytes::new(py, &data).into()))), Ok(None) => Ok(None), Err(_) => Ok(None), // Timeout } }) } - /// Resize the shell + /// Resize the session fn resize(&self, rows: u16, cols: u16) -> PyResult<()> { let conn = self .registry .get(&self.daemon_id) .ok_or_else(|| PyRuntimeError::new_err("Daemon disconnected"))?; - let msg = Message::ShellResize { - request_id: self.session_id.clone(), + let msg = Message::SessionResize { + session_id: self.session_id.clone(), rows, cols, }; @@ -300,6 +301,21 @@ impl ShellSession { .map_err(|e| PyRuntimeError::new_err(format!("Failed to resize: {}", e))) } + /// Close the session + fn close(&self) -> PyResult<()> { + let conn = self + .registry + .get(&self.daemon_id) + .ok_or_else(|| PyRuntimeError::new_err("Daemon disconnected"))?; + + let msg = Message::SessionClose { + session_id: self.session_id.clone(), + }; + + conn.send_message(msg) + .map_err(|e| PyRuntimeError::new_err(format!("Failed to close session: {}", e))) + } + /// Get session ID #[getter] fn session_id(&self) -> String { @@ -350,7 +366,7 @@ pub struct PyStats { #[pymodule] fn _core(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; Ok(()) diff --git a/server/src/protocol.rs b/server/src/protocol.rs index d569e2f..1cb8255 100644 --- a/server/src/protocol.rs +++ b/server/src/protocol.rs @@ -39,36 +39,39 @@ pub enum Message { error: String, }, - // Interactive shell (PTY mode) - StartShell { - request_id: String, + // Interactive session (PTY mode) + StartSession { + session_id: String, rows: u16, cols: u16, #[serde(default = "default_term")] term: String, }, - ShellStarted { - request_id: String, + SessionStarted { + session_id: String, success: bool, error: Option, }, - ShellInput { - request_id: String, + SessionInput { + session_id: String, #[serde(with = "base64_bytes")] data: Vec, }, - ShellOutput { - request_id: String, + SessionOutput { + session_id: String, #[serde(with = "base64_bytes")] data: Vec, }, - ShellResize { - request_id: String, + SessionResize { + session_id: String, rows: u16, cols: u16, }, - ShellExit { - request_id: String, + SessionClose { + session_id: String, + }, + SessionExit { + session_id: String, exit_code: i32, }, @@ -198,7 +201,10 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::Register { daemon_id, metadata } => { + Message::Register { + daemon_id, + metadata, + } => { assert_eq!(daemon_id, "daemon-1"); assert_eq!(metadata.hostname, "test-host"); assert_eq!(metadata.platform, "linux"); @@ -252,9 +258,9 @@ mod tests { } #[test] - fn test_shell_messages() { - let start = Message::StartShell { - request_id: "shell-1".to_string(), + fn test_session_messages() { + let start = Message::StartSession { + session_id: "session-1".to_string(), rows: 24, cols: 80, term: "xterm-256color".to_string(), @@ -264,13 +270,13 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::StartShell { - request_id, + Message::StartSession { + session_id, rows, cols, term, } => { - assert_eq!(request_id, "shell-1"); + assert_eq!(session_id, "session-1"); assert_eq!(rows, 24); assert_eq!(cols, 80); assert_eq!(term, "xterm-256color"); @@ -280,10 +286,10 @@ mod tests { } #[test] - fn test_shell_input_with_binary_data() { + fn test_session_input_with_binary_data() { let data = vec![0x01, 0x02, 0x03, 0xFF]; - let msg = Message::ShellInput { - request_id: "shell-1".to_string(), + let msg = Message::SessionInput { + session_id: "session-1".to_string(), data: data.clone(), }; @@ -291,7 +297,9 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::ShellInput { data: parsed_data, .. } => { + Message::SessionInput { + data: parsed_data, .. + } => { assert_eq!(parsed_data, data); } _ => panic!("Wrong message type"), @@ -362,7 +370,10 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::Error { message, recoverable } => { + Message::Error { + message, + recoverable, + } => { assert_eq!(message, "Something went wrong"); assert_eq!(recoverable, true); } @@ -446,8 +457,8 @@ mod tests { fn test_base64_encoding() { // Test that binary data is properly encoded let data = vec![0, 1, 2, 255, 254, 253]; - let msg = Message::ShellOutput { - request_id: "test".to_string(), + let msg = Message::SessionOutput { + session_id: "test".to_string(), data: data.clone(), }; @@ -458,7 +469,9 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::ShellOutput { data: parsed_data, .. } => { + Message::SessionOutput { + data: parsed_data, .. + } => { assert_eq!(parsed_data, data); } _ => panic!("Wrong message type"), @@ -466,9 +479,9 @@ mod tests { } #[test] - fn test_shell_resize() { - let msg = Message::ShellResize { - request_id: "shell-1".to_string(), + fn test_session_resize() { + let msg = Message::SessionResize { + session_id: "session-1".to_string(), rows: 50, cols: 120, }; @@ -477,7 +490,7 @@ mod tests { let parsed: Message = serde_json::from_str(&json).unwrap(); match parsed { - Message::ShellResize { rows, cols, .. } => { + Message::SessionResize { rows, cols, .. } => { assert_eq!(rows, 50); assert_eq!(cols, 120); } diff --git a/server/src/registry.rs b/server/src/registry.rs index 234b176..dfe9b40 100644 --- a/server/src/registry.rs +++ b/server/src/registry.rs @@ -25,7 +25,7 @@ pub struct DaemonConnection { // ═══════════════════════════════════════════════════════════════════ // Incoming: Daemon → Python (Request/Response Pattern) // ═══════════════════════════════════════════════════════════════════ - /// Maps command_id → response channel for execute_command() calls + /// Maps request_id → response channel for execute_command() calls /// When Python sends a command, it registers a oneshot channel here and waits. /// When daemon responds with CommandOutput, we look up and send result back. /// Pattern: Request/Response (each command gets exactly one response) @@ -34,15 +34,15 @@ pub struct DaemonConnection { // ═══════════════════════════════════════════════════════════════════ // Incoming: Daemon → Python (Streaming Pattern) // ═══════════════════════════════════════════════════════════════════ - /// Maps session_id → output channel for interactive shell sessions - /// Shell output arrives incrementally from daemon, gets forwarded to Python. + /// Maps request_id → output channel for interactive sessions + /// Session output arrives incrementally from daemon, gets forwarded to Python. /// Pattern: Streaming (continuous flow of data chunks) - shell_sessions: Arc>>>, + sessions: Arc>>>, // ═══════════════════════════════════════════════════════════════════ // Incoming: Daemon → Python (Chunked Buffering Pattern) // ═══════════════════════════════════════════════════════════════════ - /// Maps transfer_id → accumulated file chunks for download operations + /// Maps request_id → accumulated file chunks for download operations /// File arrives in chunks from daemon, we buffer them until complete. /// Pattern: Chunked (collect pieces, return whole on completion) file_transfers: Arc>, @@ -82,7 +82,7 @@ impl DaemonConnection { connected_at: now, command_tx, pending_commands: Arc::new(DashMap::new()), - shell_sessions: Arc::new(DashMap::new()), + sessions: Arc::new(DashMap::new()), file_transfers: Arc::new(DashMap::new()), } } @@ -121,18 +121,18 @@ impl DaemonConnection { } } - pub fn register_shell_session(&self, session_id: String, tx: mpsc::UnboundedSender>) { - self.shell_sessions.insert(session_id, tx); + pub fn register_session(&self, session_id: String, tx: mpsc::UnboundedSender>) { + self.sessions.insert(session_id, tx); } - pub fn send_shell_output(&self, session_id: &str, data: Vec) { - if let Some(tx) = self.shell_sessions.get(session_id) { + pub fn send_session_output(&self, session_id: &str, data: Vec) { + if let Some(tx) = self.sessions.get(session_id) { let _ = tx.send(data); } } - pub fn close_shell_session(&self, session_id: &str) { - self.shell_sessions.remove(session_id); + pub fn close_session(&self, session_id: &str) { + self.sessions.remove(session_id); } pub fn start_file_transfer(&self, transfer_id: String, path: String, total_size: u64) { diff --git a/server/src/server.rs b/server/src/server.rs index 017528e..89f5f8c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -229,25 +229,25 @@ async fn handle_daemon_message( } } - Message::ShellOutput { request_id, data } => { + Message::SessionOutput { session_id, data } => { if let Some(ref id) = daemon_id { if let Some(conn) = registry.get(id) { - conn.send_shell_output(&request_id, data); + conn.send_session_output(&session_id, data); } } } - Message::ShellExit { - request_id, + Message::SessionExit { + session_id, exit_code, } => { if let Some(ref id) = daemon_id { if let Some(conn) = registry.get(id) { debug!( - "Shell session {} exited with code {} on daemon {}", - request_id, exit_code, id + "Session {} exited with code {} on daemon {}", + session_id, exit_code, id ); - conn.close_shell_session(&request_id); + conn.close_session(&session_id); } } } From 52bf159de0bf1d745141fe02e5ad706044cd1b66 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 2 Jun 2026 16:04:03 +0800 Subject: [PATCH 3/5] update comment Signed-off-by: kerthcet --- server/src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/registry.rs b/server/src/registry.rs index dfe9b40..a62df72 100644 --- a/server/src/registry.rs +++ b/server/src/registry.rs @@ -34,7 +34,7 @@ pub struct DaemonConnection { // ═══════════════════════════════════════════════════════════════════ // Incoming: Daemon → Python (Streaming Pattern) // ═══════════════════════════════════════════════════════════════════ - /// Maps request_id → output channel for interactive sessions + /// Maps session_id → output channel for interactive sessions /// Session output arrives incrementally from daemon, gets forwarded to Python. /// Pattern: Streaming (continuous flow of data chunks) sessions: Arc>>>, From ed1e7b9e166f22711345bf6b2e283f447d3ea2d5 Mon Sep 17 00:00:00 2001 From: Kante Yin Date: Tue, 2 Jun 2026 09:17:18 +0100 Subject: [PATCH 4/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- server/src/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/registry.rs b/server/src/registry.rs index a62df72..f754190 100644 --- a/server/src/registry.rs +++ b/server/src/registry.rs @@ -25,7 +25,7 @@ pub struct DaemonConnection { // ═══════════════════════════════════════════════════════════════════ // Incoming: Daemon → Python (Request/Response Pattern) // ═══════════════════════════════════════════════════════════════════ - /// Maps request_id → response channel for execute_command() calls + /// Maps request_id → response channel for exec() calls /// When Python sends a command, it registers a oneshot channel here and waits. /// When daemon responds with CommandOutput, we look up and send result back. /// Pattern: Request/Response (each command gets exactly one response) From 4de742566b616ad4ef0f8b1d3076766659050e36 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Tue, 2 Jun 2026 16:23:24 +0800 Subject: [PATCH 5/5] address comments Signed-off-by: kerthcet --- README.md | 2 +- docs/PROTOCOL.md | 2 +- python/sandd/__init__.py | 2 +- server/src/lib.rs | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 8bf5673..f3a0e38 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ Rust-powered WebSocket server with Python API for secure command execution in is │ │ from sandd import Server │ │ │ │ │ │ │ │ server = Server("0.0.0.0", 8765) │ │ -│ │ result = server.exec( │ │ +│ │ result = server.exec( │ │ │ │ "daemon-1", "ls -la" │ │ │ │ ) │ │ │ └────────────────────────────────────┘ │ diff --git a/docs/PROTOCOL.md b/docs/PROTOCOL.md index 559f7ee..5a4ceab 100644 --- a/docs/PROTOCOL.md +++ b/docs/PROTOCOL.md @@ -424,7 +424,7 @@ All messages are JSON with a `type` field indicating the message type: - Agent detects closed connection - Registry removes daemon - All pending commands fail - - Terminate + - Sessions terminate ``` ## Heartbeat & Connection Monitoring diff --git a/python/sandd/__init__.py b/python/sandd/__init__.py index 16e6f67..4285203 100644 --- a/python/sandd/__init__.py +++ b/python/sandd/__init__.py @@ -330,7 +330,7 @@ def _run_interactive(self, session: Session) -> None: Args: session: Session to make interactive """ - print("Entering interactive session session. Press Ctrl+D to exit.") + print("Entering interactive session. Press Ctrl+D to exit.") print("-" * 60) # Set terminal to raw mode on Unix systems diff --git a/server/src/lib.rs b/server/src/lib.rs index 8c41f85..25a85af 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -137,11 +137,11 @@ impl Server { term, }; + conn.register_session(session_id.clone(), tx); + conn.send_message(msg) .map_err(|e| PyRuntimeError::new_err(format!("Failed to start session: {}", e)))?; - conn.register_session(session_id.clone(), tx); - Ok(Session { session_id, daemon_id,