From d111507083a59e990d4578a9cb22325dfe77d4e5 Mon Sep 17 00:00:00 2001 From: Dmitry Ilyin Date: Wed, 29 Apr 2026 12:01:51 +0300 Subject: [PATCH 1/2] Fix flaky boot ROM handshake: retry, drain-to-silence, distinct errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three resilience fixes for the transient handshake failure observed on hi3516av200 (one in N flashes failed at "Failed to send DDR step" with no code change between successes): 1. Distinct DDR-step error attribution. _send_ddr_step now returns a phase-specific error string instead of a bool, so callers can tell handshake-timeout from PRESTEP0/DDRSTEP0/ PRESTEP1 frame failures. Previously every failure surfaced as "Failed to send DDR step" — misleading when the actual failure was the sendFrameForStart handshake never latching. 2. Drain-until-silent replaces fixed sleep + flush. Transport gets a drain_until_silent(quiet_period, max_wait) helper that loops reading until the line stays quiet long enough. Session uses it after power_cycle. More robust than a fixed 2s sleep + tcflush — a powered-off chip can't transmit, so silence is a deterministic ready signal, and we don't lose late-arriving stale bytes that beat the flush. 3. Retry the handshake/DDR phase on transient failure. RecoverySession.run wraps power_cycle + handshake + DDR-init in a retry loop (default 2 attempts) when programmatic power control is available. Past-DDR failures are not retried since they're slow and rarely transient. Without power control, retries are pointless and disabled. 11 regression tests in test_handshake_resilience.py cover all three fixes — DDR-step error attribution, drain_until_silent quiet-period / max-wait / idle-line behavior, and session retry behavior across all relevant cases (transient handshake, post-DDR failure, no power control, retries exhausted). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/defib/protocol/hisilicon_standard.py | 27 +- src/defib/recovery/session.py | 194 +++++++---- src/defib/transport/base.py | 36 ++ tests/test_handshake_resilience.py | 417 +++++++++++++++++++++++ 4 files changed, 598 insertions(+), 76 deletions(-) create mode 100644 tests/test_handshake_resilience.py diff --git a/src/defib/protocol/hisilicon_standard.py b/src/defib/protocol/hisilicon_standard.py index 9e49c57..d94ba05 100644 --- a/src/defib/protocol/hisilicon_standard.py +++ b/src/defib/protocol/hisilicon_standard.py @@ -313,12 +313,16 @@ async def _send_ddr_step( transport: Transport, profile: SoCProfile, on_progress: Callable[[ProgressEvent], None] | None = None, - ) -> bool: + ) -> str | None: """Send DDR initialization steps to SRAM. Matches HiTool's exact sequence: 1. sendFrameForStart: blast HEAD(64, ADDRESS0) until ACK (handshake) 2. For each step (PRESTEP0, DDRSTEP0, PRESTEP1): HEAD+DATA+TAIL + + Returns ``None`` on success, or a string describing which sub-step + failed. Distinguishes handshake failure from frame-send failures + so error messages aren't misleading. """ _emit(on_progress, ProgressEvent( stage=Stage.DDR_INIT, bytes_sent=0, bytes_total=64, @@ -331,7 +335,7 @@ async def _send_ddr_step( # sendFrameForStart: blast HEAD as handshake (HiTool approach) if prestep is not None: if not await self._send_frame_for_start(transport, profile): - return False + return "handshake (sendFrameForStart) timed out" # PRESTEP0: HEAD+DATA+TAIL (HEAD sent again per HiTool's loop) logger.debug( @@ -339,9 +343,9 @@ async def _send_ddr_step( addr, len(prestep), ) if not await self._send_head(transport, 64, addr): - return False + return "PRESTEP0 HEAD frame not ACKed" if not await self._send_data(transport, 1, prestep): - return False + return "PRESTEP0 DATA frame not ACKed" if not await self._send_tail(transport, 2): logger.debug("PRESTEP0 TAIL not ACKed (non-fatal)") @@ -352,10 +356,10 @@ async def _send_ddr_step( ) ddr_data = profile.ddr_step_data if not await self._send_head(transport, 64, addr): - return False + return "DDRSTEP0 HEAD frame not ACKed" if not await self._send_data(transport, 1, ddr_data): - return False + return "DDRSTEP0 DATA frame not ACKed" if not await self._send_tail(transport, 2): logger.debug("DDRSTEP0 TAIL not ACKed (non-fatal)") @@ -368,9 +372,9 @@ async def _send_ddr_step( addr, len(prestep1), ) if not await self._send_head(transport, len(prestep1), addr): - return False + return "PRESTEP1 HEAD frame not ACKed" if not await self._send_data(transport, 1, prestep1): - return False + return "PRESTEP1 DATA frame not ACKed" if not await self._send_tail(transport, 2): logger.debug("PRESTEP1 TAIL not ACKed (non-fatal)") @@ -378,7 +382,7 @@ async def _send_ddr_step( stage=Stage.DDR_INIT, bytes_sent=64, bytes_total=64, message="DDR step complete", )) - return True + return None @staticmethod def _detect_spl_size(firmware: bytes, profile_max: int) -> int: @@ -537,10 +541,11 @@ async def send_firmware( profile.name, len(firmware), spl_override is not None, ) - if not await self._send_ddr_step(transport, profile, on_progress): + ddr_err = await self._send_ddr_step(transport, profile, on_progress) + if ddr_err is not None: return RecoveryResult( success=False, stages_completed=stages, - error="Failed to send DDR step", + error=f"DDR init failed: {ddr_err}", ) stages.append(Stage.DDR_INIT) diff --git a/src/defib/recovery/session.py b/src/defib/recovery/session.py index 4e857f2..3fa0602 100644 --- a/src/defib/recovery/session.py +++ b/src/defib/recovery/session.py @@ -67,6 +67,7 @@ async def run( on_progress: Callable[[ProgressEvent], None] | None = None, on_log: Callable[[LogEvent], None] | None = None, send_break: bool = False, + max_handshake_attempts: int = 2, ) -> RecoveryResult: """Execute the full recovery: handshake → firmware transfer. @@ -75,6 +76,9 @@ async def run( on_progress: Callback for progress events. on_log: Callback for log events. send_break: If True, send Ctrl-C after upload to enter U-Boot console. + max_handshake_attempts: Retry the power-cycle + handshake + DDR-init + phase up to this many times if the transient handshake fails. + Only applies when programmatic power control is configured. Returns: RecoveryResult with success status. @@ -97,93 +101,153 @@ async def run( isinstance(protocol, HiSiliconStandard) and protocol.uses_frame_blast_handshake ) - # Power cycle - if self._power and self._poe_port: - if on_log: + # Retry the transient phase (power-cycle + handshake + DDR init) up + # to ``max_handshake_attempts`` times. Only meaningful when we have + # programmatic power control — manual power cycling would require + # human re-intervention. + can_retry = bool(self._power and self._poe_port) + attempts = max_handshake_attempts if can_retry else 1 + + firmware = self._load_firmware() + handshake: HandshakeResult | None = None + last_attempt_error: str | None = None + + for attempt in range(1, attempts + 1): + if attempt > 1 and on_log: on_log(LogEvent( - level="info", - message=f"Power-cycling device on {self._poe_port}...", - )) - if on_progress: - on_progress(ProgressEvent( - stage=Stage.POWER_CYCLE, bytes_sent=0, bytes_total=1, - message=f"Power-cycling {self._poe_port}...", + level="warn", + message=( + f"Retrying handshake (attempt {attempt}/{attempts}) — " + f"previous error: {last_attempt_error}" + ), )) - try: - await self._power.power_cycle(self._poe_port) - except Exception as e: - elapsed = (time.monotonic() - start_time) * 1000 + # Power cycle + if self._power and self._poe_port: if on_log: - on_log(LogEvent(level="error", message=f"Power cycle failed: {e}")) - return RecoveryResult( - success=False, - error=f"Power cycle failed: {e}", - elapsed_ms=elapsed, + on_log(LogEvent( + level="info", + message=f"Power-cycling device on {self._poe_port}...", + )) + if on_progress: + on_progress(ProgressEvent( + stage=Stage.POWER_CYCLE, bytes_sent=0, bytes_total=1, + message=f"Power-cycling {self._poe_port}...", + )) + + try: + await self._power.power_cycle(self._poe_port) + except Exception as e: + elapsed = (time.monotonic() - start_time) * 1000 + if on_log: + on_log(LogEvent(level="error", message=f"Power cycle failed: {e}")) + return RecoveryResult( + success=False, + error=f"Power cycle failed: {e}", + elapsed_ms=elapsed, + ) + + # Drain serial until line stays quiet for 500ms. Replaces a + # fixed 2-second sleep + flush_input — that approach can miss + # late-arriving stale bytes (the camera may still be powering + # down when the flush runs) and isn't robust against pyserial + # buffer caveats. Quiet-detection is deterministic: a + # powered-off chip cannot transmit. + discarded = await transport.drain_until_silent( + quiet_period=0.5, max_wait=5.0, ) + if discarded and on_log: + on_log(LogEvent( + level="info", + message=f"Drained {discarded} stale bytes from serial", + )) - # Wait for power to actually be cut, then flush any warm-reboot - # garbage from the serial buffer. - import asyncio - await asyncio.sleep(2.0) - await transport.flush_input() + if on_progress: + on_progress(ProgressEvent( + stage=Stage.POWER_CYCLE, bytes_sent=1, bytes_total=1, + message="Power cycle complete", + )) - if on_progress: - on_progress(ProgressEvent( - stage=Stage.POWER_CYCLE, bytes_sent=1, bytes_total=1, - message="Power cycle complete", - )) + # Handshake — skip for frame-blast chips (handled inside send_firmware) + if frame_blast: + if on_log: + on_log(LogEvent( + level="info", + message=f"Using sendFrameForStart handshake for {self.chip}", + )) + handshake = HandshakeResult(success=True, message="Frame-blast (deferred)") + elif self._power and self._poe_port: + # Power-cycle mode with 0x20→0xAA handshake: flood 0xAA + if on_log: + on_log(LogEvent( + level="info", + message=f"Starting {self._protocol_cls.name()} handshake for {self.chip}", + )) + import asyncio as _asyncio + handshake_task = _asyncio.create_task( + protocol.handshake(transport, on_progress) + ) + handshake = await handshake_task + else: + # Manual power cycling — just start handshake and wait + if on_log: + on_log(LogEvent( + level="info", + message=f"Starting {self._protocol_cls.name()} handshake for {self.chip}", + )) + handshake = await protocol.handshake(transport, on_progress) - # Handshake — skip for frame-blast chips (handled inside send_firmware) - if frame_blast: - if on_log: - on_log(LogEvent( - level="info", - message=f"Using sendFrameForStart handshake for {self.chip}", - )) - handshake = HandshakeResult(success=True, message="Frame-blast (deferred)") - elif self._power and self._poe_port: - # Power-cycle mode with 0x20→0xAA handshake: flood 0xAA + # If non-frame-blast handshake failed and we can retry, try again + if not handshake.success: + last_attempt_error = f"handshake: {handshake.message}" + if attempt < attempts: + continue + break + + # Handshake OK (or deferred for frame-blast). Send firmware. if on_log: on_log(LogEvent( level="info", - message=f"Starting {self._protocol_cls.name()} handshake for {self.chip}", + message=f"Sending {len(firmware)} bytes of firmware...", )) - import asyncio - handshake_task = asyncio.create_task( - protocol.handshake(transport, on_progress) + send_result = await protocol.send_firmware( + transport, firmware, on_progress, + ) + if send_result.success: + # Mutate handshake variable so post-loop code sees success. + handshake_succeeded_result = send_result + break + + # Firmware send failed — only retry if it failed in the early + # handshake/DDR phase (frame-blast handshake or DDR init). + # Once we're past DDR init, retrying is unlikely to help and + # costs another 30+ seconds of upload time. + err = send_result.error or "" + is_early = ( + Stage.DDR_INIT not in send_result.stages_completed ) - handshake = await handshake_task + if is_early and attempt < attempts: + last_attempt_error = err + continue + + # Either past-DDR failure or final attempt — bail out. + handshake_succeeded_result = send_result + break else: - # Manual power cycling — just start handshake and wait + # Loop completed without break — all retries exhausted on handshake + elapsed = (time.monotonic() - start_time) * 1000 if on_log: on_log(LogEvent( - level="info", - message=f"Starting {self._protocol_cls.name()} handshake for {self.chip}", + level="error", + message=f"Handshake failed after {attempts} attempts: {last_attempt_error}", )) - handshake = await protocol.handshake(transport, on_progress) - if not handshake.success: - elapsed = (time.monotonic() - start_time) * 1000 - if on_log: - on_log(LogEvent(level="error", message=f"Handshake failed: {handshake.message}")) return RecoveryResult( success=False, - error=f"Handshake failed: {handshake.message}", + error=f"Handshake failed: {last_attempt_error}", elapsed_ms=elapsed, ) - # Note: handshake success is already reported via on_progress, - # so we don't duplicate it here via on_log. - - # Firmware transfer - firmware = self._load_firmware() - if on_log: - on_log(LogEvent( - level="info", - message=f"Sending {len(firmware)} bytes of firmware...", - )) - - result = await protocol.send_firmware(transport, firmware, on_progress) + result = handshake_succeeded_result result.elapsed_ms = (time.monotonic() - start_time) * 1000 # Send break (Ctrl-C) to interrupt U-Boot autoboot diff --git a/src/defib/transport/base.py b/src/defib/transport/base.py index c610f36..c18d483 100644 --- a/src/defib/transport/base.py +++ b/src/defib/transport/base.py @@ -60,6 +60,42 @@ async def drain_input(self) -> bytes: return await self.read(waiting, timeout=0.1) return b"" + async def drain_until_silent( + self, + quiet_period: float = 0.5, + max_wait: float = 5.0, + ) -> int: + """Read and discard incoming data until the line stays quiet. + + Reads bytes with short timeouts, restarting the quiet timer whenever + any data arrives. Returns once no data has been seen for + ``quiet_period`` seconds, or once ``max_wait`` total seconds elapsed. + + This is more reliable than a fixed sleep + ``flush_input`` because + ``flush_input`` only drains the buffer at one moment in time — bytes + arriving immediately after survive. Reading until silence guarantees + a clean state when the chip is physically off (which can't transmit). + + Returns the number of bytes discarded. + """ + loop = asyncio.get_event_loop() + deadline = loop.time() + max_wait + last_data_at = loop.time() + discarded = 0 + + await self.flush_input() + while loop.time() < deadline: + if loop.time() - last_data_at >= quiet_period: + return discarded + try: + chunk = await self.read(256, timeout=0.05) + if chunk: + discarded += len(chunk) + last_data_at = loop.time() + except TransportTimeout: + continue + return discarded + async def read_exact(self, size: int, timeout: float | None = None) -> bytes: """Read exactly `size` bytes, raising TransportTimeout on timeout.""" buf = bytearray() diff --git a/tests/test_handshake_resilience.py b/tests/test_handshake_resilience.py new file mode 100644 index 0000000..3786628 --- /dev/null +++ b/tests/test_handshake_resilience.py @@ -0,0 +1,417 @@ +"""Regression tests for the handshake-resilience fixes. + +Three bugs were fixed together: + +1. The DDR-step error message used to claim "Failed to send DDR step" even + when the actual failure was the sendFrameForStart handshake or one of the + PRESTEP/DDRSTEP frame sends. Now each sub-step returns a distinct + diagnostic so you can tell handshake-timeout from frame-not-ACKed. + +2. Session previously waited a fixed ``sleep(2.0)`` then ``flush_input()`` + after a power cycle. That misses bytes arriving during/after the flush + (pyserial's tcflush only drains the kernel buffer at one moment). We now + use ``Transport.drain_until_silent`` which loops until the line stays + quiet for a configurable period — robust because a powered-off chip can't + transmit. + +3. A single transient handshake failure used to fail the entire install. + Session now retries the power-cycle + handshake + DDR-init phase up to + ``max_handshake_attempts`` times when programmatic power control is + available. Past-DDR failures are not retried (slow, rarely transient). +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from defib.power.base import PowerController +from defib.protocol.hisilicon_standard import HiSiliconStandard +from defib.recovery.events import LogEvent, RecoveryResult, Stage +from defib.recovery.session import RecoverySession +from defib.transport.base import TransportTimeout +from defib.transport.mock import MockTransport + + +# ----------------------------------------------------------------------------- +# Bug 1: distinct DDR-step error attribution +# ----------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_send_ddr_step_returns_handshake_message_on_handshake_timeout( + monkeypatch, +): + """If sendFrameForStart times out, the error message names the handshake + rather than masquerading as a generic DDR failure.""" + from defib.profiles.loader import load_profile + + proto = HiSiliconStandard() + proto.set_profile(load_profile("hi3516av200")) # uses prestep + frame-blast + + async def fake_handshake(self, transport, profile): # noqa: ARG001 + return False + + monkeypatch.setattr( + HiSiliconStandard, "_send_frame_for_start", fake_handshake + ) + + transport = MockTransport() + err = await proto._send_ddr_step(transport, proto._profile) + assert err is not None + assert "handshake" in err.lower() + assert "sendFrameForStart" in err + + +@pytest.mark.asyncio +async def test_send_ddr_step_returns_prestep0_message_on_head_failure( + monkeypatch, +): + """If the PRESTEP0 HEAD fails, the error names PRESTEP0 specifically — + not a generic "Failed to send DDR step".""" + from defib.profiles.loader import load_profile + + proto = HiSiliconStandard() + proto.set_profile(load_profile("hi3516av200")) + + async def fake_handshake(self, transport, profile): # noqa: ARG001 + return True + + async def fake_send_head(self, transport, length, address): # noqa: ARG001 + return False # HEAD frame fails to ACK + + monkeypatch.setattr( + HiSiliconStandard, "_send_frame_for_start", fake_handshake + ) + monkeypatch.setattr(HiSiliconStandard, "_send_head", fake_send_head) + + transport = MockTransport() + err = await proto._send_ddr_step(transport, proto._profile) + assert err is not None + assert "PRESTEP0" in err + assert "HEAD" in err + + +@pytest.mark.asyncio +async def test_send_ddr_step_returns_none_on_success(monkeypatch): + """Successful DDR step returns None (no error).""" + from defib.profiles.loader import load_profile + + proto = HiSiliconStandard() + proto.set_profile(load_profile("hi3516av200")) + + async def ok(*a, **kw): + return True + + monkeypatch.setattr(HiSiliconStandard, "_send_frame_for_start", ok) + monkeypatch.setattr(HiSiliconStandard, "_send_head", ok) + monkeypatch.setattr(HiSiliconStandard, "_send_data", ok) + monkeypatch.setattr(HiSiliconStandard, "_send_tail", ok) + + transport = MockTransport() + err = await proto._send_ddr_step(transport, proto._profile) + assert err is None + + +@pytest.mark.asyncio +async def test_send_firmware_propagates_ddr_error_message(monkeypatch): + """send_firmware's RecoveryResult.error includes the specific phase.""" + from defib.profiles.loader import load_profile + + proto = HiSiliconStandard() + proto.set_profile(load_profile("hi3516av200")) + + async def handshake_fail(self, transport, profile): # noqa: ARG001 + return False + + monkeypatch.setattr( + HiSiliconStandard, "_send_frame_for_start", handshake_fail + ) + + transport = MockTransport() + result = await proto.send_firmware(transport, b"\x00" * 1024) + assert not result.success + assert "handshake" in (result.error or "").lower() + assert "DDR init failed" in (result.error or "") + + +# ----------------------------------------------------------------------------- +# Bug 2: drain_until_silent on Transport +# ----------------------------------------------------------------------------- + + +class _DripTransport(MockTransport): + """Mock that keeps producing bytes for a configurable duration so we can + test that drain_until_silent really waits for silence.""" + + def __init__(self, drip_until: float, byte_per_call: bytes = b"x") -> None: + super().__init__(flush_clears_buffer=True) + self._drip_until = drip_until + self._byte_per_call = byte_per_call + + async def read(self, size: int, timeout: float | None = None) -> bytes: + loop = asyncio.get_event_loop() + if loop.time() < self._drip_until: + await asyncio.sleep(0.01) + return self._byte_per_call + # Past drip deadline — behave like idle line. + if timeout is not None: + await asyncio.sleep(min(timeout, 0.001)) + raise TransportTimeout("idle") + + +@pytest.mark.asyncio +async def test_drain_until_silent_returns_after_quiet_period(): + """drain_until_silent returns once the line stays quiet long enough.""" + loop = asyncio.get_event_loop() + drip_for = 0.3 + transport = _DripTransport(drip_until=loop.time() + drip_for) + + start = loop.time() + discarded = await transport.drain_until_silent( + quiet_period=0.1, max_wait=2.0, + ) + elapsed = loop.time() - start + + # Should have drained at least some bytes during the drip window. + assert discarded > 0 + # Should have returned shortly after drip stopped (~drip_for + quiet_period). + assert elapsed < 1.0, f"took too long: {elapsed:.2f}s" + assert elapsed >= drip_for, f"returned before drip stopped: {elapsed:.2f}s" + + +@pytest.mark.asyncio +async def test_drain_until_silent_returns_immediately_when_idle(): + """Idle transport returns near-instantly (within the quiet period).""" + transport = MockTransport() + loop = asyncio.get_event_loop() + + start = loop.time() + discarded = await transport.drain_until_silent( + quiet_period=0.05, max_wait=2.0, + ) + elapsed = loop.time() - start + + assert discarded == 0 + assert elapsed < 0.5, f"idle drain took too long: {elapsed:.2f}s" + + +@pytest.mark.asyncio +async def test_drain_until_silent_respects_max_wait(): + """If the line never goes quiet, drain_until_silent gives up at max_wait.""" + loop = asyncio.get_event_loop() + transport = _DripTransport(drip_until=loop.time() + 10.0) + + start = loop.time() + await transport.drain_until_silent(quiet_period=1.0, max_wait=0.3) + elapsed = loop.time() - start + + # Should bail out at max_wait, not wait forever. + assert elapsed < 1.0, f"did not honor max_wait: {elapsed:.2f}s" + + +# ----------------------------------------------------------------------------- +# Bug 3: handshake retry in RecoverySession +# ----------------------------------------------------------------------------- + + +class _FakePowerController(PowerController): + """Power controller that records calls; useful for retry tests.""" + + def __init__(self) -> None: + self.cycle_count = 0 + self.cycle_ports: list[str] = [] + + @classmethod + def name(cls) -> str: + return "fake" + + async def power_off(self, port: str) -> None: + return None + + async def power_on(self, port: str) -> None: + return None + + async def power_cycle( + self, port: str, off_duration: float = 3.0, + ) -> None: # noqa: ARG002 + self.cycle_count += 1 + self.cycle_ports.append(port) + + async def close(self) -> None: + return None + + +class _ScriptedTransport(MockTransport): + """Transport whose protocol.send_firmware result is scripted.""" + + def __init__(self) -> None: + super().__init__(flush_clears_buffer=False) + + async def drain_until_silent( + self, + quiet_period: float = 0.5, # noqa: ARG002 + max_wait: float = 5.0, # noqa: ARG002 + ) -> int: + return 0 + + +@pytest.mark.asyncio +async def test_session_retries_handshake_on_transient_failure(monkeypatch): + """When DDR-init fails on the first attempt but succeeds on the second, + the session retries and ultimately succeeds.""" + transport = _ScriptedTransport() + power = _FakePowerController() + + call_count = {"send_firmware": 0} + + async def scripted_send_firmware(self, transport, firmware, on_progress=None, + spl_override=None, payload_label="U-Boot"): + # noqa: ARG001 + call_count["send_firmware"] += 1 + if call_count["send_firmware"] == 1: + return RecoveryResult( + success=False, + error="DDR init failed: handshake (sendFrameForStart) timed out", + stages_completed=[], + ) + return RecoveryResult( + success=True, + stages_completed=[Stage.DDR_INIT, Stage.SPL, Stage.UBOOT, Stage.COMPLETE], + ) + + monkeypatch.setattr( + HiSiliconStandard, "send_firmware", scripted_send_firmware, + ) + + session = RecoverySession( + chip="hi3516av200", + firmware_data=b"\x00" * 1024, + power_controller=power, + poe_port="ether8", + ) + + logs: list[LogEvent] = [] + result = await session.run( + transport, + on_log=logs.append, + max_handshake_attempts=2, + ) + + assert result.success, f"expected success, got error: {result.error}" + assert call_count["send_firmware"] == 2 + assert power.cycle_count == 2 # one cycle per attempt + # A retry log should have been emitted + retry_logs = [e for e in logs if "retry" in e.message.lower()] + assert retry_logs, "expected retry log message" + + +@pytest.mark.asyncio +async def test_session_does_not_retry_post_ddr_failures(monkeypatch): + """If DDR succeeded but SPL/U-Boot failed, retrying won't help — and would + waste 30+ seconds of upload — so we bail out.""" + transport = _ScriptedTransport() + power = _FakePowerController() + + call_count = {"send_firmware": 0} + + async def post_ddr_failure(self, transport, firmware, on_progress=None, + spl_override=None, payload_label="U-Boot"): + # noqa: ARG001 + call_count["send_firmware"] += 1 + return RecoveryResult( + success=False, + error="Failed to send SPL", + stages_completed=[Stage.DDR_INIT], + ) + + monkeypatch.setattr( + HiSiliconStandard, "send_firmware", post_ddr_failure, + ) + + session = RecoverySession( + chip="hi3516av200", + firmware_data=b"\x00" * 1024, + power_controller=power, + poe_port="ether8", + ) + + result = await session.run(transport, max_handshake_attempts=3) + + assert not result.success + # Only ONE attempt — no retry past DDR + assert call_count["send_firmware"] == 1 + assert power.cycle_count == 1 + + +@pytest.mark.asyncio +async def test_session_no_retry_without_power_control(monkeypatch): + """Without programmatic power control, retries are pointless because the + user would need to physically re-cycle. Should attempt only once.""" + transport = _ScriptedTransport() + + call_count = {"send_firmware": 0} + + async def always_fail(self, transport, firmware, on_progress=None, + spl_override=None, payload_label="U-Boot"): + # noqa: ARG001 + call_count["send_firmware"] += 1 + return RecoveryResult( + success=False, + error="DDR init failed: handshake (sendFrameForStart) timed out", + stages_completed=[], + ) + + async def fake_handshake(self, transport, on_progress=None): # noqa: ARG002 + from defib.recovery.events import HandshakeResult + return HandshakeResult(success=True, message="ok") + + monkeypatch.setattr(HiSiliconStandard, "send_firmware", always_fail) + monkeypatch.setattr(HiSiliconStandard, "handshake", fake_handshake) + + session = RecoverySession( + chip="hi3516cv300", # non-frame-blast chip so handshake is separate + firmware_data=b"\x00" * 1024, + ) + + result = await session.run(transport, max_handshake_attempts=5) + + assert not result.success + assert call_count["send_firmware"] == 1 + + +@pytest.mark.asyncio +async def test_session_max_attempts_respected(monkeypatch): + """All attempts fail → final result reports failure with last error, + and we don't keep retrying forever.""" + transport = _ScriptedTransport() + power = _FakePowerController() + + call_count = {"send_firmware": 0} + + async def always_fail(self, transport, firmware, on_progress=None, + spl_override=None, payload_label="U-Boot"): + # noqa: ARG001 + call_count["send_firmware"] += 1 + return RecoveryResult( + success=False, + error="DDR init failed: PRESTEP0 HEAD frame not ACKed", + stages_completed=[], + ) + + monkeypatch.setattr(HiSiliconStandard, "send_firmware", always_fail) + + session = RecoverySession( + chip="hi3516av200", + firmware_data=b"\x00" * 1024, + power_controller=power, + poe_port="ether8", + ) + + result = await session.run(transport, max_handshake_attempts=3) + + assert not result.success + assert "PRESTEP0" in (result.error or "") + assert call_count["send_firmware"] == 3 + assert power.cycle_count == 3 From d9458cd20a1bf0f6505af5049b1ab76dd65cf2d2 Mon Sep 17 00:00:00 2001 From: Dmitry Ilyin Date: Wed, 29 Apr 2026 12:41:52 +0300 Subject: [PATCH 2/2] Set bootargs explicitly for NAND install to match rootfs format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit defib install previously set ``mtdparts`` and ``bootcmd`` but relied on U-Boot's compiled-in default ``bootargs``. Recent OpenIPC U-Boot builds default to: root=/dev/ubiblock0_0 rootfstype=squashfs ubi.block=0,0 init=/init Even when the actual ``rootfs.ubi.hi3516av200`` from the same release contains UBIFS. Result: kernel boots the wrong filesystem driver and panics with "Unable to mount root fs ... tried squashfs". Fix: defib now ``setenv bootargs`` to match the rootfs format it just wrote — UBIFS bootargs when the rootfs file is a UBI image (extracted to UBIFS), squashfs+ubiblock bootargs otherwise. Bootargs string built in a small ``_nand_bootargs`` helper for unit-testability. Tested on hi3516av200: - Kernel command line now: root=ubi0:rootfs rootfstype=ubifs ... - UBIFS mounts, init runs, dropbear/syslog start, login prompt reached Co-Authored-By: Claude Opus 4.6 (1M context) --- src/defib/cli/app.py | 25 +++++++++++++++ tests/test_nand_install.py | 64 +++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index 60b7f3b..0de694c 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -1666,6 +1666,28 @@ def install( } +def _nand_bootargs(rootfs_is_ubi: bool) -> str: + """Return kernel cmdline for OpenIPC NAND install. + + Don't rely on U-Boot's compiled-in default bootargs — recent OpenIPC + builds default to squashfs+ubiblock, which kernel-panics with + "Unable to mount root fs" when the actual rootfs volume contains + UBIFS. Always set bootargs explicitly to match what we wrote. + + The mtdparts substring must agree with the layout we set in U-Boot + (1M boot, 1M env, 8M kernel, rest UBI) so ``ubi.mtd=3`` resolves to + the right partition. + """ + base = ( + "mem=256M console=ttyAMA0,115200 panic=20 ubi.mtd=3,2048 " + "mtdparts=hinand:1024k(boot),1024k(env),8192k(kernel),-(ubi)" + ) + if rootfs_is_ubi: + return f"root=ubi0:rootfs rootfstype=ubifs {base}" + # Squashfs on a UBI block device (modern OpenIPC layout) + return f"root=/dev/ubiblock0_0 rootfstype=squashfs ubi.block=0,0 init=/init {base}" + + async def _install_async( chip: str, firmware_path: str, @@ -2136,6 +2158,9 @@ async def tftp_and_flash( r"setenv bootcmd nand read ${baseaddr} 0x200000 0x800000\; bootm ${baseaddr}", timeout=3.0, ) + # Match bootargs to actual rootfs format — see _nand_bootargs. + bootargs = _nand_bootargs(rootfs_is_ubi=is_ubi_image(rootfs_data)) + await _cmd(f"setenv bootargs {bootargs}", timeout=3.0) else: nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m" if output == "human": diff --git a/tests/test_nand_install.py b/tests/test_nand_install.py index dba5d2d..2063d2f 100644 --- a/tests/test_nand_install.py +++ b/tests/test_nand_install.py @@ -1,6 +1,11 @@ """Tests for NAND flash install support and protocol robustness fixes.""" -from defib.cli.app import _NAND_LAYOUT, _NOR8M_LAYOUT, _NOR16M_LAYOUT +from defib.cli.app import ( + _NAND_LAYOUT, + _NOR8M_LAYOUT, + _NOR16M_LAYOUT, + _nand_bootargs, +) class TestNandLayout: @@ -48,3 +53,60 @@ def test_nor_layouts_unchanged(self): assert _NOR8M_LAYOUT["kernel"] == (0x050000, 0x200000) assert _NOR16M_LAYOUT["boot"] == (0x000000, 0x40000) assert _NOR16M_LAYOUT["kernel"] == (0x050000, 0x300000) + + +class TestNandBootargs: + """Verify defib sets bootargs that match the rootfs format we flash. + + Regression: U-Boot's compiled-in default bootargs is unreliable. Recent + OpenIPC builds default to ``rootfstype=squashfs`` even when the actual + rootfs.ubi contains UBIFS, causing kernel panic ("Unable to mount root + fs"). defib must set bootargs explicitly to match what it wrote. + """ + + def test_ubifs_bootargs_uses_ubi_root(self): + """UBIFS rootfs: use ubi0:rootfs (kernel attaches the UBI volume).""" + args = _nand_bootargs(rootfs_is_ubi=True) + assert "root=ubi0:rootfs" in args + assert "rootfstype=ubifs" in args + # Must not contain squashfs-specific bits + assert "squashfs" not in args + assert "ubiblock" not in args + assert "ubi.block" not in args + + def test_squashfs_bootargs_uses_ubiblock(self): + """Non-UBI rootfs: use ubiblock0_0 with squashfs filesystem type.""" + args = _nand_bootargs(rootfs_is_ubi=False) + assert "root=/dev/ubiblock0_0" in args + assert "rootfstype=squashfs" in args + assert "ubi.block=0,0" in args + # Must not contain UBIFS-specific root + assert "ubi0:rootfs" not in args + + def test_bootargs_matches_mtdparts_layout(self): + """ubi.mtd index must match the partition layout (boot,env,kernel,ubi + → mtd3 = ubi). Otherwise UBI attaches the wrong partition.""" + for is_ubi in (True, False): + args = _nand_bootargs(rootfs_is_ubi=is_ubi) + assert "ubi.mtd=3,2048" in args, args + assert ( + "mtdparts=hinand:1024k(boot),1024k(env)," + "8192k(kernel),-(ubi)" + ) in args, args + + def test_bootargs_includes_console_and_panic(self): + """Standard fields needed for a usable rescue boot — serial console + for debugging and panic-reboot so a broken boot doesn't hang.""" + for is_ubi in (True, False): + args = _nand_bootargs(rootfs_is_ubi=is_ubi) + assert "console=ttyAMA0,115200" in args + assert "panic=20" in args + + def test_bootargs_is_single_line(self): + """No newlines or null bytes — saveenv must store it as a single + bootargs= line.""" + for is_ubi in (True, False): + args = _nand_bootargs(rootfs_is_ubi=is_ubi) + assert "\n" not in args + assert "\x00" not in args + assert "\r" not in args