Skip to content

test(amber): add unit test coverage for logreplay primitives#4840

Merged
Yicong-Huang merged 9 commits into
apache:mainfrom
aglinxinyuan:test-logreplay-primitives-spec
May 4, 2026
Merged

test(amber): add unit test coverage for logreplay primitives#4840
Yicong-Huang merged 9 commits into
apache:mainfrom
aglinxinyuan:test-logreplay-primitives-spec

Conversation

@aglinxinyuan
Copy link
Copy Markdown
Contributor

@aglinxinyuan aglinxinyuan commented May 3, 2026

What changes were proposed in this PR?

Add LogreplayPrimitivesSpec covering five files in amber/.../engine/architecture/logreplay:

ReplayLoggerImpl:

  • Channel-switch logging
  • Same-channel-no-message skip
  • Same-channel-WITH-message still logs (the conjunction in the skip guard)
  • Message append (ProcessingStep + MessageContent)
  • Channel-switch-without-message append
  • markAsReplayDestination exact-ordering pin (in-flight ProcessingStep → ReplayDestination → synthetic trailing ProcessingStep)
  • Drain buffer clear between drains
  • Synthetic ProcessingStep when drain step differs from lastStep

OrderEnforcer trait:

  • Implementable by a custom subclass

ReplayOrderEnforcer:

  • Immediate completion on empty queue (with onComplete fire)
  • startStep skip during construction (boundary-inclusive — uses distinct channels at the boundary so a < vs <= regression is observable)
  • Step-driven channel advance + onComplete fire on the non-empty-to-empty transition (asserts the new active channel is allowed, not just that the old one is rejected)
  • Single onComplete fire when canProceed is called past the end
  • Duplicate-step while loop consumes every queue entry sharing the current step (so a regression to consuming one entry per step would fail)

ReplayLogRecord case-class subtypes — Pekko Serialization round-trip via AmberRuntime.serde (the same path SequentialRecordStorage uses in production):

  • MessageContent(ControlInvocation(...))
  • MessageContent(ReturnInvocation(...)) — both DirectControlMessagePayload subtypes that production actually logs
  • ProcessingStep
  • ReplayDestination

TerminateSignal is intentionally NOT round-tripped: it is an in-memory shutdown sentinel for AsyncReplayLogWriter and is filtered out before records reach storage, so a serde round-trip is not on a real production path. The case-object's compile-time extends ReplayLogRecord already pins subtype membership.

AsyncReplayLogWriter is intentionally skipped — it spawns its own thread and is hard to unit-test without flake. ReplayLogGenerator is likewise skipped since it requires a SequentialRecordStorage with records read from disk.

