Conversation
…uccess

Single-node clusters returned success without writing logs to disk, violating Raft durability guarantees and causing potential data loss on node restart.

Changes:
- replication_handler.rs: Restructure control flow to persist logs before returning (Phase 1: detect topology; Phases 2-4: persist logs; Phase 5: single-node early return)
- leader_state.rs: Fix commit index calculation for single-node clusters (commit_index = last_log_index for a quorum of 1)
- leader_state_test.rs: Add mock expectations for last_entry_id() in 3 lease_read tests

Ensures durability and crash safety, and maintains zero overhead for multi-node clusters.
- Replace implicit `.into()` with explicit `as i32` for role enums
- Improves code clarity and consistency across test and utility code
- Standardizes conversions for: Follower, Candidate, Leader, Learner
- Updates 26 files for uniform type conversion pattern
- No functional changes, purely code quality improvement
Important: Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Note: Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds embedded mode: a zero-overhead in-process KV client and high-level EmbeddedEngine, refactors KV client surface (new KvClient trait, …
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
@coderabbitai review

✅ Actions performed

Review triggered.
Pull request overview
This PR implements a local KV client for zero-overhead embedded access to the d-engine database, addressing issue #182.
Key Changes:
- Introduces a unified `KvClient` trait in `d-engine-core` for both remote and embedded access
- Implements `LocalKvClient` in `d-engine-server` for direct Raft core communication (10-20x faster than gRPC)
- Renames `KvClient` to `GrpcKvClient` in `d-engine-client` and implements the core trait
- Refactors enum conversions from `.into()` to explicit `as i32` casts for clarity
- Improves single-node cluster handling in replication and commit logic
Reviewed changes
Copilot reviewed 40 out of 45 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| d-engine-core/src/kv_client.rs | New unified KvClient trait with comprehensive documentation |
| d-engine-server/src/node/client/local_kv.rs | LocalKvClient implementation for embedded access |
| d-engine-client/src/grpc_kv_client.rs | Renamed from kv.rs, implements core KvClient trait |
| d-engine-server/src/node/mod.rs | Adds local_client() method to Node |
| d-engine-core/src/replication/replication_handler.rs | Improved single-node cluster detection logic |
| d-engine-core/src/raft_role/leader_state.rs | Enhanced commit index calculation for single-node clusters |
| Multiple test files | Comprehensive integration tests for LocalKvClient |
| Cargo.toml files | Added async-trait dependency and d-engine-core references |
| Various source files | Refactored enum conversions from .into() to as i32 |
d-engine-server/src/node/mod.rs
Outdated
/// Creates a local KV client for zero-overhead embedded access.
///
/// Returns the node ID.
The doc comment on lines 210-212 is incorrect. It says "Creates a local KV client for zero-overhead embedded access. Returns the node ID." but the function it is attached to returns u32 (the node ID), not a client. The doc comment should match the node_id() function it describes.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
d-engine-core/src/raft_role/candidate_state.rs (1)
480-488: RoleViolation error reports wrong current/required role strings

This is `CandidateState`, but the `LogPurgeCompleted` branch reports:

- current_role: "Learner"
- required_role: "Leader"

That's confusing for operators and logs.

Consider updating the strings to match the actual role:

```diff
- return Err(ConsensusError::RoleViolation {
-     current_role: "Learner",
-     required_role: "Leader",
-     context: format!(
-         "Learner node {} should not receive LogPurgeCompleted event.",
-         ctx.node_id
-     ),
- }
+ return Err(ConsensusError::RoleViolation {
+     current_role: "Candidate",
+     required_role: "Leader",
+     context: format!(
+         "Candidate node {} should not receive LogPurgeCompleted event.",
+         ctx.node_id
+     ),
+ }
  .into());
```

d-engine-client/src/lib.rs (1)
45-60: Docs still reference `KvClient` while the type is now `GrpcKvClient`

The public API has been moved from the old `kv` module to `grpc_kv_client`, and `Client.kv()` now returns `&GrpcKvClient`. However, the crate docs still advertise `KvClient`:

```rust
//! - [`KvClient`] - Key-value store operations
```

To avoid confusion (and potential broken intra-doc links), consider:

```diff
-//! - [`KvClient`] - Key-value store operations
+//! - [`GrpcKvClient`] - Key-value store operations
```

If you care about downstream compatibility with existing `KvClient` users, you might also reintroduce an alias:

```diff
+/// Backwards-compatible alias for the gRPC-based KV client.
+pub type KvClient = GrpcKvClient;
```

placed after `pub use grpc_kv_client::*;`. That keeps old code compiling while reflecting the new name.

Also applies to: 101-137
🧹 Nitpick comments (14)
d-engine-core/src/commit_handler/default_commit_handler_test.rs (1)
318-323: Role casting to `i32` is consistent; consider centralizing the conversion

All the updates from `Leader.into()`/`Follower.into()` to `Leader as i32`/`Follower as i32` align with the `role: i32` usage in `TestHarness` and `DefaultCommitHandler::new`, so the behavior stays the same and type expectations are explicit.

Given how often `Leader as i32` appears in this file, you might consider a small helper (e.g., `const ROLE_LEADER: i32 = Leader as i32;`) or a conversion function to avoid sprinkling raw casts everywhere and to make any future change in the proto enum representation easier to apply in one place. This is optional; the current code is correct.

Also applies to: 437-452, 474-488, 507-520, 536-551, 586-601, 639-653, 692-706, 745-759, 851-860, 878-887, 910-919, 950-959, 981-990, 1009-1018, 1035-1044, 1057-1066, 1092-1101, 1118-1127, 1152-1173, 1201-1223
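The centralization idea in this comment could look roughly like the sketch below. It is an illustration only: `NodeRole` here is a hand-written stand-in for the generated proto enum, its discriminant values are assumptions, and `ROLE_FOLLOWER` and `is_leader` are hypothetical names.

```rust
// Hypothetical stand-in for the generated `NodeRole` proto enum.
// The discriminant values below are assumptions made for this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(i32)]
pub enum NodeRole {
    Follower = 0,
    Candidate = 1,
    Leader = 2,
    Learner = 3,
}

// The casts live in one place, so a change in the enum's wire
// representation only has to be applied here.
pub const ROLE_LEADER: i32 = NodeRole::Leader as i32;
pub const ROLE_FOLLOWER: i32 = NodeRole::Follower as i32;

/// Role predicate that compares against the centralized constant
/// instead of repeating `Leader as i32` at every call site.
pub fn is_leader(role_i32: i32) -> bool {
    role_i32 == ROLE_LEADER
}
```

Tests would then pass `ROLE_LEADER` wherever a `role: i32` field is populated, instead of casting at every call site.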
d-engine-core/src/lib.rs (1)
6-6: New `kv_client` module is cleanly exposed; be aware of API surface implications

Adding `pub mod kv_client;` and `pub use kv_client::*;` cleanly surfaces the KV client traits/types at the crate root, which is convenient for consumers. If `kv_client` accumulates many items over time, you may later want to curate what gets re-exported (e.g., only the main trait and error types) to keep the root API smaller, but the current approach is fine if full exposure is intended.

Also applies to: 25-25
d-engine-client/src/cluster_test.rs (1)
50-50: Role casts look good; redundant leader_node assignment can be dropped

Using `NodeRole::<Variant> as i32` for `NodeMeta.role` and assertions is consistent with the proto representation and keeps tests aligned with on-the-wire values.

Minor cleanup: in `test_join_cluster_failure`, `leader_node` is never used after construction, and `leader_node.role = NodeRole::Leader as i32;` (line 145) simply reassigns the same value. You can safely remove `leader_node` and the extra assignment to reduce noise in the test.

Also applies to: 64-64, 94-94, 133-133, 141-145
d-engine-core/src/replication/replication_handler.rs (1)
73-94: Single-node fast path and NoPeerFound behavior look correct; consider an earlier early-return

The new topology-aware flow makes sense:

- Single-node clusters: `replication_targets` is empty, logs are appended locally in Phase 2, and Phase 5 returns `AppendResults { commit_quorum_achieved: true, .. }` without any network calls.
- Multi-node clusters: `replication_peers().await` is required to be non-empty; otherwise a `ReplicationError::NoPeerFound { leader_id }` is raised, which is a clearer failure mode than silently proceeding with no peers.

Two follow-ups:

- You can avoid some unnecessary work for single-node clusters by returning before building `entries_per_peer` and `requests` (e.g., immediately after Phase 2 once entries are persisted), since they are never used when `is_single_node` is true.
- The new `NoPeerFound` branch slightly tightens behavior for misconfigured multi-node clusters; please ensure there are tests covering this path so it doesn't cause surprising regressions when membership is in flux.

Also applies to: 148-166
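A rough, synchronous sketch of the control flow described in this comment follows. It is not the project's implementation: the `Leader` struct, the `replicate` method, the in-memory `Vec` standing in for the durable log, and the `peers_contacted` field are all hypothetical, and the multi-node branch is reduced to a placeholder.

```rust
/// Hypothetical error type; only `NoPeerFound` is named in the review.
#[derive(Debug, PartialEq)]
pub enum ReplicationError {
    NoPeerFound { leader_id: u32 },
}

/// Hypothetical result type; `peers_contacted` exists only for this sketch.
#[derive(Debug, PartialEq)]
pub struct AppendResults {
    pub commit_quorum_achieved: bool,
    pub peers_contacted: usize,
}

/// Simplified leader; the `Vec` stands in for the durable Raft log.
pub struct Leader {
    pub id: u32,
    pub is_single_node: bool,
    pub peers: Vec<u32>,
    pub log: Vec<String>,
}

impl Leader {
    pub fn replicate(&mut self, entries: Vec<String>) -> Result<AppendResults, ReplicationError> {
        // Detect topology first: a multi-node cluster with no known peers
        // is reported as an error instead of silently proceeding.
        let targets: Vec<u32> = if self.is_single_node {
            Vec::new()
        } else if self.peers.is_empty() {
            return Err(ReplicationError::NoPeerFound { leader_id: self.id });
        } else {
            self.peers.clone()
        };

        // Persist locally before any success can be reported.
        self.log.extend(entries);

        // Single-node early return: no per-peer requests are built.
        if self.is_single_node {
            return Ok(AppendResults { commit_quorum_achieved: true, peers_contacted: 0 });
        }

        // Placeholder for the multi-node path: a real implementation would
        // send requests to `targets` and derive the quorum from the replies.
        Ok(AppendResults { commit_quorum_achieved: false, peers_contacted: targets.len() })
    }
}
```

The ordering is the point of the sketch: the append happens before the single-node return, so a reported success always corresponds to a written entry.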
d-engine-server/Cargo.toml (1)
31-31: async-trait dependency is fine; consider centralizing the version

Adding `async-trait = "0.1"` here is consistent with the rest of the workspace using async traits.

To avoid version skew and duplicate async-trait versions in the dependency tree, consider moving async-trait into the workspace `[dependencies]` (or `[workspace.dependencies]`) so client/server/core all share the same pinned version.

d-engine-client/Cargo.toml (1)
20-20: Client's new core + async-trait deps look good; watch workspace consistency

Bringing in `d-engine-core` here matches the new shared KV abstractions, and adding `async-trait = "0.1"` is expected for async traits in the client.

Two minor follow-ups:

- Ensure `d-engine-core` doesn't depend on `d-engine-client` (directly or indirectly) to keep the crate graph acyclic.
- Since multiple crates (client, server, possibly core) now use async-trait, consider moving it to a shared workspace dependency so you don't accidentally pin different `0.1.x` versions.

Also applies to: 32-32
d-engine-server/src/node/node_test.rs (1)
110-123: Role casts updated to explicit `as i32`

Switching `NodeMeta.role` to `Follower as i32` / `Learner as i32` matches the proto field type and keeps the test semantics unchanged; this looks correct. If you want to reduce cognitive overhead later, consider gradually standardizing role/status conversions in this file to either `as i32` or `.into()` consistently, but it's not urgent.

Also applies to: 196-209, 283-296
d-engine-core/Cargo.toml (1)
32-32: `async-trait` dependency addition

Adding `async-trait = "0.1"` is a reasonable choice for async trait support; just make sure the chosen version range aligns with your MSRV and matches any workspace-wide policy on pinning shared deps.

d-engine-server/src/lib.rs (1)
69-69: Re-exporting `LocalKvClient` from the crate root

Exposing `LocalKvClient` via `pub use node::{LocalKvClient, Node, NodeBuilder};` makes sense if it's intended as a stable public entry point; the wiring looks correct. Consider updating the top-level docs or `node` module docs later to briefly mention `LocalKvClient` as a supported usage path.

d-engine-client/src/kv_test.rs (2)
13-15: GrpcKvClient migration in tests is type-correct but quite repetitive

All the updated constructions:

```rust
let client = GrpcKvClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
    pool,
    client_id: 123,
    config,
    endpoints,
})));
```

are type-correct and match `GrpcKvClient::new(Arc<ArcSwap<ClientInner>>)`. No behavioral regression here.

Given how often this block appears, consider a small helper to reduce duplication and keep future constructor changes localized, e.g.:

```diff
+fn make_test_kv_client(
+    pool: ConnectionPool,
+    config: ClientConfig,
+    endpoints: Vec<String>,
+) -> GrpcKvClient {
+    GrpcKvClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
+        pool,
+        client_id: 123,
+        config,
+        endpoints,
+    })))
+}
```

and then in tests:

```diff
-    let client = GrpcKvClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
-        pool,
-        client_id: 123,
-        config,
-        endpoints,
-    })));
+    let client = make_test_kv_client(pool, config, endpoints);
```

This keeps the new client type usage clear and centralizes constructor changes.

Also applies to: 42-47, 78-83, 115-120, 152-157, 188-193, 232-237, 268-273, 302-307, 336-341, 393-398, 465-470, 513-518, 554-560, 593-598, 631-636, 669-674, 722-727, 780-785, 820-825, 882-886, 952-956, 1019-1023, 1138-1142
547-560: Align inner config/endpoints in `test_get_multi_empty_keys` for clarity

Here the `ClientInner` used for the client is initialized with `ClientConfig::default()` and `endpoints: vec![]`, even though a `config` and `endpoints` are built just above. It works because the pool is already constructed and the test only validates empty-key behavior, but it's a bit misleading.

You could make this more self-consistent with a tiny tweak:

```diff
-    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
+    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
         .await
         .expect("Should create connection pool");

-    // Test with empty keys vector
-    let result = GrpcKvClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
-        pool,
-        client_id: 123,
-        config: ClientConfig::default(),
-        endpoints: vec![],
-    })))
+    // Test with empty keys vector
+    let result = GrpcKvClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
+        pool,
+        client_id: 123,
+        config,
+        endpoints,
+    })))
     .get_multi_with_policy(std::iter::empty::<String>(), None)
```

Not required for correctness, but it avoids surprising anyone reading the test.
d-engine-server/tests/local_kv_client_integration_test.rs (2)
204-247: Concurrent test verifies operations succeed but not data correctness.

The test verifies all concurrent PUTs succeed but doesn't verify the data can be read back correctly. Consider adding verification after the concurrent writes complete:

```diff
     // Wait for all to complete
     let (r1, r2, r3) = tokio::join!(handle1, handle2, handle3);
     assert!(
         r1.is_ok() && r2.is_ok() && r3.is_ok(),
         "All concurrent operations should succeed"
     );
+
+    // Allow time for commits to apply
+    tokio::time::sleep(Duration::from_millis(500)).await;
+
+    // Verify a sample of written keys
+    let verify_client = node.local_client();
+    let result = verify_client.get(b"concurrent_key_0").await.expect("GET failed");
+    assert!(result.is_some(), "Concurrent write should be readable");

     println!("✅ LocalKvClient concurrent operations succeeded");
```
274-298: Empty key test could be more thorough.

The test allows empty keys (lines 281-282) but doesn't verify that GET works for empty keys. If empty keys are intentionally supported, consider verifying retrieval:

```diff
     // Empty key with value
     let result = client.put(b"", b"some_value").await;
     assert!(result.is_ok(), "Empty key PUT should succeed");
+
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Verify empty key can be retrieved
+    let get_result = client.get(b"").await.expect("GET empty key failed");
+    assert!(get_result.is_some(), "Empty key should be retrievable");
```

Alternatively, if empty keys should be rejected, consider adding validation in the `LocalKvClient` implementation.

d-engine-server/src/node/client/local_kv.rs (1)
255-297: Consider extracting common write request logic.

The `put_with_ttl` implementation duplicates the send/timeout/error pattern from `put` and `delete`. A helper method could reduce this duplication:

```rust
async fn send_write_request(&self, request: ClientWriteRequest) -> Result<()> {
    let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

    self.event_tx
        .send(RaftEvent::ClientPropose(request, resp_tx))
        .await
        .map_err(|_| LocalClientError::ChannelClosed)?;

    let result = tokio::time::timeout(self.timeout, resp_rx)
        .await
        .map_err(|_| LocalClientError::Timeout(self.timeout))?
        .map_err(|_| LocalClientError::ChannelClosed)?;

    let response = result.map_err(|status| {
        LocalClientError::ServerError(format!("RPC error: {}", status.message()))
    })?;

    if response.error != ErrorCode::Success as i32 {
        return Err(LocalClientError::ServerError(format!(
            "Error code: {}",
            response.error
        )));
    }

    Ok(())
}
```

Then `put`, `put_with_ttl`, and `delete` could each construct their `ClientWriteRequest` and delegate to this helper.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
- Cargo.lock is excluded by !**/*.lock
- benches/d-engine-bench/Cargo.lock is excluded by !**/*.lock
- examples/client_usage/Cargo.lock is excluded by !**/*.lock
- examples/rocksdb-cluster/Cargo.lock is excluded by !**/*.lock
- examples/single-node-expansion/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (40)
- d-engine-client/Cargo.toml (2 hunks)
- d-engine-client/src/builder.rs (2 hunks)
- d-engine-client/src/cluster_test.rs (4 hunks)
- d-engine-client/src/grpc_kv_client.rs (3 hunks)
- d-engine-client/src/kv_test.rs (25 hunks)
- d-engine-client/src/lib.rs (4 hunks)
- d-engine-client/src/mock_rpc_service.rs (4 hunks)
- d-engine-client/src/pool.rs (1 hunks)
- d-engine-client/src/pool_test.rs (4 hunks)
- d-engine-core/Cargo.toml (1 hunks)
- d-engine-core/src/commit_handler/default_commit_handler_test.rs (29 hunks)
- d-engine-core/src/config/cluster.rs (1 hunks)
- d-engine-core/src/kv_client.rs (1 hunks)
- d-engine-core/src/lib.rs (2 hunks)
- d-engine-core/src/raft_role/candidate_state.rs (1 hunks)
- d-engine-core/src/raft_role/follower_state.rs (1 hunks)
- d-engine-core/src/raft_role/leader_state.rs (2 hunks)
- d-engine-core/src/raft_role/learner_state.rs (2 hunks)
- d-engine-core/src/replication/replication_handler.rs (2 hunks)
- d-engine-core/src/state_machine_handler/default_state_machine_handler_test.rs (1 hunks)
- d-engine-core/src/state_machine_handler/snapshot_policy/composite_test.rs (2 hunks)
- d-engine-core/src/state_machine_handler/snapshot_policy/log_size_test.rs (6 hunks)
- d-engine-core/src/state_machine_handler/snapshot_policy/time_based_test.rs (3 hunks)
- d-engine-core/src/test_utils/mock/mock_rpc_service.rs (4 hunks)
- d-engine-core/src/utils/cluster.rs (1 hunks)
- d-engine-server/Cargo.toml (1 hunks)
- d-engine-server/src/lib.rs (1 hunks)
- d-engine-server/src/membership/raft_membership.rs (9 hunks)
- d-engine-server/src/membership/raft_membership_test.rs (27 hunks)
- d-engine-server/src/network/grpc/grpc_transport_test.rs (16 hunks)
- d-engine-server/src/node/client/local_kv.rs (1 hunks)
- d-engine-server/src/node/client/mod.rs (1 hunks)
- d-engine-server/src/node/mod.rs (3 hunks)
- d-engine-server/src/node/node_test.rs (3 hunks)
- d-engine-server/src/test_utils/integration/mod.rs (1 hunks)
- d-engine-server/src/utils/cluster.rs (1 hunks)
- d-engine-server/tests/components/raft_role/leader_state_test.rs (4 hunks)
- d-engine-server/tests/local_kv_client_integration_test.rs (1 hunks)
- d-engine/Cargo.toml (1 hunks)
- d-engine/src/lib.rs (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
d-engine-core/src/replication/replication_handler.rs (1)
- d-engine-server/src/membership/raft_membership.rs (1): new(778-795)

d-engine-core/src/utils/cluster.rs (1)
- d-engine-server/src/utils/cluster.rs (3): is_candidate(71-73), is_leader(76-78), is_learner(81-83)

d-engine-client/src/builder.rs (1)
- d-engine-client/src/grpc_kv_client.rs (1): new(37-39)

d-engine-server/tests/local_kv_client_integration_test.rs (5)
- d-engine-client/src/kv_test.rs (1): std(560-560)
- d-engine-core/src/raft_role/leader_state.rs (3): from(2336-2372), new(105-113), new(1922-1959)
- d-engine-server/src/node/client/local_kv.rs (3): from(62-68), client_id(222-224), timeout(227-229)
- d-engine-server/src/node/builder.rs (3): storage_engine(188-194), state_machine(197-203), from_cluster_config(159-166)
- d-engine-client/src/builder.rs (1): new(35-40)

d-engine-server/src/node/mod.rs (2)
- d-engine-core/src/raft_role/role_state.rs (1): node_id(42-44)
- d-engine-server/src/node/client/local_kv.rs (1): new_internal(83-93)

d-engine-client/src/grpc_kv_client.rs (2)
- d-engine-server/src/node/client/local_kv.rs (9): from(62-68), put(96-135), put(247-253), put_with_ttl(255-297), get(138-181), get(299-304), get_multi(306-348), delete(184-219), delete(350-355)
- d-engine-core/src/kv_client.rs (5): put(111-115), put_with_ttl(137-142), get(173-176), get_multi(205-208), delete(230-233)

d-engine-server/src/membership/raft_membership.rs (1)
- d-engine-core/src/membership.rs (1): reset_leader(115-115)

d-engine-client/src/kv_test.rs (1)
- d-engine-client/src/grpc_kv_client.rs (1): new(37-39)

d-engine-server/src/node/client/local_kv.rs (2)
- d-engine-core/src/kv_client.rs (6): fmt(53-64), put(111-115), get(173-176), delete(230-233), put_with_ttl(137-142), get_multi(205-208)
- d-engine-proto/src/exts/client_ext.rs (3): insert(53-65), read_results(127-133), insert_with_ttl(73-86)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Lint and Format Check
🔇 Additional comments (43)
d-engine-client/src/pool.rs (1)
182-193: Explicit `NodeRole` → `i32` cast looks correct and clearer

Using `node.role == NodeRole::Leader as i32` matches the likely `i32` backing type on `NodeMeta.role` and makes the intent clearer than a generic `.into()`, without changing behavior. No issues spotted with the leader/follower selection logic around this change.

d-engine-server/src/network/grpc/grpc_transport_test.rs (1)

131-131: LGTM! Consistent enum-to-integer conversions.

The explicit `as i32` casts for `NodeRole` variants (`Follower`, `Candidate`, `Leader`, `Learner`) are consistent throughout the test file and align with the `mock_membership` function signature that expects `i32` role values. The changes are syntactic only and don't affect test behavior.

Also applies to: 172-172, 224-224, 298-298, 381-381, 458-458, 541-541, 591-591, 643-643, 697-697, 746-746, 800-800, 874-874, 921-921, 984-984, 1036-1036
d-engine-core/src/test_utils/mock/mock_rpc_service.rs (1)
181-188: Explicit `Leader as i32` in NodeMeta is correct and consistent

Switching `NodeMeta.role` to `Leader as i32` matches the protobuf field type (`i32`) and is consistent with the rest of the PR's role handling. No functional change here; just a clearer, explicit cast.

Also applies to: 225-232, 255-262, 287-294
d-engine-core/src/config/cluster.rs (1)
128-135: Default cluster `Follower as i32` role is correct

Using `role: Follower as i32` in `default_initial_cluster` matches the proto type and the expected default Raft role for peers. Looks good.

d-engine-client/src/mock_rpc_service.rs (1)

103-108: Client mock NodeMeta uses `Leader as i32` consistently

The change to `role: NodeRole::Leader as i32` across the mock helpers aligns this test scaffolding with the proto field type and with the core mocks. No issues here.

Also applies to: 131-137, 162-168, 193-199
d-engine-core/src/state_machine_handler/snapshot_policy/time_based_test.rs (1)
21-26: SnapshotContext now uses `Leader as i32` directly

The updates to set `role: Leader as i32` in the time-based snapshot tests match the `SnapshotContext` field type and keep the tests semantically identical to the previous `into()` usage. No concerns.

Also applies to: 47-52, 69-74

d-engine-core/src/state_machine_handler/snapshot_policy/composite_test.rs (1)

13-23: Leader/follower role casts are correct in composite policy tests

Using `Leader as i32` in `leader_ctx` and `Follower as i32` in `follower_ctx` cleanly reflects the proto enums into the `SnapshotContext`'s `i32` role field and preserves the intended leader-only triggering behavior in these tests.

Also applies to: 26-36

d-engine-core/src/state_machine_handler/snapshot_policy/log_size_test.rs (1)

35-36: Log size policy tests now use explicit role casts consistently

All updated contexts now pass `Leader as i32` or `Follower as i32` into `SnapshotContext`, matching the field type and the broader change away from `.into()`. The leader-only triggering and follower-never-triggers semantics remain intact.

Also applies to: 41-43, 48-50, 55-57, 68-70, 81-83, 98-100, 105-107, 118-130, 137-139

d-engine-client/src/pool_test.rs (1)

23-23: Enum-to-i32 role casts in tests are consistent and correct

The updates to use `NodeRole::<Variant> as i32` for `NodeMeta.role` across these tests align with the underlying proto representation and with the role helpers elsewhere in the codebase. Behavior of the tests remains the same.

Also applies to: 29-29, 45-45, 170-170, 205-205
d-engine/Cargo.toml (1)
31-31: Adding d-engine-core to the top-level crate is appropriate; just confirm no dependency cycles

Depending on `d-engine-core` from `d-engine` matches the new layering where core holds shared traits (e.g., KV client abstractions).

Please just double-check that `d-engine-core` does not depend (directly or indirectly) back on `d-engine`, to avoid a circular dependency in the workspace.

d-engine-core/src/raft_role/follower_state.rs (1)

634-638: Follower role snapshot encoding matches the new convention

Setting `StateSnapshot.role` via `Follower as i32` keeps the follower snapshot consistent with the rest of the codebase's explicit i32 role representation (Leader/Candidate/Learner doing the same). This is a clean, non-behavioral tightening of the encoding.

d-engine-server/src/utils/cluster.rs (1)

67-68: Explicit `(Role as i32)` checks are clear and consistent

The role predicate helpers now compare `role_i32` against `(Follower|Candidate|Leader|Learner as i32)`, which matches the proto enum backing type and the rest of the codebase's explicit casting strategy. This keeps role checks simple and avoids depending on `Into<i32>` plumbing.

Also applies to: 71-73, 76-78, 81-83
d-engine-server/src/test_utils/integration/mod.rs (1)
195-215: Shared `follower_role` constant is fine

Using `Follower as i32` cached in `follower_role` matches `NodeMeta.role`'s i32 type and avoids repeated casts; no issues spotted.

d-engine-core/src/utils/cluster.rs (1)

27-45: Enum-integer comparisons now use explicit casts

Using `role_i32 == (Follower as i32)` (and the same for other roles) is consistent with how `NodeRole` is represented elsewhere and keeps these helpers straightforward; no issues here.

d-engine-core/src/state_machine_handler/default_state_machine_handler_test.rs (1)

862-865: `NewCommitData.role` cast update

Setting `role: Leader as i32` aligns this test with the i32 role representation used elsewhere; behavior remains the same.

d-engine-server/tests/components/raft_role/leader_state_test.rs (2)
3245-3248: Mocking `last_entry_id` for learner promotion tests

Adding `raft_log.expect_last_entry_id().returning(|| 10);` ensures the mock satisfies any new `LeaderState` calls to `last_entry_id()` during batch learner promotion, and the chosen value is consistent with a log that has entries beyond the committed index. Looks good.

3969-3972: Client read tests now fully configure `MockRaftLog`

The added `raft_log.expect_last_entry_id().returning(|| 10);` expectations in the lease/linearizable read tests correctly cover any new `last_entry_id()` usage by `LeaderState` during client reads and avoid under-specified mocks. The value is coherent with the rest of each test's commit/majority indices.

Also applies to: 4021-4024, 4074-4077
d-engine-client/src/builder.rs (1)
12-12: Builder now wires `GrpcKvClient` into `Client`

Switching the KV field to `GrpcKvClient::new(inner.clone())` (and importing `GrpcKvClient`) aligns the builder with the new client type and reuses the existing `ClientInner`/`ArcSwap` wiring; this is consistent and looks correct.

Also applies to: 114-118
d-engine-core/src/raft_role/candidate_state.rs (1)
601-608: Explicit `Candidate as i32` role encoding looks consistent

The switch to `role: Candidate as i32` matches the pattern used for other roles (Leader/Follower/Learner) and keeps `StateSnapshot` on the raw i32 representation. No functional issues here.

d-engine-core/src/raft_role/learner_state.rs (1)

466-477: Consistent numeric encoding for Learner role

Using `Learner as i32` in both `JoinRequest.node_role` and `state_snapshot().role` aligns with the rest of the codebase's explicit i32 role representation. Behavior and semantics remain the same, just more explicit.

Also applies to: 541-548
d-engine-server/src/node/client/mod.rs (1)
1-5: Local KV client module wiring looks good

The `local_kv` submodule and the `pub use local_kv::{LocalClientError, LocalKvClient};` re-export cleanly expose the embedded client API from the node surface. Nothing to change here.

d-engine-server/src/node/mod.rs (1)

20-25: Embedded LocalKvClient construction is correct and well-scoped

- `mod client; pub use client::*;` brings `LocalKvClient` into scope and into the public node API.
- `local_client()` correctly uses `LocalKvClient::new_internal(self.event_tx.clone(), self.node_id, Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms))`, which matches the internal constructor signature and keeps the timeout config-driven.

The embedded client wiring itself looks solid.

Also applies to: 71-96, 210-238
d-engine-server/src/membership/raft_membership.rs (1)
286-305: Role handling via explicit `as i32` is consistent across membership logic

The membership implementation now consistently uses raw i32 role values:

- Leaders are set/compared via `Leader as i32` in `mark_leader_id`, `reset_leader`, and `current_leader_id`.
- Learner → Follower promotions (`Promote` and `BatchPromote`) and `apply_config_change` use `Learner as i32` / `Follower as i32` symmetrically.
- `add_learner` initializes `NodeMeta.role` as `Learner as i32`.
- `can_rejoin` validates the passed `role` against `Learner as i32`.

This keeps the internal representation explicit without altering semantics. Tests in `raft_membership_test.rs` mirror the same pattern, so the change looks safe.

Also applies to: 328-332, 396-399, 411-415, 500-504, 682-706, 758-761
d-engine-server/src/membership/raft_membership_test.rs (1)
38-69: Tests correctly updated to use explicit i32 role values

Across the membership tests:

- `NodeMeta.role` is now initialized with `Leader as i32`, `Follower as i32`, `Learner as i32`, or `Candidate as i32`.
- Assertions and filters (e.g. `n.role == Leader as i32`, or the predicates passed into `get_peers_id_with_condition`) use the same casts.
- Promotion and batch-change tests (e.g. cases 3/4/7 and batch remove tests) still verify the intended role transitions.

These updates keep the tests aligned with the production code's explicit i32 role representation without changing semantics.

Also applies to: 181-219, 235-277, 292-332, 339-379, 420-496, 611-637, 682-721, 860-888, 897-908, 1092-1121, 1305-1377
d-engine-server/tests/local_kv_client_integration_test.rs (1)
21-79: Test helper looks solid for integration testing.

The `create_test_node` function properly sets up a single-node cluster with cleanup of old test data. A few observations:

- The spawned `node.run()` task (lines 69-73) is fire-and-forget: the `JoinHandle` is dropped. This is acceptable for tests since the `_shutdown` sender controls lifecycle.
- The 2-second sleep (line 76) for leader election is reasonable for single-node clusters but may be fragile under heavy CI load.
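One common way to make such a wait less sensitive to load is to poll a readiness condition up to a deadline rather than sleeping for a fixed time. The sketch below is a generic, blocking illustration with a hypothetical `wait_until` helper; it is not part of the PR. An async test would use `tokio::time::sleep` in place of `std::thread::sleep`, and what "ready" means (for example, that a leader has been elected) depends on APIs not shown in this review.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: polls `ready` every `interval` until it returns
/// true or `timeout` has elapsed. Returns whether the condition was met.
pub fn wait_until<F: FnMut() -> bool>(timeout: Duration, interval: Duration, mut ready: F) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if ready() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(interval);
    }
}
```

A fast machine returns as soon as the condition holds, while a slow CI runner gets the full timeout before the test fails.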
d-engine-core/src/raft_role/leader_state.rs (2)
1205-1206: Explicit `as i32` cast is cleaner.

The change from `.into()` to `Leader as i32` makes the type conversion explicit and self-documenting. This is consistent with how other enum values (like `NodeRole::Learner as i32` at line 1899) are handled in the codebase.

1333-1357: Good fix for single-node cluster commit advancement.

The new logic correctly handles single-node clusters where `peer_updates` is empty. In this case, the leader's `last_log_index` becomes the commit index, since the leader alone forms the quorum and there are no peers to coordinate with. The guard checking `last_log_index > self.commit_index()` prevents unnecessary updates.

The comment clearly documents the two cases:

- Single-node: `commit_index = last_log_index`
- Multi-node: calculated based on majority quorum
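The two cases can be sketched as a small pure function. This is a simplified illustration, not the code in `leader_state.rs`: the function name and signature are hypothetical, and the usual Raft requirement that the committed entry belong to the leader's current term is left out.

```rust
/// Hypothetical helper. `peer_match_indexes` holds the highest replicated
/// log index known for each peer (the leader itself is not included).
/// Returns the new commit index only if it advances past the current one.
pub fn next_commit_index(
    last_log_index: u64,
    current_commit_index: u64,
    peer_match_indexes: &[u64],
) -> Option<u64> {
    let candidate = if peer_match_indexes.is_empty() {
        // Single-node: the leader is the whole quorum.
        last_log_index
    } else {
        // Multi-node: take the highest index stored on a majority of
        // nodes, counting the leader's own log.
        let mut all: Vec<u64> = peer_match_indexes.to_vec();
        all.push(last_log_index);
        all.sort_unstable_by(|a, b| b.cmp(a)); // descending
        let majority = all.len() / 2 + 1;
        all[majority - 1]
    };
    // Guard against redundant or backwards updates.
    (candidate > current_commit_index).then_some(candidate)
}
```

With three nodes at indexes 10 (leader), 8, and 4, the majority position is the second highest value, so the function yields 8.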
d-engine-client/src/grpc_kv_client.rs (2)
349-367: Error conversion looks correct but could lose error chain context.

The `From<ClientApiError>` implementation maps errors well, but wrapping errors as strings loses the original error chain. This is acceptable for a trait boundary, but consider whether `KvClientError` should use `Box<dyn Error>` or similar for richer error context in the future.

One minor observation: `ErrorCode::ConnectionTimeout` maps to `KvClientError::Timeout`, which is semantically correct.

370-416: Clean trait implementation with proper delegation.

The `CoreKvClient` implementation correctly:

- Delegates each method to the corresponding `GrpcKvClient` inherent method
- Uses `map_err(Into::into)` for consistent error conversion
- Extracts `result.value` from `ClientResult` in `get`/`get_multi` to return raw `Bytes`

The pattern `GrpcKvClient::put(self, key, value)` for method disambiguation is correct since both the trait and struct have methods named `put`.

d-engine/src/lib.rs (3)
123-128: Good: `CoreKvClient` trait is conditionally exposed.
Exposing `KvClient` from `d_engine_core` when either `client` or `server` features are enabled allows users to write generic code against the trait while using concrete implementations (`GrpcKvClient` or `LocalKvClient`).
141-143: `LocalKvClient` correctly added to server exports.
This enables embedded/local KV access with zero network overhead for server-side applications. The comment "zero-overhead, same process" accurately describes the benefit.
190-202: Prelude organization is sensible.
- Client prelude: `GrpcKvClient` for remote access
- Server prelude: `LocalKvClient` for embedded access
- Full prelude: `KvClient` trait for generic programming
d-engine-core/src/kv_client.rs (4)
34-50: Error enum covers key failure modes.
The `KvClientError` variants appropriately cover:
- `ChannelClosed` - for embedded client shutdown
- `Timeout` - for operation timeouts
- `ServerError` - for Raft/storage errors
- `NetworkError` - for gRPC transport failures
- `InvalidArgument` - for validation errors
The `Clone` derive is useful for error propagation in async contexts.
88-89: Trait design with `Send + Sync` bounds is appropriate.
The `Send + Sync` requirement enables concurrent use of clients across threads, which is essential for async Rust applications. The `async_trait` macro handles the async method bounds correctly.
205-208: `get_multi` uses `&[Bytes]` rather than generic iterator.
This is a pragmatic choice to avoid the complexity of `impl IntoIterator<Item = impl AsRef<[u8]> + Send> + Send` which can be problematic with async traits. The `GrpcKvClient` impl at line 404 shows the straightforward usage: `keys.iter().cloned()`.
If more flexibility is needed in the future, a helper method could accept an iterator and convert to `Vec<Bytes>` before calling `get_multi`.
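A rough sketch of the helper idea mentioned above, assuming a free function that runs before the trait call. The name `collect_keys` is hypothetical, and `Bytes` is aliased to `Vec<u8>` here only so the example compiles with the standard library; the real code would use `bytes::Bytes`.

```rust
/// Stand-in for `bytes::Bytes` so this sketch needs only std (assumption).
type Bytes = Vec<u8>;

/// Hypothetical helper: collects any iterator of byte-like keys into the
/// owned `Vec<Bytes>` whose slice a `get_multi(&[Bytes])`-style method accepts.
fn collect_keys<I, K>(keys: I) -> Vec<Bytes>
where
    I: IntoIterator<Item = K>,
    K: AsRef<[u8]>,
{
    keys.into_iter().map(|k| k.as_ref().to_vec()).collect()
}
```

A caller could then pass `&collect_keys(["a", "b"])` to the slice-based method, which keeps the generic bound out of the async trait itself.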
1-27: Excellent module-level documentation.
The documentation clearly explains:
- The purpose (unified interface for remote/embedded access)
- The two implementations (`GrpcKvClient`, `LocalKvClient`)
- Design principles (unified, async-first, type-safe, zero-cost)
- A practical example showing generic usage
d-engine-server/src/node/client/local_kv.rs (7)
1-16: Well-documented module with clear performance expectations.
The module-level documentation effectively communicates the purpose, performance benefits, and usage pattern of `LocalKvClient`.
18-28: Imports are appropriately organized.
All imports are used within the module.
30-69: Error handling is well-structured.
The error type properly implements `Display`, `Debug`, and `std::error::Error`, and the conversion to `KvClientError` is correctly implemented. Note that the `Timeout` variant loses the duration information during conversion, but this is acceptable given `KvClientError::Timeout` doesn't carry that detail.
96-135: Put operation implementation is correct.
The method properly handles channel send/receive errors, timeouts, and server-side error codes. The use of `Bytes::copy_from_slice` for the conversion is appropriate.
184-219: Delete operation follows the same robust pattern.
Correctly mirrors the `put` implementation for delete semantics.
232-242: Debug implementation appropriately excludes the channel sender.
Showing only `client_id` and `timeout` keeps debug output meaningful.
171-180: The "key not found" semantics in `get()` are correctly implemented.
Verification confirms the code works as documented:
- Missing key path: `read_from_state_machine()` returns `None` when no keys exist → `.unwrap_or_default()` converts to `Vec::new()` → `ClientResponse::read_results(vec![])` → client receives empty results → `first()` returns `None` → returns `None` ✓
- Key exists path: `read_from_state_machine()` returns `Some(vec![ClientResult{...}])` → converted to results vector → client receives non-empty results → `first()` returns `Some` → returns `Some(value)` ✓
- Empty value path: Results include `ClientResult` with empty `Bytes` → `first()` returns `Some` → returns `Some(empty_bytes)` ✓
The comment in lines 175-176 ("If results list is empty, key doesn't exist / Otherwise, return the value (even if empty bytes)") is accurate and the implementation correctly distinguishes between missing keys and keys with empty values. Server-side confirmation across all three role states (leader, candidate, follower) shows consistent use of `.unwrap_or_default()` to convert `None` → empty vec.
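The three paths traced above reduce to a short pattern. The following is a sketch under assumptions: `ReadResult` and `value_from_results` are hypothetical stand-ins for `ClientResult` and the client-side logic, with `Vec<u8>` used in place of `Bytes`.

```rust
/// Hypothetical stand-in for a single read result returned by the server.
#[derive(Debug, Clone, PartialEq)]
struct ReadResult {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Mirrors the described flow: `None` from the state machine becomes an empty
/// list, and the client treats an empty list as "key not found". A present
/// entry is returned as-is, even when its value is empty.
fn value_from_results(server_results: Option<Vec<ReadResult>>) -> Option<Vec<u8>> {
    let results = server_results.unwrap_or_default();
    results.first().map(|r| r.value.clone())
}
```

The distinction matters to callers: `None` means the key is absent, while `Some(vec![])` means the key exists with an empty value.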
… vision
- Position d-engine as "distributed coordination engine" and "consensus layer"
- Highlight single-node start capability (zero downtime expansion to cluster)
- Add "Tunable Persistence" feature (DiskFirst/MemFirst strategies)
- Update production note: single-node deployment supported for low-traffic use
- Change "plans to provide" → "provides" (features already implemented)
- Emphasize resource efficiency over max CPU performance
- Create d-engine-client/src/kv_error.rs with KvClientError and KvResult
- Move KvClientError enum definition from kv_client.rs
- Move From<ClientApiError> impl from grpc_kv_client.rs
- Remove duplicate From impl from grpc_kv_client.rs
- Update d-engine-client/src/lib.rs to export kv_error module
- Remove unused d-engine-core direct dependency from d-engine/Cargo.toml
- KvClient trait remains in kv_client.rs for unified interface
- Fix semantic bug in get_multi: server returns sparse results (only existing keys), but client was mapping by position instead of by key, causing wrong result order
- Use HashMap to reconstruct results by key, preserving positional correspondence with input keys. Query [key1, key2, key3] with key2 missing now correctly returns [Some(val1), None, Some(val3)]
- Fix misleading doc comment on Node::node_id(): it returns node ID, not a client
- Add comprehensive unit tests covering all cases: partial existence, all exist, none exist, empty value preservation
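The reconstruction described in this commit can be illustrated with a short sketch. It is not the code from `local_kv.rs`: the function name `reconstruct_by_key` is hypothetical, and keys and values are plain `Vec<u8>` rather than `Bytes`.

```rust
use std::collections::HashMap;

/// Hypothetical sketch: rebuilds positional results from a sparse server
/// reply. `found` contains only the keys that exist, in any order.
fn reconstruct_by_key(
    requested: &[Vec<u8>],
    found: Vec<(Vec<u8>, Vec<u8>)>,
) -> Vec<Option<Vec<u8>>> {
    // Index the sparse reply by key instead of relying on its position.
    let by_key: HashMap<Vec<u8>, Vec<u8>> = found.into_iter().collect();
    // Walk the requested keys in order so output[i] corresponds to requested[i].
    requested.iter().map(|k| by_key.get(k).cloned()).collect()
}
```

Looking each requested key up in the map keeps the output aligned with the input even when the server omits missing keys or returns them in a different order.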
Core Changes:
- Add EmbeddedEngine::with_rocksdb() for single-node quick start
- Add EmbeddedEngine::wait_leader(timeout) for first election
- Add EmbeddedEngine::leader_notifier() for leader change monitoring
- Add Raft::register_leader_change_listener() for leader notifications
- Add Node::leader_notifier() to expose leader changes to applications
Documentation:
- Add quick-start-5min.md with single-node embedded example
- Add scale-to-cluster.md guide for 1→3 node expansion
- Add examples/quick-start with runnable code
Tests:
- Add unit tests for leader change notification channels
- Add integration tests for 3-node failover scenarios
- Note: Some test fixes pending (see TODO below)
Architecture:
- Raft core emits leader change events via mpsc (unbounded)
- Node converts to watch channel for stateful subscriptions
- EmbeddedEngine provides developer-friendly async APIs
TODO:
- Fix test compilation errors in d-engine-server
- Research LocalKvClient redirect strategy (openraft-style)
- Add performance benchmarks for redirect path
Related: Conversation "etcd embedded hello world"
- Add tests/embedded/ module with 3 test suites:
  * single_node_test.rs: Basic lifecycle, leader notification, data persistence
  * scale_to_cluster_test.rs: Single-node to 3-node cluster scaling, failure tolerance
  * failover_test.rs: Leader failover, re-election, minority failure scenarios
- Add 7 integration tests covering:
  * Single-node quick start (lifecycle management)
  * Scaling from 1→3 nodes with data preservation (core value proposition)
  * Leader election notifications (event-driven API)
  * 3-node cluster failover and re-election
  * Minority failure handling (Raft correctness)
- Fix compilation errors in node/mod.rs and embedded/mod_test.rs
- Fix clippy warnings (format string inlining, unused imports)
- Update tests/integration_test.rs to include embedded module
- Replace 'etcd-like' with 'high-level KV API' in EmbeddedEngine docs
- Replace 'designed to replace etcd' with descriptive language in watch modules
- Replace 'Inspired by etcd' with 'provides lease-based key expiration' in lease/config
- Replace 'etcd-compatible' with 'durable expiration semantics' in file state machine
- Replace 'etcd-compatible TTL semantics' with 'absolute expiration time semantics' in MIGRATION_GUIDE
- Update MIGRATION_GUIDE table: 'etcd Compatible' → 'Expiration Semantics'
- Keep performance comparison vs etcd in README.md (critical benchmark data)
- Keep etcd references in examples/ (external documentation links)
Adheres to howto.md rule #20: avoid competitor names in codebase
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
d-engine-client/src/lib.rs (1)
1-6: Update documentation to reflect API rename.
The module documentation still references `KvClient` but the public type has been renamed to `GrpcKvClient` (see line 115).
Apply this diff:

     //! Provides core components for interacting with the d_engine cluster:
     //! - [`Client`] - Main entry point with cluster access
     //! - [`ClientBuilder`] - Configurable client construction
    -//! - [`KvClient`] - Key-value store operations
    +//! - [`GrpcKvClient`] - Key-value store operations
     //! - [`ClusterClient`] - Cluster management operations

🧹 Nitpick comments (18)
d-engine-core/src/raft.rs (1)
162-170: Consider logging errors for failed leader change notifications.
The `notify_leader_change` method silently ignores send errors with `let _ = tx.send(...)`, while the similar `notify_new_commit` (lines 401-405) logs errors. This inconsistency could make debugging harder if listeners are unexpectedly dropped.
If this is intentional fire-and-forget behavior, a brief comment explaining the rationale would help. Otherwise, consider logging for consistency:

     fn notify_leader_change(
         &self,
         leader_id: Option<u32>,
         term: u64,
     ) {
         for tx in &self.leader_change_listener {
    -        let _ = tx.send((leader_id, term));
    +        if let Err(e) = tx.send((leader_id, term)) {
    +            debug!("Leader change listener dropped: {:?}", e);
    +        }
         }
     }

d-engine-core/src/raft_test/leader_change_test.rs (3)
56-75: Consider simplifying the test helper by removing intermediate RaftContext.
The `RaftContext` is constructed on lines 56-71 and immediately destructured when calling `Raft::new`. Since `Raft::new` accepts individual components rather than a `RaftContext`, consider passing them directly to reduce intermediate allocation.
This is a minor style suggestion: the current approach works correctly. No action required if you prefer the explicit intermediate struct for clarity.
84-89: Test only verifies construction, not listener functionality.
This test creates a Raft instance with a registered listener but only asserts `node_id == 1`. Consider adding an assertion that verifies the listener was actually registered (e.g., by triggering a role event and checking the receiver).
The comprehensive channel tests are in `raft_test.rs`, so this may be intentionally minimal. If this is meant as a smoke test, a brief comment clarifying that would help.
40-40: Prefix unused variable with underscore to suppress compiler warning.
`shutdown_tx` is intentionally unused (it just needs to exist to keep the channel alive), but will trigger an unused variable warning.

    - let (shutdown_tx, shutdown_rx) = watch::channel(());
    + let (_shutdown_tx, shutdown_rx) = watch::channel(());

MIGRATION_GUIDE.md (1)
16-16: Consider clarifying client TTL vs stored absolute expiry
The new wording correctly explains the WAL switch to `expire_at_secs` and “absolute expiration time semantics.” To avoid confusion for users calling `put_with_ttl`, consider adding a brief note that:
- Clients still pass a relative TTL (`ttl_secs`),
- The engine converts it to an absolute UNIX timestamp internally,
- Expiry behavior is now deterministic across restarts.
This can sit near the “What Changed” paragraph or the TTL behavior table row (“Expiration Semantics”).
Also applies to: 31-33, 103-110
examples/quick-start/Makefile (1)
1-23: Use `cargo run` to avoid hard-coding the binary path (and optionally add `all`/`test`)
Hard-coding `./target/release/quick-start` can be brittle in a workspace; running via Cargo makes the Makefile more robust and portable.
You can simplify `run` like this:

    -run: build
    -	@mkdir -p data/single-node
    -	@./target/release/quick-start
    +run: build
    +	@mkdir -p data/single-node
    +	cargo run --release --bin quick-start

If you plan to keep `checkmake` happy, you can also add minimal `all`/`test` targets:

    -.PHONY: help build run clean
    +.PHONY: all help build run test clean
    +
    +all: build
    +
    +test:
    +	cargo test

These are optional, but align with the reported lint hints.
d-engine-docs/src/docs/scale-to-cluster.md (2)
219-224: Fix minor typos and the broken Rust snippet
There are a few small issues that are visible in rendered docs and break the Rust example:
- Lines 219–220: “Node 2 an / d Node 3 …” is split.
- Lines 257–258: “Syn / cs missing data …” is split.
- Lines 238–239: the `"still-works"` string is split across lines, which makes the Rust example invalid.
- Lines 285–286: the `**Tradeoff**` emphasis is broken over two lines.
Suggested patch:

    -1. Node 2 an
    - d Node 3 detect leader failure (~1s heartbeat timeout)
    +1. Node 2 and Node 3 detect leader failure (~1s heartbeat timeout)
    @@
    -3. Syn
    - cs missing data from leader
    +3. Syncs missing data from leader
    @@
    -client.put(b"after-failover".to_vec(), b"still-works
    -".to_vec()).await?;
    +client.put(b"after-failover".to_vec(), b"still-works".to_vec()).await?;
    @@
    -**Tradeoff
    -**: 3x cost for fault tolerance and availability.
    +**Tradeoff**: 3x cost for fault tolerance and availability.

These keep the text the same while improving readability and making the code sample compile.
Also applies to: 237-240, 257-258, 285-286
19-24: Optional: add fenced code languages and convert “bold headings” to real headings
Markdownlint’s MD040/MD036 hints are valid here:
- The ASCII blocks under “Single-Node Limitations” / “3-Node Benefits” use bare ``` fences; you can tag them as `text`:

    -```
    +```text
     1 node = No fault tolerance
     ...
    -```
    +```

- Phrases like `**Expected output**:` and similar are used as pseudo-headings; if you want stricter lint compliance, convert them to `### Expected output` (or `####`) instead.
Not required for functionality, but worth considering if you enforce markdownlint in CI.
Also applies to: 28-35, 140-147
d-engine-docs/src/docs/quick-start-5min.md (2)
91-101: Add language specifier to code fence.
The expected output block should specify a language (or use `text` for plain output) to satisfy Markdown linting requirements.
Apply this diff:

    -```
    +```text
     Starting d-engine...

151-156: Add language specifier to code fence.
The conceptual diagram should specify a language (or use `text` for plain content) to satisfy Markdown linting requirements.
Apply this diff:

    -```
    +```text
     1 node = Raft cluster of 1

examples/quick-start/src/main.rs (1)
42-80: Consider using byte string literals for cleaner conversions.
Multiple places convert string literals to `Vec<u8>` using `.as_bytes().to_vec()`. Using byte string literals `b"..."` would be more idiomatic and concise.
For example:

    - client.put("workflow:status".as_bytes().to_vec(), b"running".to_vec()).await?;
    + client.put(b"workflow:status".to_vec(), b"running".to_vec()).await?;

    - let value = client.get("workflow:status".as_bytes().to_vec()).await?;
    + let value = client.get(b"workflow:status".to_vec()).await?;

Similar changes can be applied to lines 61, 70 for consistency.
d-engine-client/src/grpc_kv_client.rs (1)
18-19: Remove or justify the unused_imports allow attribute.
Line 18 suppresses unused import warnings for `CoreKvClient`, `KvClientError`, and `KvResult`. If these imports are actually used in the trait implementation (lines 351-396), the allow attribute should be removed. If they're conditionally unused, add a comment explaining why.

    #!/bin/bash
    # Verify these imports are actually used in the file
    rg -n "CoreKvClient|KvClientError|KvResult" d-engine-client/src/grpc_kv_client.rs

d-engine-server/src/node/builder.rs (1)
399-416: Leader change forwarding task lacks shutdown coordination.
The spawned task at lines 408-416 will run until `leader_change_rx` closes naturally, but it doesn't participate in the shutdown signaling mechanism used elsewhere in the builder. While it will terminate when the sender is dropped, consider whether explicit shutdown coordination is needed for graceful shutdown scenarios.
Also, the result of `leader_elected_tx_clone.send(leader_info)` is silently discarded with `let _ = ...`. This is acceptable since watch channel sends only fail when there are no receivers, but a debug log on failure could aid troubleshooting.

    +    let shutdown = shutdown_signal.clone();
         // Spawn task to forward leader changes to watch channel
         tokio::spawn(async move {
    -        while let Some((leader_id, term)) = leader_change_rx.recv().await {
    -            let leader_info = leader_id.map(|id| crate::LeaderInfo {
    -                leader_id: id,
    -                term,
    -            });
    -            let _ = leader_elected_tx_clone.send(leader_info);
    +        loop {
    +            tokio::select! {
    +                _ = shutdown.changed() => break,
    +                msg = leader_change_rx.recv() => {
    +                    match msg {
    +                        Some((leader_id, term)) => {
    +                            let leader_info = leader_id.map(|id| crate::LeaderInfo {
    +                                leader_id: id,
    +                                term,
    +                            });
    +                            let _ = leader_elected_tx_clone.send(leader_info);
    +                        }
    +                        None => break,
    +                    }
    +                }
    +            }
             }
         });

tests/embedded/scale_to_cluster_test.rs (3)
73-74: Consider cleaning up temporary config files.
Config files are written to `/tmp/d-engine-test-node*.toml` and `/tmp/d-engine-test-failover-node*.toml` but are not explicitly cleaned up after tests. While `/tmp` is typically cleared on reboot, consider either:
- Cleaning up in the test teardown
- Using `tempfile` crate for automatic cleanup
Also applies to: 165-166
112-113: Fixed sleep for replication may cause flakiness.
The 500ms sleep at line 113 assumes replication completes within this window. Under CI load or slow I/O, this could cause intermittent failures. Consider polling with a timeout instead:

    // Poll until data is replicated or timeout
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    for (i, engine) in engines.iter().enumerate() {
        loop {
            let val = engine.client().get(b"cluster-key".to_vec()).await?;
            if val == Some(b"cluster-value".to_vec()) {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("Node {} failed to replicate within timeout", i + 1);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

58-78: Consider extracting cluster setup to a helper function.
The cluster initialization logic (lines 58-78 and 151-170) is nearly identical between both tests. Extracting to a helper like `start_cluster(ports, db_root, log_dir) -> Vec<EmbeddedEngine>` would reduce duplication and improve maintainability.
Also applies to: 151-170
d-engine-client/src/kv_error.rs (1)
56-63: Consider a dedicated `NotLeader` variant for retry logic.
The `NotLeader` error code is mapped to `ServerError`, which loses semantic information that clients could use for automatic leader redirection/retry. If embedded clients need to distinguish "not leader" from other server errors, a dedicated variant would be helpful.

     pub enum KvClientError {
         /// Channel closed (node shutdown or connection lost)
         ChannelClosed,
         /// Operation timeout
         Timeout,
         /// Server returned error (e.g., storage error)
         ServerError(String),
    +
    +    /// Not the leader - client should redirect to leader
    +    NotLeader(Option<u32>), // Optional leader hint
         /// Network error (only for remote clients)
         NetworkError(String),
         /// Invalid argument
         InvalidArgument(String),
     }

Then update the conversion:

     match code {
         ErrorCode::ConnectionTimeout => KvClientError::Timeout,
    -    ErrorCode::NotLeader => KvClientError::ServerError(message),
    +    ErrorCode::NotLeader => KvClientError::NotLeader(None), // Could parse leader hint from message
         _ => KvClientError::ServerError(message),
     }

d-engine-server/src/embedded/mod_test.rs (1)
167-167: Remove unnecessary `.to_vec()` allocations.
The `put` method accepts `impl AsRef<[u8]>`, so byte string literals can be passed directly without allocating a `Vec`.
Apply this diff:

    - let result = client.put(b"test_key".to_vec(), b"test_value".to_vec()).await;
    + let result = client.put(b"test_key", b"test_value").await;

📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
- Cargo.lock is excluded by !**/*.lock
- benches/d-engine-bench/Cargo.lock is excluded by !**/*.lock
- examples/client_usage/Cargo.lock is excluded by !**/*.lock
- examples/rocksdb-cluster/Cargo.lock is excluded by !**/*.lock
- examples/single-node-expansion/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (44)
- Cargo.toml (1 hunks)
- MIGRATION_GUIDE.md (3 hunks)
- README.md (2 hunks)
- d-engine-client/src/grpc_kv_client.rs (3 hunks)
- d-engine-client/src/kv_client.rs (1 hunks)
- d-engine-client/src/kv_error.rs (1 hunks)
- d-engine-client/src/lib.rs (4 hunks)
- d-engine-core/src/config/raft.rs (1 hunks)
- d-engine-core/src/lib.rs (1 hunks)
- d-engine-core/src/raft.rs (6 hunks)
- d-engine-core/src/raft_test.rs (1 hunks)
- d-engine-core/src/raft_test/leader_change_test.rs (1 hunks)
- d-engine-core/src/storage/lease.rs (1 hunks)
- d-engine-core/src/watch/manager.rs (1 hunks)
- d-engine-core/src/watch/mod.rs (1 hunks)
- d-engine-docs/src/docs/quick-start-5min.md (1 hunks)
- d-engine-docs/src/docs/scale-to-cluster.md (1 hunks)
- d-engine-server/Cargo.toml (1 hunks)
- d-engine-server/src/embedded/mod.rs (1 hunks)
- d-engine-server/src/embedded/mod_test.rs (1 hunks)
- d-engine-server/src/lib.rs (1 hunks)
- d-engine-server/src/network/grpc/watch_handler.rs (1 hunks)
- d-engine-server/src/node/builder.rs (2 hunks)
- d-engine-server/src/node/client/local_kv.rs (1 hunks)
- d-engine-server/src/node/client/local_kv_test.rs (1 hunks)
- d-engine-server/src/node/client/mod.rs (1 hunks)
- d-engine-server/src/node/mod.rs (6 hunks)
- d-engine-server/src/node/node_test.rs (4 hunks)
- d-engine-server/src/storage/adaptors/file/file_state_machine.rs (2 hunks)
- d-engine-server/src/test_utils/mock/mock_node_builder.rs (2 hunks)
- d-engine/src/lib.rs (3 hunks)
- examples/quick-start/.gitignore (1 hunks)
- examples/quick-start/Cargo.toml (1 hunks)
- examples/quick-start/Makefile (1 hunks)
- examples/quick-start/README.md (1 hunks)
- examples/quick-start/config/single-node.toml (1 hunks)
- examples/quick-start/src/main.rs (1 hunks)
- tests/cluster_start_stop/failover_test.rs (1 hunks)
- tests/cluster_start_stop/mod.rs (1 hunks)
- tests/embedded/failover_test.rs (1 hunks)
- tests/embedded/mod.rs (1 hunks)
- tests/embedded/scale_to_cluster_test.rs (1 hunks)
- tests/embedded/single_node_test.rs (1 hunks)
- tests/integration_test.rs (1 hunks)
✅ Files skipped from review due to trivial changes (10)
- d-engine-core/src/storage/lease.rs
- d-engine-core/src/watch/manager.rs
- tests/embedded/single_node_test.rs
- d-engine-core/src/watch/mod.rs
- Cargo.toml
- d-engine-core/src/lib.rs
- tests/integration_test.rs
- d-engine-server/src/storage/adaptors/file/file_state_machine.rs
- d-engine-core/src/config/raft.rs
- README.md
🚧 Files skipped from review as they are similar to previous changes (1)
- d-engine-server/src/node/node_test.rs
🧰 Additional context used
🧬 Code graph analysis (13)
d-engine-server/src/node/client/local_kv_test.rs (3)
- d-engine-client/src/kv_test.rs (1): `std` (560-560)
- d-engine-server/src/node/client/local_kv.rs (1): `from` (63-69)
- d-engine-core/src/membership.rs (1): `all` (25-31)
tests/cluster_start_stop/failover_test.rs (6)
- d-engine-client/src/kv_test.rs (1): `std` (560-560)
- tests/common/mod.rs (3): `create_bootstrap_urls` (373-375), `get_available_ports` (491-502), `start_node` (213-233)
- d-engine-server/src/node/builder.rs (2): `node_config` (206-212), `new` (134-147)
- d-engine-server/src/test_utils/mock/mock_node_builder.rs (1): `new` (137-158)
- d-engine-server/src/embedded/mod.rs (1): `client` (279-281)
- d-engine-server/src/node/client/local_kv.rs (1): `timeout` (228-230)
d-engine-client/src/grpc_kv_client.rs (1)
- d-engine-client/src/kv_client.rs (5): `put` (75-79), `put_with_ttl` (101-106), `get` (137-140), `get_multi` (169-172), `delete` (194-197)
d-engine-client/src/kv_client.rs (2)
- d-engine-server/src/node/client/local_kv.rs (8): `put` (97-136), `put` (248-254), `put_with_ttl` (256-298), `get` (139-182), `get` (300-305), `get_multi` (307-354), `delete` (185-220), `delete` (356-361)
- d-engine-client/src/grpc_kv_client.rs (10): `put` (48-83), `put` (352-358), `put_with_ttl` (98-135), `put_with_ttl` (360-367), `get` (215-220), `get` (369-378), `get_multi` (247-252), `get_multi` (380-388), `delete` (149-178), `delete` (390-395)
tests/embedded/failover_test.rs (2)
- tests/common/mod.rs (1): `get_available_ports` (491-502)
- d-engine-server/src/embedded/mod.rs (1): `start` (133-183)
d-engine-core/src/raft_test.rs (1)
- d-engine-core/src/raft_test/leader_change_test.rs (1): `test_leader_change_listener_registration` (85-89)
d-engine-core/src/raft.rs (2)
- d-engine-core/src/raft_test.rs (5): `mpsc` (10-10), `mpsc` (24-24), `mpsc` (25-25), `mpsc` (44-44), `mpsc` (56-56)
- d-engine-core/src/raft_role/role_state.rs (1): `current_term` (221-223)
d-engine-server/src/embedded/mod.rs (1)
- d-engine-server/src/node/builder.rs (4): `new` (134-147), `state_machine` (197-203), `storage_engine` (188-194), `ready` (571-575)
tests/embedded/scale_to_cluster_test.rs (2)
- tests/common/mod.rs (1): `get_available_ports` (491-502)
- d-engine-server/src/embedded/mod.rs (2): `with_rocksdb` (90-115), `start` (133-183)
d-engine-server/src/embedded/mod_test.rs (1)
- d-engine-server/src/embedded/mod.rs (2): `start` (133-183), `client` (279-281)
examples/quick-start/src/main.rs (1)
- d-engine-server/src/embedded/mod.rs (2): `with_rocksdb` (90-115), `client` (279-281)
d-engine-server/src/node/builder.rs (1)
- d-engine-core/src/raft.rs (2): `new` (78-92), `new` (100-142)
d-engine-server/src/node/client/local_kv.rs (3)
- d-engine-server/src/embedded/mod.rs (1): `client` (279-281)
- d-engine-client/src/grpc_kv_client.rs (11): `put` (48-83), `put` (352-358), `new` (38-40), `get` (215-220), `get` (369-378), `delete` (149-178), `delete` (390-395), `put_with_ttl` (98-135), `put_with_ttl` (360-367), `get_multi` (247-252), `get_multi` (380-388)
- d-engine-client/src/kv_client.rs (5): `put` (75-79), `get` (137-140), `delete` (194-197), `put_with_ttl` (101-106), `get_multi` (169-172)
🪛 checkmake (0.2.2)
examples/quick-start/Makefile
[warning] 7-7: Target body for "help" exceeds allowed length of 5 (6).
(maxbodylength)
[warning] 5-5: Missing required phony target "all"
(minphony)
[warning] 5-5: Missing required phony target "test"
(minphony)
🪛 markdownlint-cli2 (0.18.1)
d-engine-docs/src/docs/scale-to-cluster.md
29-29: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
132-132: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
140-140: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
147-147: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
d-engine-docs/src/docs/quick-start-5min.md
91-91: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
151-151: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Lint and Format Check
- GitHub Check: Lint and Format Check
🔇 Additional comments (55)
examples/quick-start/.gitignore (1)
1-13: LGTM!
The `.gitignore` configuration is well-structured and appropriate for a Rust example project with embedded/local operation. Patterns correctly exclude build artifacts, runtime data, and OS/editor temporary files, with clear section comments.
d-engine-server/src/network/grpc/watch_handler.rs (1)
214-214: Explicit enum-to-i32 cast in test assertion looks correct
Using `ProtoWatchEventType::Put as i32` matches the `i32` type of `WatchResponse.event_type` and avoids relying on `Into` in the assertion. Behavior is unchanged and the intent is clear.
d-engine-core/src/raft_test.rs (1)
1-63: LGTM! Basic channel semantics tests are well-structured.
The tests correctly validate the fundamental channel behavior for leader change notifications. Using synchronous `#[test]` with `try_recv()` is appropriate here since these tests only verify channel mechanics without async operations.
d-engine-core/src/raft.rs (2)
356-366: Verify learner notification semantics match expectations.
When a node becomes a learner, it notifies `None` as `leader_id`. If learners can join a cluster with an existing leader, consider whether notifying the actual leader (if known) would be more accurate for downstream consumers.
If learners always transition through follower state to learn the leader, the current approach is fine. Otherwise, consider passing leader info if available.
144-157: Well-documented API with clear semantics.
The documentation clearly explains the notification semantics (Some/None for leader presence) and performance characteristics. The implementation correctly follows the existing listener pattern.
tests/cluster_start_stop/mod.rs (1)
2-2: New `failover_test` module wiring looks correct
The additional module declaration is consistent with the existing layout and cleanly wires in the new failover tests.
tests/cluster_start_stop/failover_test.rs (2)
20-113: Failover test flow is solid once shutdown signaling is fixed
Aside from the shutdown `send` issue, the 3-node failover test is structured well: it verifies pre-failover writes, kills one node, waits for re-election, refreshes the client, validates both existing and new data, then restarts the node and checks data sync. The join-handle shutdown pattern with `??` correctly propagates both join and node errors.
118-196: Minority failure behavior is asserted clearly
The minority test correctly:
- Starts a 3-node cluster,
- Writes initial data,
- Shuts down two nodes and waits,
- Uses a timeout-wrapped write plus error check to assert that writes fail without a majority,
- Then shuts down the remaining node.
Once the shutdown `send` calls are fixed as above, this gives good coverage of the “no majority, no writes” invariant.
examples/quick-start/config/single-node.toml (1)
1-11: Single-node config looks consistent with the quick-start flow
The config is minimal, clear, and matches the embedded single-node example (id = 1 and `./data/single-node` data dir). No issues from a review perspective.
d-engine-docs/src/docs/quick-start-5min.md (1)
1-373: LGTM! Comprehensive documentation for embedded mode.
The quick-start guide provides clear, step-by-step instructions with working code examples that align with the actual API implementation. The structure covers prerequisites, usage, concepts, troubleshooting, and next steps effectively.
d-engine-client/src/lib.rs (1)
113-141: LGTM! API surface refactoring is consistent.
The Client struct and accessor method have been correctly updated to use GrpcKvClient throughout.
examples/quick-start/README.md (1)
1-167: LGTM! Clear and comprehensive example documentation.
The README effectively communicates the embedded mode value proposition and provides practical guidance for users getting started.
examples/quick-start/src/main.rs (1)
10-40: LGTM! Clean example demonstrating embedded mode setup.
The main function follows a clear sequence: initialize engine, wait for readiness, wait for leader, use client, and gracefully shut down. Error handling is appropriate for an example.
d-engine-server/src/node/client/local_kv_test.rs (1)
1-102: LGTM! Comprehensive test coverage for result reconstruction.
The tests thoroughly verify the critical fix for preserving positional correspondence in multi-key results. The test scenarios cover sparse results, full results, empty results, and empty values.
d-engine-server/src/node/client/mod.rs (1)
1-7: LGTM! Clean module organization.
The module properly exposes the LocalKvClient API and includes test coverage.
d-engine-client/src/grpc_kv_client.rs (2)
352-396: LGTM! Trait implementation correctly adapts GrpcKvClient to CoreKvClient interface.The trait methods properly delegate to the underlying GrpcKvClient implementation and handle the conversion between
ClientResultandBytescorrectly.
347-396: Error conversion trait implementation verified and working correctly.The
impl From<ClientApiError> for KvClientErrorexists atd-engine-client/src/kv_error.rs:50and properly handles all error variants, including special cases for timeout and not-leader errors. The.map_err(Into::into)pattern used throughout the trait implementation (lines 347-396) is fully supported and correctly convertsClientApiErrortoKvClientError.d-engine-server/src/node/builder.rs (1)
441-455: LGTM!The new
ready_notify_txandleader_elected_tx/_leader_elected_rxfields properly extend theNodestruct to support readiness and leader election notifications for embedded usage. The underscore-prefixed_ready_notify_rxis correctly discarded here since consumers will obtain receivers vianode.ready_notifier().d-engine-server/src/lib.rs (2)
63-75: LGTM! The new `embedded` module and `LeaderInfo` struct are well-documented. The struct appropriately derives `Debug`, `Clone`, `PartialEq`, and `Eq`. Since `LeaderInfo` is small (12 bytes: u32 + u64), you could consider adding `Copy` for ergonomic usage, but this is optional.
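To make the optional `Copy` suggestion concrete, here is a minimal sketch. The struct below is a hypothetical mirror of `LeaderInfo`: the field names `leader_id` and `term` appear elsewhere in this thread, but which field is the `u32` and which the `u64` is an assumption, and `describe` is an illustrative helper, not part of the crate.

```rust
/// Hypothetical mirror of `LeaderInfo`; field types are assumed, not taken from the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeaderInfo {
    pub leader_id: u32,
    pub term: u64,
}

/// Illustrative helper that takes the value by copy, so the caller keeps its own instance.
fn describe(info: LeaderInfo) -> String {
    format!("leader {} (term {})", info.leader_id, info.term)
}
```

With `Copy` derived, a caller can pass the same `LeaderInfo` to several functions without writing `.clone()`. The fields add up to 12 bytes; the in-memory size may be larger once alignment padding is included.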
83-84: LGTM! The public exports for `EmbeddedEngine` and `LocalKvClient` provide a clean API surface for embedded usage patterns.
tests/embedded/scale_to_cluster_test.rs (1)
24-134: Well-structured integration test for cluster scaling. The `test_scale_single_to_cluster` test properly validates the scaling scenario with clear phases and descriptive assertions. The test correctly verifies data preservation across the transition from single-node to cluster mode.
tests/embedded/failover_test.rs (2)
232-243: Good defensive assertion for minority failure. The assertion correctly handles both timeout (`write_result.is_err()`) and explicit error (`write_result.unwrap().is_err()`) cases, properly validating that writes fail without quorum.
135-164: Good coverage of node rejoin and data sync. The test properly verifies that a restarted node can sync data written during its downtime, validating the Raft log replication mechanism.
d-engine-client/src/kv_error.rs (1)
1-45: Well-structured error module. The error types are well-documented with clear variant descriptions. The `Display` implementation provides user-friendly messages, and the `std::error::Error` trait implementation enables proper error chaining.
d-engine-server/src/test_utils/mock/mock_node_builder.rs (2)
299-310: LGTM! Notification channels properly wired for testing. The readiness and leader-election channels are correctly initialized and stored in the Node. The `_ready_notify_rx` is intentionally discarded while `_leader_elected_rx` is kept to prevent channel closure; this pattern aligns with the production builder implementation.
336-347: LGTM! Consistent channel initialization in RPC server variant. The notification channels follow the same pattern as `build_node()`, ensuring consistent behavior across test configurations.
d-engine-server/src/node/mod.rs (6)
23-24: LGTM! Clean module organization for client abstractions. The new `client` module and its public re-export appropriately expose the local KV client surface for embedded usage.
87-95: LGTM! Well-documented notification channels. The readiness and leader-election notification fields are clearly documented and properly typed. Keeping `_leader_elected_rx` alive prevents premature channel closure.
190-191: LGTM! Readiness notification correctly triggered. The notification is sent when readiness state changes, enabling external observers to react to node initialization completion.
203-239: LGTM! Clear and well-documented notification accessors. Both `ready_notifier()` and `leader_elected_notifier()` provide excellent documentation with usage examples and performance characteristics. The event-driven approach avoids polling overhead.
267-273: LGTM! Documentation now accurately describes `node_id()`. The misleading "creates a local KV client" text from the previous review has been corrected. The doc comment now clearly states this method returns the node's unique identifier.
290-296: LGTM! Local client factory method is well-designed. The `local_client()` method correctly constructs a `LocalKvClient` with the node's event channel, ID, and timeout configuration. The documentation accurately describes the performance benefits and usage pattern.
d-engine/src/lib.rs (3)
134-145: LGTM! Clean embedded mode API surface. The new exports (`EmbeddedEngine`, `LeaderInfo`, `LocalKvClient`) provide a clear and well-organized API for embedded usage, aligning with the PR's objectives.
199-204: LGTM! Prelude updated to expose embedded types. The prelude now includes `EmbeddedEngine`, `LeaderInfo`, and `LocalKvClient` under the server feature, and `KvClient` under the full feature, providing convenient imports for common use cases.
106-106: Misleading breaking change claim: `KvClient` is not removed from top-level exports.
The review comment incorrectly states that `KvClient` is "replaced" by `GrpcKvClient`. In reality:
- `KvClient` (the trait) remains exported at line 127 under the `client` feature
- `GrpcKvClient` (the concrete implementation) is added at line 106
- Users importing `d_engine::KvClient` directly will not break
The actual change is in the prelude: `GrpcKvClient` is now the primary prelude export for the `client` feature (line 195), while `KvClient` moved to the prelude only under the `full` feature (line 204). This only affects users relying on `use d_engine::prelude::*` with the `client` feature alone, not direct imports of `d_engine::KvClient`.
No migration guide or release notes were found documenting this change.
Likely an incorrect or invalid review comment.
d-engine-server/src/embedded/mod.rs (7)
64-70: LGTM! Well-structured embedded engine lifecycle state. The fields appropriately capture the engine's lifecycle: background task handle, shutdown signal, KV client, and notification receivers. The `Option<JoinHandle>` enables safe consumption in `stop()`.
90-115: LGTM! Convenient RocksDB quick-start method. The `with_rocksdb()` method provides an excellent developer experience by automatically creating directories and using sensible defaults. The empty-path fallback to `/tmp/d-engine` is helpful for development.
148-172: LGTM! Clean node lifecycle orchestration. The `start()` method correctly:
- Creates shutdown channel
- Builds and readies the node
- Captures notification receivers before moving node
- Creates local client before spawning
- Spawns background task with proper error propagation
The ordering ensures all resources are available before returning.
198-201: LGTM! Simple readiness check. The `ready()` method efficiently waits for node initialization using the watch channel without polling.
219-239: LGTM! Robust leader election wait with timeout. The `wait_leader()` implementation correctly:
- Uses event-driven notification (no polling)
- Respects timeout
- Returns leader info when available
- Provides clear error on timeout
297-318: LGTM! Graceful shutdown with error propagation. The `stop()` method correctly:
- Takes ownership (via `mut self`) to prevent reuse
- Sends shutdown signal
- Takes the handle to await completion
- Propagates both task panics and Raft errors
The `Option::take()` pattern ensures idempotent behavior if called multiple times (though the consuming signature prevents this).
321-330: LGTM! Appropriate Drop warning for resource leak prevention. The Drop implementation warns if `stop()` wasn't called, helping catch resource leaks during development. This is a reasonable approach; forcing an abort would be too harsh for an embedded library.
d-engine-server/src/node/client/local_kv.rs (8)
31-70: LGTM! Well-structured error types with proper conversions. The `LocalClientError` enum covers all failure modes (channel closed, timeout, server error) and correctly converts to `KvClientError` for trait compatibility. The error messages are clear and actionable.
76-94: LGTM! Clean local client structure. The client holds only the essential state: event channel, client ID, and timeout. The `pub(crate)` constructor appropriately restricts instantiation to the server crate.
97-136: LGTM! Correct put implementation with proper error handling. The method correctly:
- Builds write command and request
- Sends via event channel
- Awaits response with timeout
- Validates error code
- Propagates errors appropriately
139-182: LGTM! Correct get implementation. The single-key get correctly handles the response, returning `None` for missing keys and `Some(value)` for found keys. The logic at lines 176-178 appropriately extracts the first result.
185-220: LGTM! Correct delete implementation. The delete method follows the same robust pattern as put, ensuring consistent error handling and timeout behavior.
256-298: LGTM! TTL support correctly implemented. The `put_with_ttl` method uses `WriteCommand::insert_with_ttl` and follows the same error handling pattern as regular put. This ensures consistent behavior across write operations.
340-354: LGTM! `get_multi` correctly reconstructed per past review. The implementation now properly handles the server's sparse response format:
- Builds a lookup map from returned results (key → value)
- Iterates the original keys slice to reconstruct the output vector
- Returns `Some(value)` for found keys and `None` for missing keys
- Preserves the original request order
This matches the fix described in the past review comment and aligns with the gRPC client reference implementation.
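The reconstruction described above can be sketched in a few lines. This is a standalone illustration, not the code from `local_kv.rs`: the function name `reconstruct_results` and the use of `Vec<u8>` for keys and values are assumptions made to keep the example free of external crates.

```rust
use std::collections::HashMap;

/// Rebuilds a positional result vector from a sparse response.
/// `keys` is the original request; `found` holds only the keys that exist,
/// in whatever order the server returned them.
fn reconstruct_results(
    keys: &[Vec<u8>],
    found: Vec<(Vec<u8>, Vec<u8>)>,
) -> Vec<Option<Vec<u8>>> {
    // Build a key -> value lookup from the sparse response.
    let lookup: HashMap<Vec<u8>, Vec<u8>> = found.into_iter().collect();

    // Walk the request in order so that output[i] always answers keys[i];
    // keys absent from the lookup become None.
    keys.iter().map(|k| lookup.get(k).cloned()).collect()
}
```

Driving the output from the request slice rather than from the response is what keeps positions aligned: a missing key yields `None` in its own slot instead of shifting later results forward.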
222-243: LGTM! Helpful accessor methods and debug formatting. The `client_id()` and `timeout()` accessors are useful for logging and debugging. The Debug implementation provides clear output without exposing sensitive channel internals.
d-engine-client/src/kv_client.rs (1)
1-198: Excellent trait design and documentation! The KvClient trait provides a clean, well-documented abstraction that successfully unifies remote and embedded access patterns. The documentation is comprehensive, covering consistency guarantees, thread safety, performance characteristics, and usage examples. The async-first design with proper `Send + Sync` bounds is idiomatic and appropriate for concurrent use cases.
d-engine-server/src/embedded/mod_test.rs (1)
12-55: Strong test coverage for EmbeddedEngine lifecycle. The test suite provides comprehensive coverage of the EmbeddedEngine API surface, including:
- Leader election in single-node mode
- Leader change notifications and multi-subscriber behavior
- Ready/wait sequences with timing validation
- Client availability post-election
- Clean shutdown
The tests use appropriate assertions, timeouts, and cleanup patterns.
Also applies to: 78-111, 113-148, 176-226
d-engine-server/Cargo.toml (2)
25-25: Dependency addition supports embedded client integration. Adding the `d-engine-client` workspace dependency is correct and necessary for the embedded mode feature, enabling the server to expose `LocalKvClient` and integrate with the unified `KvClient` trait.
32-32: The async-trait version constraint is appropriate. The dependency specifies `async-trait = "0.1"`, which uses SemVer-compatible versioning and will resolve to the latest 0.1.x version available (currently 0.1.89). This constraint automatically receives security updates and improvements within the 0.1 release line while preventing breaking changes from a 0.2.0+ upgrade. No action is required.
tests/embedded/mod.rs (1)
1-3: Clean module organization. The module declarations follow standard Rust conventions for organizing tests into separate files. This structure supports the embedded testing suite introduced in this PR.
d-engine-client/src/kv_client.rs
#[allow(unused_imports)]
use crate::kv_error::{KvClientError, KvResult};
Remove unnecessary #[allow(unused_imports)] directive.
Both KvClientError and KvResult are actively used in the trait method return types (lines 79, 106, 140, 172, 197), so the unused imports directive is incorrect and should be removed.
Apply this diff:
-#[allow(unused_imports)]
use crate::kv_error::{KvClientError, KvResult};
🤖 Prompt for AI Agents
In d-engine-client/src/kv_client.rs around lines 30 to 31, remove the incorrect
#[allow(unused_imports)] attribute preceding the use statement so the file reads
just "use crate::kv_error::{KvClientError, KvResult};"; ensure the import line
remains intact (do not delete the actual imports) since KvClientError and
KvResult are used in trait method return types.
#[tokio::test]
async fn test_notify_leader_change_become_follower_with_leader() {
    let (raft, mut rx) = create_test_raft().await;

    // Simulate becoming follower with known leader
    let leader_id = Some(2);
    let term
Incomplete test - file appears truncated.
The test test_notify_leader_change_become_follower_with_leader is incomplete. Line 97 shows let term without an assignment or the rest of the test body. This will cause a compilation error.
Please complete the test implementation or remove it if it's work-in-progress:
#[tokio::test]
async fn test_notify_leader_change_become_follower_with_leader() {
    let (raft, mut rx) = create_test_raft().await;
    // Simulate becoming follower with known leader
    let leader_id = Some(2);
    let term = 5u64;
    // TODO: Complete test - trigger BecomeFollower event and verify notification
    // Example:
    // raft.handle_role_event(RoleEvent::BecomeFollower(leader_id)).await.unwrap();
    // let (received_leader, received_term) = rx.recv().await.expect("Should receive notification");
    // assert_eq!(received_leader, leader_id);
}

#[tokio::test]
async fn test_wait_leader_timeout() {
    let (storage, sm) = create_test_storage_and_sm().await;

    let engine =
        EmbeddedEngine::start(None, storage, sm).await.expect("Failed to start engine");

    engine.ready().await;

    // In single-node mode, leader should be elected immediately
    // But if we had a cluster without quorum, this would timeout
    // For this test, we verify timeout mechanism works
    let very_short_timeout = Duration::from_nanos(1);

    // Note: This might still succeed if election happens instantly
    // The test verifies timeout handling exists
    let _ = engine.wait_leader(very_short_timeout).await;

    engine.stop().await.expect("Failed to stop engine");
}
Test does not effectively verify timeout behavior.
The test uses an impractically short timeout (Duration::from_nanos(1)) and discards the result without assertions. The inline comment acknowledges that the test might still succeed if election happens instantly, making it ineffective at verifying timeout handling.
Consider either:
- Removing this test if timeout behavior is covered elsewhere
- Restructuring to test timeout in a multi-node scenario without quorum
- At minimum, assert on the result to verify expected timeout behavior
-    // Note: This might still succeed if election happens instantly
-    // The test verifies timeout handling exists
-    let _ = engine.wait_leader(very_short_timeout).await;
+    // In single-node, election is instant, so timeout won't trigger
+    // This would be better tested in a multi-node cluster scenario
+    let result = engine.wait_leader(very_short_timeout).await;
+    // Assert result regardless of success/timeout to ensure API works
+    let _ = result; // Can be Ok or Err depending on timing
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In d-engine-server/src/embedded/mod_test.rs around lines 57-76, the test uses an
impractically tiny timeout (Duration::from_nanos(1)) and discards the result so
it does not actually verify timeout behavior; either remove the test or make it
deterministic: create a scenario without quorum (or mock/stub leader election)
so election cannot succeed, call engine.wait_leader with a reasonable timeout
(e.g. milliseconds) and assert on the returned error/timeout value (or if API
returns Option, assert None); alternatively, if coverage exists elsewhere,
delete this redundant test.
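The "make it deterministic" advice can be illustrated without the engine at all. The sketch below is a stand-in, assuming a standard-library channel in place of the engine's leader notification: `wait_leader` here is a hypothetical free function, not `EmbeddedEngine::wait_leader`, and it only shows the shape of a test where the awaited event cannot occur and the timeout error is asserted.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Stand-in for a leader wait: blocks until a leader id arrives on the channel.
/// Returns an error if the timeout elapses (or the sender is gone) first.
fn wait_leader(rx: &mpsc::Receiver<u32>, timeout: Duration) -> Result<u32, String> {
    rx.recv_timeout(timeout)
        .map_err(|_| "Leader election timeout".to_string())
}
```

A test built this way keeps the sender alive but never sends, so no leader can ever be announced; it then calls `wait_leader` with a timeout of tens of milliseconds and asserts on the `Err` value. The outcome no longer depends on how fast an election happens, which is the weakness of the 1 ns timeout in the original test.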
examples/quick-start/Cargo.toml
[dependencies]
d-engine = { path = "../../d-engine", features = ["server", "rocksdb"] }

tokio = { version = "1.4", features = ["rt-multi-thread", "sync", "signal"] }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files and check tokio versions
echo "=== Searching for tokio versions in Cargo.toml files ==="
fd -n "Cargo.toml" -x rg -l "tokio" {} \; | while read file; do
  echo ""
  echo "File: $file"
  rg -n "tokio" "$file" | head -20
done
Repository: deventlab/d-engine
Length of output: 366
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files and check tokio versions
echo "=== Searching for tokio versions in all Cargo.toml files ==="
find . -name "Cargo.toml" -type f | while read file; do
  if rg -q "tokio" "$file"; then
    echo ""
    echo "File: $file"
    rg -n "tokio" "$file"
  fi
done
Repository: deventlab/d-engine
Length of output: 2285
Based on my verification, I've found significant version inconsistencies across the workspace:
Key findings:
- Root workspace Cargo.toml defines `tokio = "1.4"` (line 30 in ./Cargo.toml)
- Multiple examples use this outdated 1.4 version (quick-start, rocksdb-cluster, sled-cluster, three-nodes-cluster, single-node-expansion)
- Other examples use much newer versions: 1.36.0 (client_usage) and 1.44 (benches)
- Core packages (d-engine-server, d-engine-client, d-engine-core) reference the workspace version via `{ workspace = true }`, inheriting the old 1.4
The quick-start example's use of 1.4 is consistent with the workspace definition, so this is a workspace-wide issue rather than a quick-start-specific one. Tokio 1.4 dates from 2021. Note that `"1.4"` is a caret requirement, so Cargo can still resolve it to a newer 1.x release; the declared version acts as a minimum rather than an exact pin.
Update the workspace root tokio version to align with other examples and current Tokio releases (e.g., 1.36.0 or later). The quick-start example will inherit the corrected version via workspace dependency resolution.
🤖 Prompt for AI Agents
In examples/quick-start/Cargo.toml around line 9 (and importantly the workspace
root Cargo.toml where tokio is defined at line ~30), the project currently pins
tokio = "1.4" which is outdated and causing workspace-wide inconsistencies;
update the tokio entry in the root workspace Cargo.toml to a modern compatible
version (e.g., "1.36.0" or later) so all packages that use { workspace = true }
inherit the new version, then run cargo update or cargo metadata to verify
dependency resolution and adjust any code/features if the newer Tokio requires
minor API changes.
info!("Initial data written. Killing node 1 (likely leader)");

// Kill node 1 (typically the leader in 3-node bootstrap)
ctx.graceful_txs[0].send(()).await.map_err(|_| ClientApiError::ChannelClosed)?;
🧩 Analysis chain
🏁 Script executed:
find . -name failover_test.rs -type f
Repository: deventlab/d-engine
Length of output: 139
🏁 Script executed:
wc -l tests/cluster_start_stop/failover_test.rs 2>/dev/null || find . -name "*.rs" -path "*failover*" -type f
Repository: deventlab/d-engine
Length of output: 107
🏁 Script executed:
git ls-files | grep -i failover
Repository: deventlab/d-engine
Length of output: 135
🏁 Script executed:
fd -t f failover --extension rs
Repository: deventlab/d-engine
Length of output: 135
🌐 Web query:
tokio::sync::watch::Sender send method signature async or synchronous
💡 Result:
Synchronous — signature is pub fn send(&self, value: T) -> Result<(), SendError<T>> (not async). [1][2]
Sources:
[1] tokio docs: Sender::send signature and description.
[2] tokio::sync::watch module docs.
🏁 Script executed:
sed -n '55,70p' tests/cluster_start_stop/failover_test.rs
Repository: deventlab/d-engine
Length of output: 689
🏁 Script executed:
sed -n '158,175p' tests/cluster_start_stop/failover_test.rs
Repository: deventlab/d-engine
Length of output: 665
🏁 Script executed:
sed -n '185,196p' tests/cluster_start_stop/failover_test.rs
Repository: deventlab/d-engine
Length of output: 405
🏁 Script executed:
rg -n "fn start_node" tests/ -A 5 -B 2
Repository: deventlab/d-engine
Length of output: 533
🏁 Script executed:
sed -n '213,230p' tests/common/mod.rs
Repository: deventlab/d-engine
Length of output: 775
The verification is complete and the issue is confirmed:
- `start_node` returns `watch::Sender<()>` (confirmed in tests/common/mod.rs:213-227)
- `watch::Sender::send()` is a synchronous method that returns `Result<(), SendError<()>>` (confirmed by API documentation)
- All three locations have `.send(()).await`, which is a compile-time type error
- The proposed fix (removing `.await`) is correct
Fix compile-time error: remove incorrect `.await` on synchronous `watch::Sender::send`
`watch::Sender::send()` is synchronous and returns `Result<(), SendError<()>>` directly, not a Future. The `.await` keyword cannot be applied here and will cause a type error.
Remove `.await` from all three call sites:
- ctx.graceful_txs[0].send(()).await.map_err(|_| ClientApiError::ChannelClosed)?;
+ ctx.graceful_txs[0]
+     .send(())
+     .map_err(|_| ClientApiError::ChannelClosed)?;
- for i in 0..2 {
-     ctx.graceful_txs[i].send(()).await.map_err(|_| ClientApiError::ChannelClosed)?;
+ for i in 0..2 {
+     ctx.graceful_txs[i]
+         .send(())
+         .map_err(|_| ClientApiError::ChannelClosed)?;
      ctx.node_handles[i]
          .await
          .map_err(|e| ClientApiError::ServerError(format!("Node shutdown failed: {e}")))??;
  }
- ctx.graceful_txs[2].send(()).await.map_err(|_| ClientApiError::ChannelClosed)?;
+ ctx.graceful_txs[2]
+     .send(())
+     .map_err(|_| ClientApiError::ChannelClosed)?;
Lines: 61, 163–167, 190–193
🤖 Prompt for AI Agents
In tests/cluster_start_stop/failover_test.rs around lines 61, 163-167, and
190-193 the code incorrectly calls .await on watch::Sender::send(), but
watch::Sender::send() is synchronous and returns Result<(), SendError<()>>;
remove the trailing .await from each .send(()).await call so they become
.send(()), keeping the existing map_err handling and return types intact.
tests/embedded/failover_test.rs
// Kill node 1 (likely the initial leader)
info!("Killing node 1");
let killed_engine = engines.remove(0);
let killed_config = configs.remove(0);
killed_engine.stop().await?;

// Wait for re-election event
info!("Waiting for re-election");
tokio::time::timeout(Duration::from_secs(5), leader_rx.changed())
    .await
    .expect("Should receive leader change notification")?;

let new_leader = leader_rx.borrow().clone();
assert!(new_leader.is_some(), "New leader should be elected");

let new_leader_info = new_leader.unwrap();
assert_ne!(
    new_leader_info.leader_id, 1,
    "New leader should not be node 1"
);
Assertion assumes node 1 was the initial leader.
The test kills node 1 (`engines[0]`) assuming it's "likely the initial leader" (line 86 comment), but then asserts `new_leader_info.leader_id != 1` at line 103. If node 1 wasn't the leader, killing it wouldn't trigger a leadership change, and the test logic would be incorrect.
Consider either:
- Killing the actual leader based on `initial_leader.leader_id`
- Adjusting the assertion to verify the new leader differs from the killed node
-    // Kill node 1 (likely the initial leader)
-    info!("Killing node 1");
-    let killed_engine = engines.remove(0);
-    let killed_config = configs.remove(0);
+    // Find and kill the leader node
+    let leader_idx = (initial_leader.leader_id - 1) as usize;
+    info!("Killing leader node {}", initial_leader.leader_id);
+    let killed_engine = engines.remove(leader_idx);
+    let killed_config = configs.remove(leader_idx);
     killed_engine.stop().await?;
     // ... later assertion ...
-    assert_ne!(
-        new_leader_info.leader_id, 1,
-        "New leader should not be node 1"
-    );
+    assert_ne!(
+        new_leader_info.leader_id, initial_leader.leader_id,
+        "New leader should not be the killed node"
+    );
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In tests/embedded/failover_test.rs around lines 86 to 105, the test
unconditionally kills engines[0] assuming it was the leader and then asserts the
new leader is not node 1; instead determine which node is the actual leader
before killing: read initial_leader = leader_rx.borrow().clone() (or the
recorded initial leader_id), remove/stop the engine whose id equals
initial_leader.leader_id, then wait for election and assert the new leader_id !=
initial_leader.leader_id; alternatively, if you prefer to keep killing
engines[0], change the assertion to check that the new leader_id !=
killed_node_id (the id of the removed engine) or accept new_leader.is_some()
without assuming a specific id.
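One way to pick the engine to stop is to look it up by id rather than by position. The sketch below is illustrative only: `Engine` is a hypothetical stand-in that carries just a `node_id`, and `take_leader` is not a helper from the test suite.

```rust
/// Hypothetical stand-in for a running engine; only the node id matters here.
#[derive(Debug, PartialEq)]
struct Engine {
    node_id: u32,
}

/// Removes and returns the engine whose id matches the observed leader,
/// instead of assuming the leader sits at index 0.
/// Returns None if no engine in the list has that id.
fn take_leader(engines: &mut Vec<Engine>, leader_id: u32) -> Option<Engine> {
    let idx = engines.iter().position(|e| e.node_id == leader_id)?;
    Some(engines.remove(idx))
}
```

Compared with computing the index as `leader_id - 1`, a position lookup does not rely on node ids being contiguous or on the vector still being in its original order, and it returns `None` instead of panicking when the id is absent.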
EmbeddedEngine only called `.build().ready()` without starting the RPC server, which left cluster nodes unable to communicate (Connection refused).
Solution: Add `.start_rpc_server().await` after `.build()`
Why start RPC even for single-node:
- Enables dynamic membership (add nodes later)
- Uniform code path (reduces special cases)
- Minimal overhead (~10ms startup)
- Follows Raft best practice (etcd/consul)
Tested: 3-node cluster now forms successfully with unified leader
…r app layer
Enhanced LocalClientError to expose leader information when write operations hit follower nodes, enabling applications to implement smart retry logic.
Changes:
- Added NotLeader variant to LocalClientError with leader_id and leader_address
- Implemented map_error_response() to extract leader info from ErrorMetadata
- Exported LocalClientError in d-engine/prelude for application layer access
- Updated all error handling to use structured error mapping instead of string patterns
- Fixed clippy uninlined_format_args warnings
Why:
- Applications need leader address to implement retry/redirect strategies
- Follows industry best practices (Patroni, Consul, etcd patterns)
- Type-safe error handling vs fragile string matching
Breaking Changes: None (backward compatible enhancement)
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 1
♻️ Duplicate comments (1)
d-engine-server/src/node/client/local_kv.rs (1)
383-397: Previous review addressed: `get_multi` now correctly preserves key order. The implementation now uses a `HashMap` to reconstruct results in the original key order, correctly handling the case where the server only returns results for existing keys.
🧹 Nitpick comments (3)
d-engine/src/lib.rs (1)
123-128: Consider clarifying the section name. The "Core API" label may be confused with the `d-engine-core` crate mentioned in the architecture section (line 81). Since this exports the `KvClient` trait (the abstraction over `GrpcKvClient` and `LocalKvClient`), a more descriptive name would improve clarity.
-// ==================== Core API ====================
+// ==================== KV Client Trait ====================
 #[cfg(feature = "client")]
 #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
 pub use d_engine_client::KvClient;
d-engine-server/src/node/client/local_kv.rs (2)
129-150: Consider enriching error messages for unknown error codes. The catch-all case at line 148 only includes the numeric error code. Consider including the error code name when `try_from` succeeds but isn't `NotLeader`, to improve debuggability.
-    _ => LocalClientError::ServerError(format!("Error code: {error_code}")),
+    Ok(code) => LocalClientError::ServerError(format!("Error: {:?}", code)),
+    Err(_) => LocalClientError::ServerError(format!("Unknown error code: {error_code}")),
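The suggested match shape can be tried in isolation. Everything below is a hypothetical reduction: the `ErrorCode` variants and their numeric values, the trimmed `LocalClientError`, and the function name `map_error` are assumptions for illustration, not the crate's definitions.

```rust
use std::convert::TryFrom;

/// Hypothetical subset of server error codes; the numeric values are assumed.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrorCode {
    NotLeader,
    Timeout,
}

impl TryFrom<i32> for ErrorCode {
    type Error = i32;

    /// Maps a raw wire value to a known code, or hands the raw value back.
    fn try_from(raw: i32) -> Result<Self, i32> {
        match raw {
            1 => Ok(ErrorCode::NotLeader),
            2 => Ok(ErrorCode::Timeout),
            other => Err(other),
        }
    }
}

/// Trimmed-down stand-in for the client error type.
#[derive(Debug, PartialEq)]
enum LocalClientError {
    NotLeader,
    ServerError(String),
}

/// Known codes are reported by name; unknown ones keep their raw number.
fn map_error(raw: i32) -> LocalClientError {
    match ErrorCode::try_from(raw) {
        Ok(ErrorCode::NotLeader) => LocalClientError::NotLeader,
        Ok(code) => LocalClientError::ServerError(format!("Error: {code:?}")),
        Err(_) => LocalClientError::ServerError(format!("Unknown error code: {raw}")),
    }
}
```

Splitting the catch-all into `Ok(code)` and `Err(_)` arms means a recognized code shows up in logs by name, while a value the client does not know is still surfaced with its number.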
303-343: `put_with_ttl` duplicates logic from `put` - acceptable but could be refactored. The `put_with_ttl` method duplicates the request/response handling from the inherent `put` method. This is acceptable for now, but could be refactored to share the common logic if the pattern grows.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- d-engine-server/src/embedded/mod.rs (1 hunks)
- d-engine-server/src/lib.rs (1 hunks)
- d-engine-server/src/node/client/local_kv.rs (1 hunks)
- d-engine/src/lib.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
d-engine-server/src/embedded/mod.rs (1)
d-engine-server/src/node/builder.rs (3)
- new (134-147)
- state_machine (197-203)
- storage_engine (188-194)
🔇 Additional comments (17)
d-engine/src/lib.rs (4)
106-106: LGTM!The
GrpcKvClientexport is correctly placed alongside other specialized clients, consistent with the refactor whereKvClientbecomes a trait andGrpcKvClientis the concrete gRPC implementation.
134-145: LGTM!Server API exports are well-organized with clear grouping comments. The addition of
EmbeddedEngine,LeaderInfo, andLocalKvClientproperly supports the new embedded mode functionality.
195-195: LGTM!The client prelude correctly includes
GrpcKvClientfor convenient imports.
199-204: LGTM!The server prelude correctly includes the new embedded mode types. The
KvClienttrait being gated behindfullfeature in the prelude (vs.clientat top-level) is a reasonable design—users writing generic code over bothGrpcKvClientandLocalKvClientwould typically have both features enabled.d-engine-server/src/node/client/local_kv.rs (5)
1-16: Well-documented module with clear performance claims.The documentation is excellent with clear performance expectations and usage examples. However, the performance claims ("10-20x faster", "<0.1ms latency") should ideally be backed by benchmarks.
82-103: LGTM: Error conversion is well-structured.The
From<LocalClientError> for KvClientErrorimplementation correctly maps local error types to client error types. TheNotLeaderconversion appropriately includes leader hint information in the error message when available.
152-189: LGTM: Clean implementation with proper error handling.The
putmethod correctly handles the full request lifecycle: command creation, sending, timeout, and error mapping. The use ofMaybeCloneOneshotfor the response channel is appropriate.
222-232: LGTM: Proper handling of read response. The `get` method correctly handles the case where the key doesn't exist (empty results list) and properly extracts the value when present. The fallback to `None` for unexpected response types is safe.
234-267: LGTM: Consistent with put method pattern. The `delete` method correctly follows the same pattern as `put`, using `WriteCommand::delete` for the operation.
d-engine-server/src/lib.rs (3)
63-66: LGTM: New embedded module with appropriate documentation. The embedded module is properly declared with concise documentation explaining its purpose.
68-75: LGTM: Simple and well-documented struct. The `LeaderInfo` struct provides the essential information for leader change notifications with appropriate derives.
82-84: LGTM: Public API surface expanded appropriately. The re-exports expose `EmbeddedEngine`, `LocalKvClient`, and `LocalClientError` at the crate root, providing a clean public API for embedded usage.
d-engine-server/src/embedded/mod.rs (5)
1-29: Excellent module documentation with clear comparison. The documentation clearly explains when to use `EmbeddedEngine` vs `Node` directly, which will help developers make the right choice.
89-115: LGTM: Sensible defaults for RocksDB initialization. The constructor handles empty paths gracefully with a reasonable default, and properly creates the base directory before initializing storage engines.
147-155: Consider: `.build().await?.start_rpc_server().await.ready()?` call sequence. The chain of `build().await?.start_rpc_server().await.ready()?` is correct per the API, but worth noting that `ready()` here transitions the node to a ready state, not waiting for leader election (which is handled separately by `wait_leader()`).
299-320: LGTM: Proper shutdown handling. The `stop` method correctly consumes `self` to prevent double-stop, handles task panics separately from inner errors, and gracefully ignores send errors when the receiver may already be dropped.
323-331: Good defensive programming in Drop. The Drop implementation appropriately warns about potential task leaks when `stop()` wasn't called. This helps developers identify lifecycle issues during development.
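The shutdown and Drop comments above describe a common lifecycle pattern: `stop` takes `self` by value so a second call cannot compile, and `Drop` warns if the owner was never stopped. The sketch below illustrates that pattern only; it is not the code under review. The `Engine` type, its fields, and the `leak_warnings` counter (a test hook) are hypothetical, and a plain thread stands in for the async task.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;

/// Hypothetical stand-in for an engine that owns one background task.
pub struct Engine {
    handle: Option<JoinHandle<Result<(), String>>>,
    /// Test hook (an assumption of this sketch): counts drops without `stop()`.
    leak_warnings: Arc<AtomicUsize>,
}

impl Engine {
    pub fn start(leak_warnings: Arc<AtomicUsize>) -> Self {
        // The background "task" here finishes immediately with success.
        let handle = std::thread::spawn(|| Ok(()));
        Engine { handle: Some(handle), leak_warnings }
    }

    /// Consumes `self`, so calling `stop` twice is a compile error.
    pub fn stop(mut self) -> Result<(), String> {
        match self.handle.take() {
            Some(h) => match h.join() {
                // The task ran to completion: surface its own result.
                Ok(inner) => inner,
                // The task panicked: report that separately from inner errors.
                Err(_) => Err("background task panicked".to_string()),
            },
            None => Ok(()),
        }
    }
}

impl Drop for Engine {
    fn drop(&mut self) {
        // `stop` takes the handle, so a handle still present means `stop`
        // was never called on this value.
        if self.handle.is_some() {
            self.leak_warnings.fetch_add(1, Ordering::SeqCst);
            eprintln!("Engine dropped without stop(); background task may leak");
        }
    }
}
```

Because `stop` empties the handle before `self` goes out of scope, a clean shutdown produces no warning, while letting an `Engine` fall out of scope does.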
    pub async fn wait_leader(
        &self,
        timeout: std::time::Duration,
    ) -> Result<crate::LeaderInfo> {
        let mut rx = self.leader_elected_rx.clone();

        tokio::time::timeout(timeout, async {
            loop {
                // Wait for leader election event (event-driven, no polling)
                let _ = rx.changed().await;

                // Check if a leader is elected
                if let Some(info) = rx.borrow().as_ref() {
                    info!("Leader elected: {} (term {})", info.leader_id, info.term);
                    return Ok(info.clone());
                }
            }
        })
        .await
        .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
    }
Potential race: `wait_leader` may miss already-elected leader.
If a leader is elected before `wait_leader()` is called, the `rx.changed().await` on line 230 will block waiting for the next change, potentially missing the already-elected leader. The method should check the current value first.
 pub async fn wait_leader(
     &self,
     timeout: std::time::Duration,
 ) -> Result<crate::LeaderInfo> {
     let mut rx = self.leader_elected_rx.clone();
     tokio::time::timeout(timeout, async {
+        // Check current value first (leader may already be elected)
+        if let Some(info) = rx.borrow().as_ref() {
+            info!("Leader already elected: {} (term {})", info.leader_id, info.term);
+            return Ok(info.clone());
+        }
+
         loop {
             // Wait for leader election event (event-driven, no polling)
             let _ = rx.changed().await;
             // Check if a leader is elected
             if let Some(info) = rx.borrow().as_ref() {
                 info!("Leader elected: {} (term {})", info.leader_id, info.term);
                 return Ok(info.clone());
             }
         }
     })
     .await
     .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
 }
🤖 Prompt for AI Agents
In d-engine-server/src/embedded/mod.rs around lines 221-241, wait_leader
currently always awaits rx.changed() first and can miss an already-elected
leader; change the logic to check the current value from rx.borrow().as_ref()
before awaiting a change, returning immediately if a leader is present, and only
call rx.changed().await when there is no leader yet; keep this inside the loop
so after a changed notification you re-check rx.borrow() and handle spurious
wakeups, and preserve the existing timeout/error behavior.
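The check-before-wait rule in this comment can be shown without any async runtime. The sketch below is a synchronous, std-only analogue, not the tokio `watch`-based code in the PR: `LeaderSlot`, `publish`, and the field types of `LeaderInfo` are assumptions made for illustration. The waiter inspects the current value first and blocks only when no leader is recorded, then re-checks after every wakeup.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical mirror of the leader notification payload.
#[derive(Clone, Debug, PartialEq)]
pub struct LeaderInfo {
    pub leader_id: u32,
    pub term: u64,
}

/// Holds the latest known leader and wakes waiters when it changes.
#[derive(Default)]
pub struct LeaderSlot {
    current: Mutex<Option<LeaderInfo>>,
    changed: Condvar,
}

impl LeaderSlot {
    /// Records a newly elected leader and notifies all waiters.
    pub fn publish(&self, info: LeaderInfo) {
        *self.current.lock().unwrap() = Some(info);
        self.changed.notify_all();
    }

    /// Returns at once if a leader is already known; otherwise waits for
    /// a change until `timeout` elapses.
    pub fn wait_leader(&self, timeout: Duration) -> Result<LeaderInfo, String> {
        let deadline = Instant::now() + timeout;
        let mut guard = self.current.lock().unwrap();
        loop {
            // Check the current value BEFORE waiting. Skipping this step is
            // the bug described above: an election that already happened
            // would never be observed.
            if let Some(info) = &*guard {
                return Ok(info.clone());
            }
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err("Leader election timeout".to_string());
            }
            // Block until notified or timed out, then loop to re-check,
            // which also covers spurious wakeups.
            let (next, _) = self.changed.wait_timeout(guard, remaining).unwrap();
            guard = next;
        }
    }
}
```

Placing the check at the top of the loop means the same code path handles both the already-elected case and the re-check after each notification.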
Fix race condition where wait_leader() always waits for next change event even if leader is already elected, causing unnecessary delays.
Changes:
- Check current leader value before waiting for change event
- Add unit tests for race condition scenarios
- Remove unused leader_change_test.rs module
- Fix clippy warnings (unused_imports in kv_client.rs)
- Standardize tokio version to "1" across all crates
- Fix async send() calls (remove unnecessary .await)
Impact:
- wait_leader() returns immediately if leader already elected
- Concurrent calls resolve in <100ms (vs potential seconds)
- Better startup performance for clients/benchmarks
Tests:
- test_wait_leader_race_condition_already_elected
- test_wait_leader_multiple_calls_concurrent
- test_wait_leader_check_current_value_first
…ty() for commit index logic
Type
Description
#182
Related Issues
Checklist
Summary by CodeRabbit
New Features
Documentation
Tests