Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .coderabbit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
reviews:
request_changes_workflow: true
pre_merge_checks:
enabled: true
57 changes: 57 additions & 0 deletions crates/fbuild-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,63 @@ impl SerialMonitor {
timeout
)))
}

/// Reset the device via the daemon's DTR/RTS reset endpoint.
///
/// Sends POST /api/reset to the daemon, which preempts any active
/// serial monitor session, toggles DTR/RTS to reset the device,
/// then clears preemption so monitors can reconnect.
///
/// Works whether or not __enter__ has been called — the reset goes
/// through the daemon's HTTP API, not the WebSocket session.
///
/// Args:
/// board: Board identifier (e.g. "esp32s3", "teensy40").
/// Determines the platform-specific reset sequence.
/// If None, a generic DTR toggle is used.
///
/// Returns:
/// True if reset succeeded, False otherwise.
#[pyo3(signature = (board=None))]
fn reset_device(&self, board: Option<String>) -> PyResult<bool> {
let url = format!("{}/api/reset", fbuild_paths::get_daemon_url());

#[derive(Serialize)]
struct ResetPayload {
port: String,
#[serde(skip_serializing_if = "Option::is_none")]
board: Option<String>,
}

let payload = ResetPayload {
port: self.port.clone(),
board,
};

let resp = reqwest::blocking::Client::new()
.post(&url)
.json(&payload)
.timeout(std::time::Duration::from_secs(10))
.send()
.map_err(|e| {
pyo3::exceptions::PyConnectionError::new_err(format!(
"failed to send reset request to daemon: {}",
e
))
})?;

let body: serde_json::Value = resp.json().map_err(|e| {
pyo3::exceptions::PyRuntimeError::new_err(format!(
"failed to parse reset response: {}",
e
))
})?;

Ok(body
.get("success")
.and_then(|v| v.as_bool())
.unwrap_or(false))
}
}

impl SerialMonitor {
Expand Down
27 changes: 27 additions & 0 deletions tests/test_serial_reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""RED tests: SerialMonitor.reset_device() must exist and return bool.

These tests verify the PyO3 binding exposes reset_device() on SerialMonitor.
They will FAIL until the Rust implementation is added.
"""

from __future__ import annotations

def test_serial_monitor_has_reset_device() -> None:
"""SerialMonitor must expose a reset_device method."""
from fbuild._native import SerialMonitor

assert hasattr(SerialMonitor, "reset_device"), (
"SerialMonitor is missing reset_device method. "
"Add #[pyo3] fn reset_device(&self, board: Option<String>) -> PyResult<bool> "
"to the SerialMonitor impl block in crates/fbuild-python/src/lib.rs"
)


def test_serial_monitor_reset_device_is_callable() -> None:
"""reset_device must be callable (not just an attribute)."""
from fbuild._native import SerialMonitor

mon = SerialMonitor(port="COM13", baud_rate=115200)
assert callable(getattr(mon, "reset_device", None)), (
"SerialMonitor.reset_device exists but is not callable"
)
72 changes: 72 additions & 0 deletions tests/test_serial_reset_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Integration test: reset a real ESP32-S3 via the fbuild daemon.

Requires:
- fbuild daemon running
- ESP32-S3 device on COM13
- Device flashed with AutoResearch firmware

Skip conditions checked at import time.
"""

from __future__ import annotations

import time

import pytest
import serial.tools.list_ports as list_ports


def _port_exists(port: str) -> bool:
"""Check if a serial port is enumerated (without opening it).

Uses pyserial's port enumeration instead of opening the port,
which would conflict with a daemon holding the port open.
"""
return any(info.device == port for info in list_ports.comports())


def _daemon_running() -> bool:
"""Check if the fbuild daemon is reachable."""
try:
import urllib.request

urllib.request.urlopen("http://127.0.0.1:8765/health", timeout=2)
return True
except Exception:
return False
Comment thread
zackees marked this conversation as resolved.


@pytest.mark.skipif(not _port_exists("COM13"), reason="COM13 not available")
@pytest.mark.skipif(not _daemon_running(), reason="fbuild daemon not running")
def test_reset_esp32s3_and_read_output() -> None:
"""After reset_device, a fresh monitor should capture device output."""
from fbuild._native import SerialMonitor

# Reset first (no monitor attached — avoids preemption race)
mon = SerialMonitor(port="COM13", baud_rate=115200)
result = mon.reset_device(board="esp32s3")
assert result is True, "reset_device should return True on success"

# Wait for reboot
time.sleep(3.0)

# Now attach a fresh monitor and read output
with SerialMonitor(port="COM13", baud_rate=115200) as mon2:
lines = mon2.read_lines(timeout=5.0)
assert len(lines) > 0, (
"Device should produce serial output after reset. "
"Got 0 lines — device may not have rebooted."
)


@pytest.mark.skipif(not _port_exists("COM13"), reason="COM13 not available")
@pytest.mark.skipif(not _daemon_running(), reason="fbuild daemon not running")
def test_reset_device_without_enter() -> None:
"""reset_device should work without calling __enter__ first."""
from fbuild._native import SerialMonitor

mon = SerialMonitor(port="COM13", baud_rate=115200)
# Should NOT require __enter__ — reset goes through HTTP, not WebSocket
result = mon.reset_device(board="esp32s3")
assert isinstance(result, bool), "reset_device must return a bool"
assert result is True, "reset_device should succeed for a connected ESP32-S3"
Loading