
fix: broadcast DEL_PREFIX to all shard groups in ShardedCoordinator #482

Merged
bootjp merged 8 commits into main from feat/fix-jepsen
Apr 12, 2026
Conversation

@bootjp (Owner) commented Apr 11, 2026

DEL_PREFIX with a nil key (FLUSHALL) or a prefix spanning multiple shards could not be routed by groupMutations, causing "invalid request" errors in production where ShardedCoordinator is always used.
@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request implements broadcasting for DEL_PREFIX operations within the ShardedCoordinator to ensure all shards process the prefix deletion. The review feedback identifies several critical issues: broadcasting mixed operation types (e.g., PUT and DEL_PREFIX) can lead to data loss because the underlying FSM only processes the first DEL_PREFIX mutation, and DEL_PREFIX is currently unsupported within transactions. Additionally, the HLC timestamp should be generated once outside the broadcast loop to maintain consistency across shards, and the FSM's limitation of only processing a single DEL_PREFIX mutation per request should be handled.

Comment on lines +74 to +76
	if hasDelPrefixElem(reqs.Elems) {
		return c.dispatchDelPrefixBroadcast(reqs.Elems)
	}

high

Broadcasting an entire OperationGroup that contains mixed operation types (e.g., PUT and DEL_PREFIX) is problematic for two reasons:

  1. Data Loss: The kvFSM.handleRawRequest implementation (in fsm.go) only processes the first DEL_PREFIX mutation it finds and ignores all other mutations in the same request. Thus, any PUT or DEL operations in the same group would be lost.
  2. Transaction Bypass: DEL_PREFIX is not supported within transactions by the FSM. This broadcast logic currently intercepts transactions before they are validated, potentially bypassing intended transactional semantics.

You should ensure that DEL_PREFIX is only allowed in non-transactional groups and that it is not mixed with other operation types.

	if hasDelPrefixElem(reqs.Elems) {
		if reqs.IsTxn {
			return nil, errors.Wrapf(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
		}
		for _, e := range reqs.Elems {
			if e.Op != DelPrefix {
				return nil, errors.Wrapf(ErrInvalidRequest, "mixed DEL_PREFIX and other operations not supported")
			}
		}
		return c.dispatchDelPrefixBroadcast(reqs.Elems)
	}

Comment on lines +118 to +122
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	muts := make([]*pb.Mutation, 0, len(elems))
	for _, elem := range elems {
		muts = append(muts, elemToMutation(elem))
	}

medium

Note that if elems contains multiple DEL_PREFIX operations, only the first one will be executed by the FSM due to the logic in extractDelPrefix (in fsm.go). While this PR enables broadcasting, the underlying FSM still expects a DEL_PREFIX request to be the sole mutation or at least only processes the first one. If multiple prefixes need to be deleted, they should be sent as separate requests or the FSM should be updated to iterate over all mutations.
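The FSM-side alternative mentioned above (iterating over all mutations instead of only the first) can be sketched as follows. This is a minimal, hypothetical illustration against a map-backed store with simplified stand-in types — the real fsm.go uses pb.Mutation and a different storage engine:

```go
package main

import (
	"fmt"
	"strings"
)

// Simplified stand-in for pb.Mutation; the real type in fsm.go differs.
type Mutation struct {
	Op  string // "DEL_PREFIX" among others
	Key []byte // prefix to delete; nil means delete everything (FLUSHALL)
}

// applyDelPrefixAll processes every DEL_PREFIX mutation in a request,
// rather than stopping after the first one as extractDelPrefix does.
func applyDelPrefixAll(store map[string][]byte, muts []*Mutation) {
	for _, m := range muts {
		if m.Op != "DEL_PREFIX" {
			continue
		}
		for k := range store {
			if m.Key == nil || strings.HasPrefix(k, string(m.Key)) {
				delete(store, k)
			}
		}
	}
}

func main() {
	store := map[string][]byte{"a:1": nil, "a:2": nil, "b:1": nil, "c:1": nil}
	applyDelPrefixAll(store, []*Mutation{
		{Op: "DEL_PREFIX", Key: []byte("a:")},
		{Op: "DEL_PREFIX", Key: []byte("b:")},
	})
	fmt.Println(len(store)) // only "c:1" survives both prefixes
}
```

With this shape, a batch of multiple DEL_PREFIX mutations would be honored in a single request; the PR instead chose to keep the FSM unchanged and dispatch one request per element.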

Comment on lines +126 to +132
	for _, g := range c.groups {
		req := &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        c.clock.Next(),
			Mutations: muts,
		}

medium

The HLC timestamp should be generated once outside the loop to ensure consistency across all shards. Using c.clock.Next() inside the loop assigns a different timestamp to each shard for what is logically a single broadcast operation. In an MVCC system, operations within a single logical unit must share the same start timestamp to ensure atomicity and a consistent snapshot view.

Suggested change

Before:
	for _, g := range c.groups {
		req := &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        c.clock.Next(),
			Mutations: muts,
		}

After:
	ts := c.clock.Next()
	for _, g := range c.groups {
		req := &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        ts,
			Mutations: muts,
		}
References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.

bootjp added 2 commits April 11, 2026 21:47
Unit tests verify broadcast to all shard groups, partial failure
handling, and single-shard scenarios. Integration tests verify
end-to-end deletion across shards and txn-internal key preservation.
- Reject DEL_PREFIX in transactional groups (unsupported by FSM)
- Reject mixing DEL_PREFIX with other operations (PUT/DEL)
- Generate HLC timestamp once per element, shared across all shards
- Dispatch each DEL_PREFIX element as a separate request (FSM only
  processes the first DEL_PREFIX mutation per request)
- Add tests for validation and timestamp consistency
@bootjp (Owner, Author) commented Apr 11, 2026

/gemini review

@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces support for DEL_PREFIX operations within the ShardedCoordinator. Because these operations may span multiple shards, they are broadcast to all shard groups. The implementation includes validation to prevent DEL_PREFIX from being used in transactions or mixed with other operation types, along with comprehensive unit and integration tests. Feedback focuses on optimizing the broadcast logic by batching requests per shard group to improve performance, enforcing a maximum batch size to prevent potential OOM issues, and adding safety checks for response objects.

Comment on lines +139 to +168
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	var maxIndex uint64
	var firstErr error
	for _, elem := range elems {
		mut := elemToMutation(elem)
		ts := c.clock.Next()
		for _, g := range c.groups {
			req := &pb.Request{
				IsTxn:     false,
				Phase:     pb.Phase_NONE,
				Ts:        ts,
				Mutations: []*pb.Mutation{mut},
			}
			r, err := g.Txn.Commit([]*pb.Request{req})
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				continue
			}
			if r.CommitIndex > maxIndex {
				maxIndex = r.CommitIndex
			}
		}
	}
	if firstErr != nil {
		return nil, errors.WithStack(firstErr)
	}
	return &CoordinateResponse{CommitIndex: maxIndex}, nil
}

high

The current implementation performs N_elements * N_groups synchronous Commit calls, which can be extremely slow in production environments with many shards. Since g.Txn.Commit accepts a slice of requests, you can optimize this by pre-building the list of pb.Request objects and sending them in a single call per shard group. Additionally, to prevent unbounded memory growth and potential OOM issues as per repository guidelines, you should ensure the input collection is bounded and reject requests that exceed this limit. A nil check for the response r should also be added to prevent potential panics.

func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(elems []*Elem[OP]) (*CoordinateResponse, error) {
	if len(elems) > MaxBatchSize {
		return nil, errors.Errorf("batch size %d exceeds maximum allowed %d", len(elems), MaxBatchSize)
	}
	requests := make([]*pb.Request, 0, len(elems))
	for _, elem := range elems {
		requests = append(requests, &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        c.clock.Next(),
			Mutations: []*pb.Mutation{elemToMutation(elem)},
		})
	}

	var maxIndex uint64
	var firstErr error
	for _, g := range c.groups {
		r, err := g.Txn.Commit(requests)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		if r != nil && r.CommitIndex > maxIndex {
			maxIndex = r.CommitIndex
		}
	}

	if firstErr != nil {
		return nil, errors.WithStack(firstErr)
	}
	return &CoordinateResponse{CommitIndex: maxIndex}, nil
}
References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests. Reject new requests when the bound is reached.

- Extract DEL_PREFIX validation into dispatchDelPrefixBroadcast to
  reduce Dispatch cyclomatic complexity below threshold (cyclop)
- Batch all DEL_PREFIX requests into a single Commit call per shard
  group instead of N_elements * N_groups synchronous calls
- Add nil check for Commit response
- Fix gofmt alignment in test file
@bootjp (Owner, Author) commented Apr 11, 2026

/gemini review

@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces support for DEL_PREFIX operations within the ShardedCoordinator, allowing for prefix-based deletions that are broadcast across all shard groups. The changes include new validation logic to ensure DEL_PREFIX is not mixed with other operation types or executed within transactions, along with comprehensive unit and integration tests. Feedback was provided regarding the sequential nature of the broadcast mechanism, suggesting that parallelizing the commits to shard groups would significantly reduce latency in environments with many shards.

Comment on lines +153 to +164
	for _, g := range c.groups {
		r, err := g.Txn.Commit(requests)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		if r != nil && r.CommitIndex > maxIndex {
			maxIndex = r.CommitIndex
		}
	}

medium

The broadcast of DEL_PREFIX operations to all shard groups is performed sequentially. In a sharded environment with a significant number of shard groups, this will lead to high latency (O(N_shards * Raft_latency)). Since ShardedCoordinator is used in production, this sequential execution could easily exceed client timeouts for operations like FLUSHALL (nil prefix). Consider parallelizing these commits using goroutines and a sync.WaitGroup or similar mechanism to improve performance.

Extract broadcastToAllGroups to dispatch Commit calls concurrently
using goroutines, reducing latency from O(N_shards * raft_latency)
to O(raft_latency). Also resolves cyclop complexity warning by
splitting validation and broadcast into separate functions.
@bootjp (Owner, Author) commented Apr 11, 2026

/gemini review

@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces support for DEL_PREFIX operations within the sharded coordinator. Because prefix deletions can span multiple shards, the implementation adds logic to broadcast these requests to all shard groups in parallel and aggregate the resulting commit indices. The changes include validation to ensure DEL_PREFIX is not mixed with other operations or used within transactions, along with comprehensive unit and integration tests. Feedback was provided regarding the timestamping logic in dispatchDelPrefixBroadcast, where assigning unique timestamps to each element in a single batch may violate MVCC consistency requirements.

Comment on lines +135 to +154
func (c *ShardedCoordinator) dispatchDelPrefixBroadcast(isTxn bool, elems []*Elem[OP]) (*CoordinateResponse, error) {
	if isTxn {
		return nil, errors.Wrap(ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
	}
	if err := validateDelPrefixOnly(elems); err != nil {
		return nil, err
	}

	requests := make([]*pb.Request, 0, len(elems))
	for _, elem := range elems {
		requests = append(requests, &pb.Request{
			IsTxn:     false,
			Phase:     pb.Phase_NONE,
			Ts:        c.clock.Next(),
			Mutations: []*pb.Mutation{elemToMutation(elem)},
		})
	}

	return c.broadcastToAllGroups(requests)
}

high

In dispatchDelPrefixBroadcast, each DEL_PREFIX element in a batch is currently assigned a unique timestamp via c.clock.Next(). This violates the requirement that all operations within a single transaction in an MVCC system must share the same start timestamp to ensure atomicity and a consistent snapshot view. Please update the implementation to use a single timestamp for all operations in the batch to maintain atomicity.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.

All elements in a DEL_PREFIX batch now share one timestamp to ensure
atomicity and consistent MVCC snapshot view across shards.
@bootjp (Owner, Author) commented Apr 11, 2026

/gemini reiview

@gemini-code-assist (Contributor):

It looks like you made a small typo in your command (/gemini reiview instead of /gemini review). If you would like me to perform a code review of this pull request, please use the command /gemini review.

@bootjp (Owner, Author) commented Apr 11, 2026

/gemini reiview

@gemini-code-assist (Contributor):

It appears you are still using the command /gemini reiview which contains a typo. Please use the command /gemini review if you would like me to perform a code review of this pull request.

@bootjp (Owner, Author) commented Apr 11, 2026

/gemini review

@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces support for the DEL_PREFIX operation in the ShardedCoordinator. Since DEL_PREFIX can span multiple shards, the implementation broadcasts the operation to all shard groups in parallel and returns the maximum commit index. The changes include validation logic to prevent mixing DEL_PREFIX with other operations or using it within transactions. Additionally, a comprehensive suite of unit and integration tests has been added to verify the broadcast behavior, error handling, and preservation of internal transaction metadata. I have no feedback to provide.

@bootjp (Owner, Author) commented Apr 12, 2026

/gemini review

@bootjp bootjp enabled auto-merge April 12, 2026 04:28
@bootjp bootjp merged commit ed6393f into main Apr 12, 2026
7 checks passed
@bootjp bootjp deleted the feat/fix-jepsen branch April 12, 2026 04:30
@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces DEL_PREFIX support in the ShardedCoordinator, broadcasting the operation to all shard groups to handle keys spanning multiple shards. It includes validation to prohibit mixing DEL_PREFIX with other operations or using it in transactions. Feedback was provided to address a potential nil pointer dereference in the broadcast loop when accessing shard group transactions.

Comment on lines +166 to +168
	for _, g := range c.groups {
		wg.Add(1)
		go func(g *ShardGroup) {

medium

The loop iterates over c.groups and starts a goroutine for each group. However, there is no check to ensure that the ShardGroup pointer g or its Txn field are non-nil. If any entry in the map is nil or has a nil transaction manager, the goroutine will panic when calling g.Txn.Commit. While the coordinator is typically initialized with valid groups, adding a defensive check here improves robustness and aligns with the nil checks performed in other methods like txnGroupForID and commitSecondaryTxns.

	for gid, g := range c.groups {
		if g == nil || g.Txn == nil {
			errMu.Lock()
			if firstErr == nil {
				firstErr = errors.Wrapf(ErrInvalidRequest, "group %d has no transaction manager", gid)
			}
			errMu.Unlock()
			continue
		}
		wg.Add(1)
		go func(g *ShardGroup) {
