Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions adapter/distribution_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeR
s.mu.Lock()
defer s.mu.Unlock()

if err := s.verifyCatalogLeader(); err != nil {
if err := s.verifyCatalogLeader(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -191,15 +191,15 @@ func (s *DistributionServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
return s.readTracker.Pin(ts)
}

func (s *DistributionServer) verifyCatalogLeader() error {
func (s *DistributionServer) verifyCatalogLeader(ctx context.Context) error {
if s.coordinator == nil {
return grpcStatusError(codes.FailedPrecondition, errDistributionCoordinatorRequired.Error())
}
key := distribution.CatalogVersionKey()
if !s.coordinator.IsLeaderForKey(key) {
return grpcStatusError(codes.FailedPrecondition, errDistributionNotLeader.Error())
}
if err := s.coordinator.VerifyLeaderForKey(key); err != nil {
if err := s.coordinator.VerifyLeaderForKey(ctx, key); err != nil {
return grpcStatusErrorf(codes.FailedPrecondition, "verify catalog leader: %v", err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions adapter/distribution_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (s *distributionCoordinatorStub) IsLeader() bool {
return s.leader
}

func (s *distributionCoordinatorStub) VerifyLeader() error {
func (s *distributionCoordinatorStub) VerifyLeader(context.Context) error {
if !s.leader {
return kv.ErrLeaderNotFound
}
Expand All @@ -728,7 +728,7 @@ func (s *distributionCoordinatorStub) IsLeaderForKey(_ []byte) bool {
return s.leader
}

func (s *distributionCoordinatorStub) VerifyLeaderForKey(_ []byte) error {
func (s *distributionCoordinatorStub) VerifyLeaderForKey(_ context.Context, _ []byte) error {
if !s.leader {
return kv.ErrLeaderNotFound
}
Expand Down
6 changes: 3 additions & 3 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,19 +377,19 @@ func (d *DynamoDBServer) serveDynamoLeaderHealthz(w http.ResponseWriter, r *http
return
}

if isVerifiedDynamoLeader(d.coordinator) {
if isVerifiedDynamoLeader(r.Context(), d.coordinator) {
writeDynamoHealthBody(w, r, http.StatusOK, "ok\n")
return
}

writeDynamoHealthBody(w, r, http.StatusServiceUnavailable, "not leader\n")
}

func isVerifiedDynamoLeader(coordinator kv.Coordinator) bool {
func isVerifiedDynamoLeader(ctx context.Context, coordinator kv.Coordinator) bool {
if coordinator == nil || !coordinator.IsLeader() {
return false
}
return coordinator.VerifyLeader() == nil
return coordinator.VerifyLeader(ctx) == nil
}

func writeDynamoHealthMethod(w http.ResponseWriter, r *http.Request) bool {
Expand Down
4 changes: 2 additions & 2 deletions adapter/dynamodb_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (d *DynamoDBServer) AdminCreateTable(ctx context.Context, principal AdminPr
if !principal.Role.canWrite() {
return nil, ErrAdminForbidden
}
if !isVerifiedDynamoLeader(d.coordinator) {
if !isVerifiedDynamoLeader(ctx, d.coordinator) {
return nil, ErrAdminNotLeader
}
legacy, err := buildLegacyCreateTableInput(in)
Expand Down Expand Up @@ -249,7 +249,7 @@ func (d *DynamoDBServer) AdminDeleteTable(ctx context.Context, principal AdminPr
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !isVerifiedDynamoLeader(d.coordinator) {
if !isVerifiedDynamoLeader(ctx, d.coordinator) {
return ErrAdminNotLeader
}
if strings.TrimSpace(name) == "" {
Expand Down
8 changes: 4 additions & 4 deletions adapter/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1831,8 +1831,8 @@ func (w *testCoordinatorWrapper) IsLeader() bool {
return w.inner.IsLeader()
}

func (w *testCoordinatorWrapper) VerifyLeader() error {
return w.inner.VerifyLeader()
func (w *testCoordinatorWrapper) VerifyLeader(ctx context.Context) error {
return w.inner.VerifyLeader(ctx)
}

func (w *testCoordinatorWrapper) RaftLeader() string {
Expand All @@ -1843,8 +1843,8 @@ func (w *testCoordinatorWrapper) IsLeaderForKey(key []byte) bool {
return w.inner.IsLeaderForKey(key)
}

func (w *testCoordinatorWrapper) VerifyLeaderForKey(key []byte) error {
return w.inner.VerifyLeaderForKey(key)
func (w *testCoordinatorWrapper) VerifyLeaderForKey(ctx context.Context, key []byte) error {
return w.inner.VerifyLeaderForKey(ctx, key)
}

func (w *testCoordinatorWrapper) RaftLeaderForKey(key []byte) string {
Expand Down
2 changes: 1 addition & 1 deletion adapter/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (i *Internal) Forward(ctx context.Context, req *pb.ForwardRequest) (*pb.For
}, errors.WithStack(err)
}

r, err := i.transactionManager.Commit(req.Requests)
r, err := i.transactionManager.Commit(ctx, req.Requests)
if err != nil {
return &pb.ForwardResponse{
Success: false,
Expand Down
13 changes: 10 additions & 3 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,14 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
pattern := cmd.Args[1]

if r.coordinator.IsLeader() {
if err := r.coordinator.VerifyLeader(); err != nil {
// Per-call ctx with redisDispatchTimeout instead of the
// long-lived handlerContext: a stalled VerifyLeader on KEYS
// must not pin the command handler indefinitely. The same
// bound the rest of the dispatch path (sadd, set, …) uses;
// see Codex P1 review on PR #749.
ctx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
defer cancel()
if err := r.coordinator.VerifyLeader(ctx); err != nil {
conn.WriteError(err.Error())
return
}
Expand Down Expand Up @@ -3254,7 +3261,7 @@ func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string,
return nil, wrongTypeError()
}

if err := r.coordinator.VerifyLeaderForKey(key); err != nil {
if err := r.coordinator.VerifyLeaderForKey(r.handlerContext(), key); err != nil {
return nil, errors.WithStack(err)
}

Expand Down Expand Up @@ -3510,7 +3517,7 @@ func (r *RedisServer) readValueAt(key []byte, readTS uint64) ([]byte, error) {
}

if r.coordinator.IsLeaderForKey(key) {
if err := r.coordinator.VerifyLeaderForKey(key); err != nil {
if err := r.coordinator.VerifyLeaderForKey(r.handlerContext(), key); err != nil {
return nil, errors.WithStack(err)
}
v, err := r.store.GetAt(context.Background(), key, readTS)
Expand Down
8 changes: 6 additions & 2 deletions adapter/redis_compat_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ func (r *RedisServer) dbsize(conn redcon.Conn, _ redcon.Command) {
conn.WriteInt(size)
return
}
if err := r.coordinator.VerifyLeader(); err != nil {
if err := r.coordinator.VerifyLeader(r.handlerContext()); err != nil {
conn.WriteError(err.Error())
return
}
Expand Down Expand Up @@ -1144,7 +1144,11 @@ func (r *RedisServer) flushDatabase(conn redcon.Conn, all bool) {
defer cancel()

if err := r.retryRedisWrite(ctx, func() error {
if err := r.coordinator.VerifyLeader(); err != nil {
// Use the per-call ctx with redisDispatchTimeout, NOT
// handlerContext (the long-lived server baseCtx). FLUSHDB's
// retry budget already lives in ctx; routing it to
// VerifyLeader keeps the whole command bounded.
if err := r.coordinator.VerifyLeader(ctx); err != nil {
return fmt.Errorf("verify leader: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion adapter/redis_compat_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (r *RedisServer) doGetAt(key []byte, readTS uint64, verify bool) ([]byte, e
}
if r.coordinator.IsLeaderForKey(routingKey) {
if verify {
if err := r.coordinator.VerifyLeaderForKey(routingKey); err != nil {
if err := r.coordinator.VerifyLeaderForKey(r.handlerContext(), routingKey); err != nil {
return nil, errors.WithStack(err)
}
}
Expand Down
12 changes: 6 additions & 6 deletions adapter/redis_hello_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ type helloTestCoordinator struct {
func (c *helloTestCoordinator) Dispatch(context.Context, *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
return &kv.CoordinateResponse{}, nil
}
func (c *helloTestCoordinator) IsLeader() bool { return c.isLeader }
func (c *helloTestCoordinator) VerifyLeader() error { return nil }
func (c *helloTestCoordinator) RaftLeader() string { return "" }
func (c *helloTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
func (c *helloTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }
func (c *helloTestCoordinator) RaftLeaderForKey([]byte) string { return "" }
func (c *helloTestCoordinator) IsLeader() bool { return c.isLeader }
func (c *helloTestCoordinator) VerifyLeader(context.Context) error { return nil }
func (c *helloTestCoordinator) RaftLeader() string { return "" }
func (c *helloTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
func (c *helloTestCoordinator) VerifyLeaderForKey(context.Context, []byte) error { return nil }
func (c *helloTestCoordinator) RaftLeaderForKey([]byte) string { return "" }
func (c *helloTestCoordinator) Clock() *kv.HLC {
if c.clock == nil {
c.clock = kv.NewHLC()
Expand Down
12 changes: 6 additions & 6 deletions adapter/redis_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type infoTestCoordinator struct {
func (c *infoTestCoordinator) Dispatch(context.Context, *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
return &kv.CoordinateResponse{}, nil
}
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }
func (c *infoTestCoordinator) VerifyLeader() error { return nil }
func (c *infoTestCoordinator) RaftLeader() string { return c.raftLeader }
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
func (c *infoTestCoordinator) VerifyLeaderForKey([]byte) error { return nil }
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) string { return c.raftLeader }
func (c *infoTestCoordinator) IsLeader() bool { return c.isLeader }
func (c *infoTestCoordinator) VerifyLeader(context.Context) error { return nil }
func (c *infoTestCoordinator) RaftLeader() string { return c.raftLeader }
func (c *infoTestCoordinator) IsLeaderForKey([]byte) bool { return c.isLeader }
func (c *infoTestCoordinator) VerifyLeaderForKey(context.Context, []byte) error { return nil }
func (c *infoTestCoordinator) RaftLeaderForKey([]byte) string { return c.raftLeader }
func (c *infoTestCoordinator) Clock() *kv.HLC {
if c.clock == nil {
c.clock = kv.NewHLC()
Expand Down
4 changes: 2 additions & 2 deletions adapter/redis_keys_pattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *stubAdapterCoordinator) IsLeader() bool {
return true
}

func (s *stubAdapterCoordinator) VerifyLeader() error {
func (s *stubAdapterCoordinator) VerifyLeader(context.Context) error {
s.verifyCalls.Add(1)
return s.verifyLeaderErr
}
Expand All @@ -45,7 +45,7 @@ func (s *stubAdapterCoordinator) IsLeaderForKey([]byte) bool {
return true
}

func (s *stubAdapterCoordinator) VerifyLeaderForKey([]byte) error {
func (s *stubAdapterCoordinator) VerifyLeaderForKey(context.Context, []byte) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions adapter/redis_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *retryOnceCoordinator) IsLeader() bool {
return true
}

func (c *retryOnceCoordinator) VerifyLeader() error {
func (c *retryOnceCoordinator) VerifyLeader(context.Context) error {
return nil
}

Expand All @@ -68,7 +68,7 @@ func (c *retryOnceCoordinator) IsLeaderForKey([]byte) bool {
return true
}

func (c *retryOnceCoordinator) VerifyLeaderForKey([]byte) error {
func (c *retryOnceCoordinator) VerifyLeaderForKey(context.Context, []byte) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions adapter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2297,12 +2297,12 @@ func (s *S3Server) maybeProxyToLeader(w http.ResponseWriter, r *http.Request) bo
}
var leader string
if len(key) > 0 {
if s.coordinator.IsLeaderForKey(key) && s.coordinator.VerifyLeaderForKey(key) == nil {
if s.coordinator.IsLeaderForKey(key) && s.coordinator.VerifyLeaderForKey(r.Context(), key) == nil {
return false
}
leader = s.coordinator.RaftLeaderForKey(key)
} else {
if s.coordinator.IsLeader() && s.coordinator.VerifyLeader() == nil {
if s.coordinator.IsLeader() && s.coordinator.VerifyLeader(r.Context()) == nil {
return false
}
leader = s.coordinator.RaftLeader()
Expand Down Expand Up @@ -2420,7 +2420,7 @@ func (s *S3Server) serveS3LeaderHealthz(w http.ResponseWriter, r *http.Request)
return true
}
status, body := http.StatusOK, "ok"
if !s.isVerifiedS3Leader() {
if !s.isVerifiedS3Leader(r.Context()) {
status, body = http.StatusServiceUnavailable, "not leader"
}
w.WriteHeader(status)
Expand All @@ -2430,11 +2430,11 @@ func (s *S3Server) serveS3LeaderHealthz(w http.ResponseWriter, r *http.Request)
return true
}

func (s *S3Server) isVerifiedS3Leader() bool {
func (s *S3Server) isVerifiedS3Leader(ctx context.Context) bool {
if s.coordinator == nil || !s.coordinator.IsLeader() {
return false
}
return s.coordinator.VerifyLeader() == nil
return s.coordinator.VerifyLeader(ctx) == nil
}

// prepareStreamingPutBody wraps r.Body for aws-chunked framed uploads. When
Expand Down
6 changes: 3 additions & 3 deletions adapter/s3_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincip
if !principal.Role.canWrite() {
return nil, ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
if !s.isVerifiedS3Leader(ctx) {
return nil, ErrAdminNotLeader
}
if err := validateS3BucketName(name); err != nil {
Expand Down Expand Up @@ -314,7 +314,7 @@ func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincip
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
if !s.isVerifiedS3Leader(ctx) {
return ErrAdminNotLeader
}
acl = adminCanonicalACL(acl)
Expand Down Expand Up @@ -406,7 +406,7 @@ func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincip
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !s.isVerifiedS3Leader() {
if !s.isVerifiedS3Leader(ctx) {
return ErrAdminNotLeader
}

Expand Down
4 changes: 2 additions & 2 deletions adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (c *followerS3Coordinator) IsLeader() bool {
return false
}

func (c *followerS3Coordinator) VerifyLeader() error {
func (c *followerS3Coordinator) VerifyLeader(context.Context) error {
return kv.ErrLeaderNotFound
}

Expand Down Expand Up @@ -729,7 +729,7 @@ func (c *routeAwareS3Coordinator) IsLeaderForKey(key []byte) bool {
return c.localForKey(key)
}

func (c *routeAwareS3Coordinator) VerifyLeaderForKey(key []byte) error {
func (c *routeAwareS3Coordinator) VerifyLeaderForKey(_ context.Context, key []byte) error {
if c.IsLeaderForKey(key) {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,18 +498,18 @@ func (s *SQSServer) serveSQSLeaderHealthz(w http.ResponseWriter, r *http.Request
if !writeSQSHealthMethod(w, r) {
return
}
if isVerifiedSQSLeader(s.coordinator) {
if isVerifiedSQSLeader(r.Context(), s.coordinator) {
writeSQSHealthBody(w, r, http.StatusOK, "ok\n")
return
}
writeSQSHealthBody(w, r, http.StatusServiceUnavailable, "not leader\n")
}

func isVerifiedSQSLeader(coordinator kv.Coordinator) bool {
func isVerifiedSQSLeader(ctx context.Context, coordinator kv.Coordinator) bool {
if coordinator == nil || !coordinator.IsLeader() {
return false
}
return coordinator.VerifyLeader() == nil
return coordinator.VerifyLeader(ctx) == nil
}

func writeSQSHealthMethod(w http.ResponseWriter, r *http.Request) bool {
Expand Down
2 changes: 1 addition & 1 deletion adapter/sqs_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincip
if !principal.Role.canWrite() {
return ErrAdminForbidden
}
if !isVerifiedSQSLeader(s.coordinator) {
if !isVerifiedSQSLeader(ctx, s.coordinator) {
return ErrAdminNotLeader
}
if strings.TrimSpace(name) == "" {
Expand Down
Loading
Loading