The suite owns a suite-local Pekko ActorSystem injected into AmberRuntime via reflection (matching CheckpointSubsystemSpec's pattern), with TestKit.shutdownActorSystem teardown in afterAll so no Pekko threads outlive the suite.

Any related issues, documentation, discussions?

Closes #4839

How was this PR tested?

sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.amber.engine.architecture.logreplay.LogreplayPrimitivesSpec"
# → 18 tests, all pass

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

Add LogreplayPrimitivesSpec covering five files in
amber/.../engine/architecture/logreplay:
- ReplayLoggerImpl: channel-switch logging, same-channel-no-message
  skip, message append, channel-switch-without-message append, drain
  buffer clear, synthetic ProcessingStep when drain step differs from
  lastStep, ReplayDestination append
- OrderEnforcer trait: implementable by a custom subclass
- ReplayOrderEnforcer: immediate completion on empty queue, startStep
  skip during construction, step-driven channel advance via
  canProceed, single onComplete fire even when canProceed is called
  past the end
- ReplayLogRecord case-class subtypes: Serializable + case-class
  equality + TerminateSignal singleton

AsyncReplayLogWriter is intentionally skipped — it spawns its own
thread and is hard to unit-test without flake. ReplayLogGenerator is
likewise skipped since it requires a SequentialRecordStorage with
records read from disk.

Closes apache#4839

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aglinxinyuan aglinxinyuan requested review from Yicong-Huang and Copilot and removed request for Copilot May 3, 2026 04:25
@github-actions github-actions Bot added the engine label May 3, 2026
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 42.10%. Comparing base (a4b186b) to head (63e13af).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4840      +/-   ##
============================================
+ Coverage     42.08%   42.10%   +0.01%     
- Complexity     2170     2176       +6     
============================================
  Files           980      980              
  Lines         36292    36292              
  Branches       3783     3783              
============================================
+ Hits          15273    15280       +7     
+ Misses        20098    20091       -7     
  Partials        921      921              
Flag Coverage Δ
amber 42.99% <ø> (+0.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Yicong-Huang Yicong-Huang enabled auto-merge (squash) May 3, 2026 07:11
Copilot AI review requested due to automatic review settings May 3, 2026 20:47
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new Scala unit test suite for Amber's log-replay primitives in the fault-tolerance path. It expands direct coverage for low-level logreplay components that were previously covered mostly through broader integration-style tests.

Changes:

  • Added LogreplayPrimitivesSpec to exercise ReplayLoggerImpl logging, drain behavior, and replay-destination recording.
  • Added focused tests for OrderEnforcer and ReplayOrderEnforcer, including empty-queue completion and step-based progression.
  • Added basic assertions for ReplayLogRecord subtype shape, equality, and serializability.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

aglinxinyuan and others added 2 commits May 3, 2026 19:58
Address four threads on apache#4840:

- ReplayLoggerImpl: add a same-channel-WITH-message test. The skip
  guard is `currentChannelId == channelId && message.isEmpty` (both
  conditions); a regression that drops same-channel messages after the
  first record would previously have passed the spec.

- ReplayOrderEnforcer boundary: replace the all-cidA queue with one
  that uses a distinct channel at the boundary step (cidB at step 1,
  cidC at step 2). A `step < startStep` (vs `<= startStep`) regression
  would now fail because the leftover boundary entry would prevent
  cidC at step 2 from being the consumed/active channel.

- ReplayOrderEnforcer advancement: assert that cidB IS allowed after
  step 1 is consumed (a regression that drains the queue without
  updating the active channel would otherwise stay silent), and that
  the onComplete callback fires exactly once on the non-empty-to-empty
  transition (not at construction, not on subsequent canProceed calls
  past the end).

- ReplayLogRecord serde: replace the `isInstanceOf[Serializable]`
  check with an actual Pekko Serialization round-trip via
  `AmberRuntime.serde` — the same path SequentialRecordStorage uses
  in production. Each subtype (MessageContent, ProcessingStep,
  ReplayDestination, TerminateSignal) is serialized + deserialized
  and compared by value (case-class equality). To do this safely
  without leaking a global ActorSystem, the spec now extends
  BeforeAndAfterAll, owns a suite-local ActorSystem, injects it into
  AmberRuntime via reflection, and shuts it down with
  TestKit.shutdownActorSystem in afterAll (same pattern as
  CheckpointSubsystemSpec).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Address three threads on apache#4840:

- markAsReplayDestination ordering: the previous test only used
  `drained.contains(ReplayDestination(ecm))` on a fresh logger, which
  would still pass if `ReplayDestination` were duplicated or moved
  AFTER the synthetic trailing `ProcessingStep`. Since
  `ReplayLogGenerator` stops replay at `ReplayDestination`, position
  matters. Replaced the contains-check with an exact-sequence
  assertion (`ProcessingStep(cidA, 0L)`, `ReplayDestination(ecm)`,
  trailing `ProcessingStep(cidA, 3L)`).

- ReplayOrderEnforcer duplicate-step `while` loop: the previous spec
  only used distinct step numbers, so a regression to consuming one
  entry per step would have passed. Added a test with two adjacent
  same-step `ProcessingStep` entries and asserted that:
  - after construction at step 0, the LAST same-step entry's channel
    (cidB) is current — not the first (cidA);
  - cidA is no longer accepted at step 0 (proving both step-0 entries
    were consumed by the while loop, not just one);
  - advancing to step 1 still works (cidC consumed, queue empty).

- MessageContent serde uses a real DirectControlMessagePayload: the
  previous round-trip used a `DataFrame`, which `Controller.scala` /
  `DataProcessor.scala` filter out before logging — production never
  serializes a `DataFrame` through `MessageContent`. Switched to a
  `ControlInvocation(EmptyRequest(), AsyncRPCContext(...), ...)` so
  the round-trip exercises the actually-logged payload kind, plus
  added field-level assertions (`methodName`, `commandId`) so a
  payload-level deserialization regression would fail.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

…on; fix DPThread reference

Address third Copilot review on apache#4840:

- Drop the TerminateSignal serde round-trip + eq-identity assertion. As
  Copilot pointed out, TerminateSignal is an in-memory shutdown sentinel
  for AsyncReplayLogWriter and is filtered out before records are
  written to storage, so a Pekko-serialization round-trip was not on a
  real production path. Pinning eq-identity post-deserialization
  over-constrained the serializer (a future change that re-created the
  case-object via reflection would have failed even when semantically
  correct). The case-object's compile-time `extends ReplayLogRecord`
  already pins subtype membership; replaced the test with a NOTE block
  documenting why no round-trip exists.

- Add a parallel ReturnInvocation round-trip alongside the existing
  ControlInvocation one. MessageContent in production wraps both
  subtypes (controller/worker logging records any
  DirectControlMessagePayload, and processDCM handles both); the
  previous spec only exercised ControlInvocation, so a serializer
  regression on replayed ReturnInvocation replies would have passed.
  The new test serializes
  ReturnInvocation(commandId = 42L, returnValue = EmptyReturn()) and
  asserts both case-class equality and field-level commandId /
  returnValue preservation.

- Fix the comment that pointed to DataProcessor.scala for the
  DirectControlMessagePayload logging filter — the worker-side filter
  now lives in DPThread.scala (`msgOpt.filter(_.payload.isInstanceOf[
  DirectControlMessagePayload])` at DPThread.scala:193).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@Yicong-Huang Yicong-Huang merged commit b562b55 into apache:main May 4, 2026
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

add unit test coverage for logreplay primitives

4 participants