test(amber): add unit test coverage for EmptyReplayLogger and ReplayLogGenerator #5554
Pins behavior of two previously-uncovered modules in
`engine/architecture/logreplay`:
- `EmptyReplayLogger` — null-object ReplayLogger whose drain returns
an empty array and whose log/markAsReplayDestination are no-ops
- `ReplayLogGenerator` — pure decoder that partitions stored records
into (steps, messages) queues and short-circuits at the matching
ReplayDestination
Closes apache#5550
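The null-object shape described for `EmptyReplayLogger` can be illustrated with a minimal sketch. The `Logger` trait, `LogRecord` type, and `NoOpLogger` object below are simplified, hypothetical stand-ins and not the actual Texera `ReplayLogger` API; they only show the contract the spec pins: logging calls do nothing and draining always yields an empty, non-null array.

```scala
// Hypothetical, simplified stand-ins; these are NOT the real Texera types.
final case class LogRecord(step: Long, payload: Option[String])

trait Logger {
  def log(step: Long, payload: Option[String]): Unit
  def markDestination(id: String): Unit
  def drain(step: Long): Array[LogRecord]
}

// Null object: satisfies the interface, keeps no state, drains nothing.
object NoOpLogger extends Logger {
  override def log(step: Long, payload: Option[String]): Unit = ()
  override def markDestination(id: String): Unit = ()
  override def drain(step: Long): Array[LogRecord] = Array.empty[LogRecord]
}

object NullObjectDemo {
  // Logs repeatedly, marks a destination, then reports how many records drain.
  def drainedSizeAfterLogging(): Int = {
    (1L to 100L).foreach(step => NoOpLogger.log(step, None))
    NoOpLogger.markDestination("checkpoint-1")
    NoOpLogger.drain(Long.MaxValue).length
  }
}
```

Callers can hold a `Logger` unconditionally and never branch on whether logging is enabled, which is the usual motivation for a null object.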
Pull request overview
Adds ScalaTest unit coverage for two previously-untested log replay components under amber/engine/architecture/logreplay, pinning expected behavior without modifying production code.
Changes:
- Add `EmptyReplayLoggerSpec` to verify the null-object logger remains a no-op and always returns an empty `Array[ReplayLogRecord]`.
- Add `ReplayLogGeneratorSpec` to validate that `ReplayLogGenerator.generate` partitions records correctly, handles replay-destination cut semantics, and throws on unknown record types.
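The partition-and-cut behavior listed above can be sketched roughly as follows. Every name here (`Record`, `Step`, `Message`, `Destination`, `Terminate`, `ReplayPartitionSketch`) is hypothetical and chosen for illustration; the real `ReplayLogGenerator.generate` reads from record storage and may differ in detail. In particular, silently skipping a non-matching destination is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical record model; NOT the real ReplayLogRecord hierarchy.
sealed trait Record
final case class Step(n: Long) extends Record
final case class Message(body: String) extends Record
final case class Destination(id: String) extends Record
case object Terminate extends Record

object ReplayPartitionSketch {
  // Splits records into (steps, messages) and stops at the destination
  // whose id equals replayTo.
  def generate(
      records: Iterator[Record],
      replayTo: String
  ): (mutable.Queue[Step], mutable.Queue[Message]) = {
    val steps = mutable.Queue.empty[Step]
    val messages = mutable.Queue.empty[Message]
    var done = false
    while (!done && records.hasNext) {
      records.next() match {
        case s: Step    => steps += s
        case m: Message => messages += m
        case Destination(id) =>
          // Assumption: a non-matching destination is skipped.
          if (id == replayTo) done = true
        case other =>
          throw new RuntimeException(s"cannot handle $other in the log")
      }
    }
    (steps, messages)
  }
}
```

Using a loop flag instead of an early `return` keeps a single exit point, which makes it easier to release any underlying reader in one place.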
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala | New unit spec covering record partitioning, replay cut semantics, and error behavior for ReplayLogGenerator.generate using temp-dir-backed storage. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala | New unit spec pinning no-op behavior and return-type/trait conformance for EmptyReplayLogger. |
    private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
      val field = AmberRuntime.getClass.getDeclaredField(name)
      field.setAccessible(true)
      field.set(AmberRuntime, value)
    }

    override protected def beforeAll(): Unit = {
      super.beforeAll()
      setAmberRuntimeField("_actorSystem", testSystem)
      setAmberRuntimeField("_serde", testSerde)
    }

    override protected def afterAll(): Unit = {
      setAmberRuntimeField("_serde", null)
      setAmberRuntimeField("_actorSystem", null)
      TestKit.shutdownActorSystem(testSystem)
      super.afterAll()
    }

    .sorted(java.util.Comparator.reverseOrder())
    .forEach { child =>
      try Files.deleteIfExists(child)
      catch { case _: java.nio.file.FileSystemException => () }
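For context, the excerpt above is only a fragment of the spec's cleanup helper. A self-contained sketch of the same idea, deleting a temp directory deepest-first while tolerating entries that cannot be removed, might look like this. The object and method names (`TempDirCleanup`, `deleteRecursively`) are hypothetical and are not taken from the PR.

```scala
import java.nio.file.{FileSystemException, Files, Path}
import java.util.Comparator

// Hypothetical helper; the name and shape are illustrative only.
object TempDirCleanup {
  // Deletes root and everything under it, children before parents.
  // Entries that cannot be deleted (for example, a file whose handle is
  // still open on Windows) are skipped instead of failing the caller.
  def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      val walk = Files.walk(root)
      try {
        walk
          .sorted(Comparator.reverseOrder[Path]())
          .forEach { child =>
            try Files.deleteIfExists(child)
            catch { case _: FileSystemException => () }
          }
      } finally walk.close() // Files.walk holds directory handles until closed
    }
  }
}
```

Sorting in reverse order visits the deepest paths first, so a directory is only deleted after its contents. Swallowing `FileSystemException` trades strictness for robustness, which suits test teardown but would hide real problems elsewhere.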
Codecov Report

✅ All modified and coverable lines are covered by tests.

Coverage diff:

| Metric | main | #5554 | +/- |
|---|---|---|---|
| Coverage | 52.09% | 52.16% | +0.06% |
| Complexity | 2471 | 2484 | +13 |
| Files | 1067 | 1067 | |
| Lines | 41273 | 41273 | |
| Branches | 4437 | 4437 | |
| Hits | 21502 | 21528 | +26 |
| Misses | 18509 | 18483 | -26 |
| Partials | 1262 | 1262 | |

This pull request uses carry forward flags.
What changes were proposed in this PR?
Pin behavior of two previously-uncovered modules in `engine/architecture/logreplay`. No production-code changes.

- `EmptyReplayLoggerSpec` covers `EmptyReplayLogger`
- `ReplayLogGeneratorSpec` covers `ReplayLogGenerator`

Both spec files follow the `<srcClassName>Spec.scala` one-to-one convention.

Behavior pinned: `EmptyReplayLogger`

- `drainCurrentLogRecords(step)`: returns an empty `Array[ReplayLogRecord]` regardless of `step` (positive, zero, `Long.MaxValue`, negative); returns a non-null array; element type is `ReplayLogRecord` (compile-time enforced)
- `markAsReplayDestination`: no-op for any `EmbeddedControlMessageIdentity`; does not accumulate state
- `logCurrentStepWithMessage`: no-op for any `(step, channelId, msg)` triple, including `msg = None` and 100 successive calls
- Conforms to the `ReplayLogger` interface

Behavior pinned: `ReplayLogGenerator.generate`

- `getStorage(None)` (`EmptyRecordStorage`): `(empty queue, empty queue)`
- `VFSRecordStorage` file: `(empty queue, empty queue)`
- `ProcessingStep` records: partitioned into the steps queue
- `MessageContent` records: partitioned into the messages queue
- `ReplayDestination(id)` matching `replayTo`: short-circuits at that record
- `ReplayDestination(id)` NOT matching `replayTo`: the `ReplayDestination` does not trigger the short-circuit
- Unknown record type (`TerminateSignal`): throws `RuntimeException` with a diagnostic message

Notes
- While writing `ReplayLogGeneratorSpec` I discovered a production stream leak in `ReplayLogGenerator.generate`: when it hits the matching `ReplayDestination` it short-circuits via a non-local `return`, leaving the `SequentialRecordReader`'s underlying `Input` stream open. On Windows the leaked file handle blocks subsequent temp-dir deletion. The spec's `cleanup` helper tolerates the resulting `FileSystemException` so the case still passes; the real fix is at the source and is out of scope for a test-coverage PR (will file a follow-up issue).
- `ReplayLogGeneratorSpec` uses a temp-dir-backed `VFSRecordStorage[ReplayLogRecord]` and the production `AmberRuntime.serde` (suite-local `ActorSystem` injected via reflection, torn down in `afterAll`), the same harness pattern as `CheckpointSubsystemSpec` / `ClientEventSpec` / `VFSRecordStorageSpec`.

Any related issues, documentation, discussions?
Closes #5550.
How was this PR tested?
Pure unit-test additions; verified locally with:

- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.logreplay.EmptyReplayLoggerSpec org.apache.texera.amber.engine.architecture.logreplay.ReplayLogGeneratorSpec"`: 19 tests, all green
- `sbt scalafmtCheckAll`: clean

Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)