From 73d17b7af0364a7cb3c08a751bcae567f72c62a0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 23:01:09 +0900 Subject: [PATCH 1/7] admin: add AdminForward RPC + leader-side dispatcher (P1, partial) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lays down the proto contract and the leader-side handler for the follower-leader forwarding path described in design Section 3.3. The follower-side client + bridge wiring lands in a separate PR to keep this one reviewable. Proto: - proto/admin_forward.proto with AdminForward.Forward(...) - AdminPrincipal carries access_key + role; the leader re-evaluates it against its own role index rather than trusting the follower (Section 3.3.1 invariant) - AdminOperation enum keeps the proto operation-agnostic so adding a future endpoint does not require a wire-format change - payload bytes carry the operation-specific JSON body verbatim - forwarded_from echoes the follower's node id into the leader's audit log line (acceptance criterion 6) Leader-side handler (internal/admin/forward_server.go): - ForwardServer implements pb.AdminForwardServer - RoleStore / MapRoleStore keep the access-key role lookup abstract so tests can swap in an in-memory map - validatePrincipal demotes a follower-claimed full role to forbidden when the leader's view is read-only (criterion 4) - forwardErrorResponse mirrors the HTTP handler's writeTablesError status mapping so a forwarded call is indistinguishable from a leader-direct call to the SPA - audit log entries on success carry forwarded_from; the leader is the source of truth (criterion 6) What is NOT in this PR (follow-ups): - Follower-side AdminForward client + bridge wiring (criterion 2) - Election-period 503 + Retry-After handling (criterion 3) - Rolling-upgrade compatibility flag admin.leader_forward_v2 (criterion 5) — needs a Raft-level cluster version bump - gRPC server registration in main.go Tests cover criteria 1, 4, 6 (leader direct, principal demotion, forwarded_from in audit log) and the structured error mapping (create-table 409, delete missing 404, generic error 500 with sanitised body, bad JSON 400, unknown operation 400). --- internal/admin/forward_server.go | 225 ++++++++++++++++ internal/admin/forward_server_test.go | 182 +++++++++++++ proto/Makefile | 3 + proto/admin_forward.pb.go | 358 ++++++++++++++++++++++++++ proto/admin_forward.proto | 77 ++++++ proto/admin_forward_grpc.pb.go | 167 ++++++++++++ 6 files changed, 1012 insertions(+) create mode 100644 internal/admin/forward_server.go create mode 100644 internal/admin/forward_server_test.go create mode 100644 proto/admin_forward.pb.go create mode 100644 proto/admin_forward.proto create mode 100644 proto/admin_forward_grpc.pb.go diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go new file mode 100644 index 000000000..24e0b2cdb --- /dev/null +++ b/internal/admin/forward_server.go @@ -0,0 +1,225 @@ +package admin + +import ( + "context" + "errors" + "log/slog" + "net/http" + + pb "github.com/bootjp/elastickv/proto" + "github.com/goccy/go-json" +) + +// ForwardServer is the leader-side gRPC handler for the AdminForward +// RPC (design Section 3.3). The follower's admin HTTP layer calls it +// when the local node is not the Raft leader; this server then +// re-validates the principal, dispatches the operation against the +// local TablesSource, and serialises the result back to the +// follower in the same JSON shape the SPA would have received from a +// leader-direct call. +// +// The server is deliberately kept independent of the dynamo HTTP +// handler: it runs in the gRPC server's goroutine pool, not in the +// HTTP server's, and shares only the TablesSource interface (which +// the bridge in main_admin.go already implements for the local +// adapter). +type ForwardServer struct { + pb.UnimplementedAdminForwardServer + + source TablesSource + roles RoleStore + logger *slog.Logger +} + +// RoleStore is the access-key → role lookup the leader uses to +// re-validate the inbound principal. Implementations should mirror +// the admin server's `Roles` map; production passes a typed wrapper +// around that map so tests can swap in an in-memory stub. +type RoleStore interface { + // LookupRole returns the role for an access key as understood + // by the leader's view of cluster configuration. The bool is + // false when the access key is not in the admin role index — a + // follower that forwarded the principal should not be able to + // "make up" an admin identity. + LookupRole(accessKey string) (Role, bool) +} + +// MapRoleStore is the trivial in-memory implementation, sufficient +// for tests and for the production wiring (which already keeps the +// role map in memory). +type MapRoleStore map[string]Role + +// LookupRole implements RoleStore. +func (m MapRoleStore) LookupRole(accessKey string) (Role, bool) { + r, ok := m[accessKey] + return r, ok +} + +// NewForwardServer wires a TablesSource and a RoleStore behind the +// gRPC AdminForward service. logger may be nil; defaults to +// slog.Default(). +func NewForwardServer(source TablesSource, roles RoleStore, logger *slog.Logger) *ForwardServer { + if logger == nil { + logger = slog.Default() + } + return &ForwardServer{source: source, roles: roles, logger: logger} +} + +// Forward is the gRPC entrypoint. It performs the principal +// re-evaluation the design mandates, then dispatches by operation. +// Errors that the SPA can act on are returned as a structured +// AdminForwardResponse with status_code + JSON payload; only fatal +// gRPC-layer errors (decode failure, unknown operation) come back as +// status.Errorf to the follower. +func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + if req == nil || req.GetPrincipal() == nil { + return rejectForward(http.StatusBadRequest, "invalid_request", "missing principal") + } + principal, ok := s.validatePrincipal(req.GetPrincipal()) + if !ok { + // Don't leak why the principal failed — the follower may + // have a different view of the cluster's role config and + // we want operators to investigate from the audit log on + // the leader, not the follower's response body. + s.logger.LogAttrs(ctx, slog.LevelWarn, "admin_forward_principal_rejected", + slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("claimed_access_key", req.GetPrincipal().GetAccessKey()), + slog.String("claimed_role", req.GetPrincipal().GetRole()), + ) + return rejectForward(http.StatusForbidden, "forbidden", + "this endpoint requires a full-access role") + } + switch req.GetOperation() { + case pb.AdminOperation_ADMIN_OP_CREATE_TABLE: + return s.handleCreate(ctx, principal, req) + case pb.AdminOperation_ADMIN_OP_DELETE_TABLE: + return s.handleDelete(ctx, principal, req) + case pb.AdminOperation_ADMIN_OP_UNSPECIFIED: + return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") + default: + return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") + } +} + +func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, bool) { + accessKey := p.GetAccessKey() + if accessKey == "" { + return AuthPrincipal{}, false + } + role, ok := s.roles.LookupRole(accessKey) + if !ok { + return AuthPrincipal{}, false + } + // Critical re-evaluation: if the leader sees this access key as + // read-only, the operation is forbidden even if the follower + // thought it was full. The reverse — leader sees full, follower + // sees read-only — would have been short-circuited at the + // follower already, so we do not need to check it here. + if !role.AllowsWrite() { + return AuthPrincipal{}, false + } + return AuthPrincipal{AccessKey: accessKey, Role: role}, true +} + +func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + var body CreateTableRequest + if err := json.Unmarshal(req.GetPayload(), &body); err != nil { + return rejectForward(http.StatusBadRequest, "invalid_body", "request body is not valid JSON") + } + summary, err := s.source.AdminCreateTable(ctx, principal, body) + if err != nil { + return forwardErrorResponse(err), nil + } + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", + slog.String("actor", principal.AccessKey), + slog.String("role", string(principal.Role)), + slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("operation", "create_table"), + slog.String("table", body.TableName), + ) + return jsonForwardResponse(http.StatusCreated, summary) +} + +func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { + // Delete carries the table name in the payload as JSON so the + // proto stays operation-agnostic — there is no operation-specific + // field in AdminForwardRequest, by design (adding one per op + // would couple every new admin endpoint to the proto schema). + var body struct { + Name string `json:"name"` + } + if err := json.Unmarshal(req.GetPayload(), &body); err != nil || body.Name == "" { + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name") + } + if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { + return forwardErrorResponse(err), nil + } + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", + slog.String("actor", principal.AccessKey), + slog.String("role", string(principal.Role)), + slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("operation", "delete_table"), + slog.String("table", body.Name), + ) + return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil +} + +// forwardErrorResponse re-encodes a TablesSource error in the +// structured shape the follower's handler can re-emit verbatim. This +// is the leader-side counterpart of writeTablesError: every status / +// JSON code the HTTP handler chooses is mirrored here so a forwarded +// call is indistinguishable to the SPA from a leader-direct call. +func forwardErrorResponse(err error) *pb.AdminForwardResponse { + switch { + case errors.Is(err, ErrTablesForbidden): + return mustForwardJSON(http.StatusForbidden, errorBody{Error: "forbidden", Message: "this endpoint requires a full-access role"}) + case errors.Is(err, ErrTablesNotLeader): + // Should never happen on the leader path — the leader + // just verified itself — but if a leadership transfer + // races with the dispatch, surface it consistently. + return mustForwardJSON(http.StatusServiceUnavailable, errorBody{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) + case errors.Is(err, ErrTablesNotFound): + return mustForwardJSON(http.StatusNotFound, errorBody{Error: "not_found", Message: "table does not exist"}) + case errors.Is(err, ErrTablesAlreadyExists): + return mustForwardJSON(http.StatusConflict, errorBody{Error: "already_exists", Message: "table already exists"}) + } + var verr *ValidationError + if errors.As(err, &verr) { + return mustForwardJSON(http.StatusBadRequest, errorBody{Error: "invalid_request", Message: verr.Error()}) + } + return mustForwardJSON(http.StatusInternalServerError, errorBody{Error: "internal", Message: "internal error; see leader logs"}) +} + +// errorBody is the shared JSON shape for both the HTTP handler's +// writeJSONError and the forward server's encoded responses. +type errorBody struct { + Error string `json:"error"` + Message string `json:"message,omitempty"` +} + +func rejectForward(status int, code, msg string) (*pb.AdminForwardResponse, error) { + return mustForwardJSON(status, errorBody{Error: code, Message: msg}), nil +} + +func mustForwardJSON(status int, body any) *pb.AdminForwardResponse { + payload, err := json.Marshal(body) + if err != nil { + // json.Marshal on a struct of strings cannot fail in + // practice; a 500 with a bare string body is the safest + // fallback if it ever does. + return &pb.AdminForwardResponse{ + StatusCode: http.StatusInternalServerError, + Payload: []byte(`{"error":"internal","message":"failed to encode response"}`), + ContentType: "application/json; charset=utf-8", + } + } + return &pb.AdminForwardResponse{ + StatusCode: int32(status), //nolint:gosec // status fits in int32; net/http codes are 100-599. + Payload: payload, + ContentType: "application/json; charset=utf-8", + } +} + +func jsonForwardResponse(status int, body any) (*pb.AdminForwardResponse, error) { + return mustForwardJSON(status, body), nil +} diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go new file mode 100644 index 000000000..e9e1cd9f6 --- /dev/null +++ b/internal/admin/forward_server_test.go @@ -0,0 +1,182 @@ +package admin + +import ( + "context" + "errors" + "net/http" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// newForwardServerForTest wires a stub source and an in-memory role +// store into a ForwardServer so each test can mutate the inputs +// without re-building the role-lookup boilerplate. +func newForwardServerForTest(src TablesSource, roles MapRoleStore) *ForwardServer { + return NewForwardServer(src, roles, nil) +} + +// fullPrincipalRoleStore returns a role store where AKIA_FULL is a +// full-access admin and AKIA_RO is read-only — the canonical +// fixture for "leader resolves the principal correctly" tests. +func fullPrincipalRoleStore() MapRoleStore { + return MapRoleStore{ + "AKIA_FULL": RoleFull, + "AKIA_RO": RoleReadOnly, + } +} + +func mustJSON(t *testing.T, v any) []byte { + t.Helper() + b, err := json.Marshal(v) + require.NoError(t, err) + return b +} + +func TestForwardServer_RejectsMissingPrincipal(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "missing principal") +} + +func TestForwardServer_RejectsUnknownAccessKey(t *testing.T) { + // A follower could forward a principal that the leader's + // role-store does not know about (rolling-update window). The + // leader must reject without ever touching the source. + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_GHOST", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusForbidden), resp.GetStatusCode()) + require.Empty(t, src.lastCreateInput.TableName, "source must not be touched on principal rejection") +} + +func TestForwardServer_DemotesReadOnlyEvenIfFollowerSaidFull(t *testing.T) { + // Design Section 3.3.2 acceptance criterion 4: follower ships a + // "full" role but the leader's config is read-only. Result: 403. + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_RO", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "t", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusForbidden), resp.GetStatusCode()) +} + +func TestForwardServer_CreateTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + body := CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}} + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, body), + ForwardedFrom: "node-2", + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusCreated), resp.GetStatusCode()) + require.Equal(t, "users", src.lastCreateInput.TableName) + require.Equal(t, RoleFull, src.lastCreatePrincipal.Role) + + var summary DynamoTableSummary + require.NoError(t, json.Unmarshal(resp.GetPayload(), &summary)) + require.Equal(t, "users", summary.Name) +} + +func TestForwardServer_CreateTable_BadJSONReturns400(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: []byte("{not json"), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "invalid_body") +} + +func TestForwardServer_CreateTable_AlreadyExistsReturns409(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users", PartitionKey: "id"}, + }} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusConflict), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "already_exists") +} + +func TestForwardServer_DeleteTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "users": {Name: "users"}, + }} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte(`{"name":"users"}`), + ForwardedFrom: "node-2", + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNoContent), resp.GetStatusCode()) + require.Empty(t, resp.GetPayload()) + require.Equal(t, "users", src.lastDeleteName) +} + +func TestForwardServer_DeleteTable_MissingReturns404(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte(`{"name":"absent"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusNotFound), resp.GetStatusCode()) +} + +func TestForwardServer_UnknownOperationRejected(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_UNSPECIFIED, + Payload: []byte(`{}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "unknown admin operation") +} + +func TestForwardServer_CreateTable_GenericErrorReturns500(t *testing.T) { + // A non-sentinel error from the source must surface as 500 with + // a sanitised message. The leader logs the raw error; nothing + // reaches the follower except the structured response. + src := &stubTablesSource{ + createErr: errors.New("storage backing sentinel ZQ-993"), + } + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusInternalServerError), resp.GetStatusCode()) + require.NotContains(t, string(resp.GetPayload()), "ZQ-993") +} diff --git a/proto/Makefile b/proto/Makefile index 8f811e88c..b2de0a67e 100644 --- a/proto/Makefile +++ b/proto/Makefile @@ -40,3 +40,6 @@ gen: check-tools protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ etcd_raft.proto + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + admin_forward.proto diff --git a/proto/admin_forward.pb.go b/proto/admin_forward.pb.go new file mode 100644 index 000000000..8578279af --- /dev/null +++ b/proto/admin_forward.pb.go @@ -0,0 +1,358 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v7.34.0 +// source: admin_forward.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// AdminOperation enumerates the operations the dashboard can issue. +// Adding a new operation requires both ends of the cluster to +// understand it; mismatches are caught by the version compatibility +// flag (`admin.leader_forward_v2`) before the call goes out. +type AdminOperation int32 + +const ( + AdminOperation_ADMIN_OP_UNSPECIFIED AdminOperation = 0 + AdminOperation_ADMIN_OP_CREATE_TABLE AdminOperation = 1 + AdminOperation_ADMIN_OP_DELETE_TABLE AdminOperation = 2 +) + +// Enum value maps for AdminOperation. +var ( + AdminOperation_name = map[int32]string{ + 0: "ADMIN_OP_UNSPECIFIED", + 1: "ADMIN_OP_CREATE_TABLE", + 2: "ADMIN_OP_DELETE_TABLE", + } + AdminOperation_value = map[string]int32{ + "ADMIN_OP_UNSPECIFIED": 0, + "ADMIN_OP_CREATE_TABLE": 1, + "ADMIN_OP_DELETE_TABLE": 2, + } +) + +func (x AdminOperation) Enum() *AdminOperation { + p := new(AdminOperation) + *p = x + return p +} + +func (x AdminOperation) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (AdminOperation) Descriptor() protoreflect.EnumDescriptor { + return file_admin_forward_proto_enumTypes[0].Descriptor() +} + +func (AdminOperation) Type() protoreflect.EnumType { + return &file_admin_forward_proto_enumTypes[0] +} + +func (x AdminOperation) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use AdminOperation.Descriptor instead. +func (AdminOperation) EnumDescriptor() ([]byte, []int) { + return file_admin_forward_proto_rawDescGZIP(), []int{0} +} + +// AdminPrincipal captures the access-key + role the follower +// resolved from the inbound JWT. The leader re-validates this +// against its own access-key map before acting on it; the follower +// could be operating with a stale role assignment. +type AdminPrincipal struct { + state protoimpl.MessageState `protogen:"open.v1"` + AccessKey string `protobuf:"bytes,1,opt,name=access_key,json=accessKey,proto3" json:"access_key,omitempty"` + Role string `protobuf:"bytes,2,opt,name=role,proto3" json:"role,omitempty"` // "read_only" | "full" + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AdminPrincipal) Reset() { + *x = AdminPrincipal{} + mi := &file_admin_forward_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AdminPrincipal) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AdminPrincipal) ProtoMessage() {} + +func (x *AdminPrincipal) ProtoReflect() protoreflect.Message { + mi := &file_admin_forward_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AdminPrincipal.ProtoReflect.Descriptor instead. +func (*AdminPrincipal) Descriptor() ([]byte, []int) { + return file_admin_forward_proto_rawDescGZIP(), []int{0} +} + +func (x *AdminPrincipal) GetAccessKey() string { + if x != nil { + return x.AccessKey + } + return "" +} + +func (x *AdminPrincipal) GetRole() string { + if x != nil { + return x.Role + } + return "" +} + +type AdminForwardRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Principal *AdminPrincipal `protobuf:"bytes,1,opt,name=principal,proto3" json:"principal,omitempty"` + Operation AdminOperation `protobuf:"varint,2,opt,name=operation,proto3,enum=AdminOperation" json:"operation,omitempty"` + // payload is the JSON-encoded operation-specific request body — + // the same bytes the SPA sent to the follower's HTTP endpoint. + // The leader decodes this with the operation's known schema. + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + // forwarded_from is the follower's node id; the leader echoes it + // into the audit log line for the operation so operators can + // trace which follower carried the request (design Section 3.3.2 + // acceptance criteria 6). + ForwardedFrom string `protobuf:"bytes,4,opt,name=forwarded_from,json=forwardedFrom,proto3" json:"forwarded_from,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AdminForwardRequest) Reset() { + *x = AdminForwardRequest{} + mi := &file_admin_forward_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AdminForwardRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AdminForwardRequest) ProtoMessage() {} + +func (x *AdminForwardRequest) ProtoReflect() protoreflect.Message { + mi := &file_admin_forward_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AdminForwardRequest.ProtoReflect.Descriptor instead. +func (*AdminForwardRequest) Descriptor() ([]byte, []int) { + return file_admin_forward_proto_rawDescGZIP(), []int{1} +} + +func (x *AdminForwardRequest) GetPrincipal() *AdminPrincipal { + if x != nil { + return x.Principal + } + return nil +} + +func (x *AdminForwardRequest) GetOperation() AdminOperation { + if x != nil { + return x.Operation + } + return AdminOperation_ADMIN_OP_UNSPECIFIED +} + +func (x *AdminForwardRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *AdminForwardRequest) GetForwardedFrom() string { + if x != nil { + return x.ForwardedFrom + } + return "" +} + +type AdminForwardResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // status_code is the HTTP status the leader's handler would have + // written. The follower returns this verbatim to the SPA so the + // failure semantics (404 vs 409 vs 400) match a leader-direct + // call exactly. + StatusCode int32 `protobuf:"varint,1,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"` + // payload is the JSON-encoded response body. Empty when + // status_code is 204 No Content (DELETE success). + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // content_type is the response content type the leader chose; in + // practice always "application/json; charset=utf-8" but echoed + // explicitly so the follower does not have to guess. + ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AdminForwardResponse) Reset() { + *x = AdminForwardResponse{} + mi := &file_admin_forward_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AdminForwardResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AdminForwardResponse) ProtoMessage() {} + +func (x *AdminForwardResponse) ProtoReflect() protoreflect.Message { + mi := &file_admin_forward_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AdminForwardResponse.ProtoReflect.Descriptor instead. +func (*AdminForwardResponse) Descriptor() ([]byte, []int) { + return file_admin_forward_proto_rawDescGZIP(), []int{2} +} + +func (x *AdminForwardResponse) GetStatusCode() int32 { + if x != nil { + return x.StatusCode + } + return 0 +} + +func (x *AdminForwardResponse) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *AdminForwardResponse) GetContentType() string { + if x != nil { + return x.ContentType + } + return "" +} + +var File_admin_forward_proto protoreflect.FileDescriptor + +const file_admin_forward_proto_rawDesc = "" + + "\n" + + "\x13admin_forward.proto\"C\n" + + "\x0eAdminPrincipal\x12\x1d\n" + + "\n" + + "access_key\x18\x01 \x01(\tR\taccessKey\x12\x12\n" + + "\x04role\x18\x02 \x01(\tR\x04role\"\xb4\x01\n" + + "\x13AdminForwardRequest\x12-\n" + + "\tprincipal\x18\x01 \x01(\v2\x0f.AdminPrincipalR\tprincipal\x12-\n" + + "\toperation\x18\x02 \x01(\x0e2\x0f.AdminOperationR\toperation\x12\x18\n" + + "\apayload\x18\x03 \x01(\fR\apayload\x12%\n" + + "\x0eforwarded_from\x18\x04 \x01(\tR\rforwardedFrom\"t\n" + + "\x14AdminForwardResponse\x12\x1f\n" + + "\vstatus_code\x18\x01 \x01(\x05R\n" + + "statusCode\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12!\n" + + "\fcontent_type\x18\x03 \x01(\tR\vcontentType*`\n" + + "\x0eAdminOperation\x12\x18\n" + + "\x14ADMIN_OP_UNSPECIFIED\x10\x00\x12\x19\n" + + "\x15ADMIN_OP_CREATE_TABLE\x10\x01\x12\x19\n" + + "\x15ADMIN_OP_DELETE_TABLE\x10\x022H\n" + + "\fAdminForward\x128\n" + + "\aForward\x12\x14.AdminForwardRequest\x1a\x15.AdminForwardResponse\"\x00B#Z!github.com/bootjp/elastickv/protob\x06proto3" + +var ( + file_admin_forward_proto_rawDescOnce sync.Once + file_admin_forward_proto_rawDescData []byte +) + +func file_admin_forward_proto_rawDescGZIP() []byte { + file_admin_forward_proto_rawDescOnce.Do(func() { + file_admin_forward_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_admin_forward_proto_rawDesc), len(file_admin_forward_proto_rawDesc))) + }) + return file_admin_forward_proto_rawDescData +} + +var file_admin_forward_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_admin_forward_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_admin_forward_proto_goTypes = []any{ + (AdminOperation)(0), // 0: AdminOperation + (*AdminPrincipal)(nil), // 1: AdminPrincipal + (*AdminForwardRequest)(nil), // 2: AdminForwardRequest + (*AdminForwardResponse)(nil), // 3: AdminForwardResponse +} +var file_admin_forward_proto_depIdxs = []int32{ + 1, // 0: AdminForwardRequest.principal:type_name -> AdminPrincipal + 0, // 1: AdminForwardRequest.operation:type_name -> AdminOperation + 2, // 2: AdminForward.Forward:input_type -> AdminForwardRequest + 3, // 3: AdminForward.Forward:output_type -> AdminForwardResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_admin_forward_proto_init() } +func file_admin_forward_proto_init() { + if File_admin_forward_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_admin_forward_proto_rawDesc), len(file_admin_forward_proto_rawDesc)), + NumEnums: 1, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_admin_forward_proto_goTypes, + DependencyIndexes: file_admin_forward_proto_depIdxs, + EnumInfos: file_admin_forward_proto_enumTypes, + MessageInfos: file_admin_forward_proto_msgTypes, + }.Build() + File_admin_forward_proto = out.File + file_admin_forward_proto_goTypes = nil + file_admin_forward_proto_depIdxs = nil +} diff --git a/proto/admin_forward.proto b/proto/admin_forward.proto new file mode 100644 index 000000000..c0ddf5cab --- /dev/null +++ b/proto/admin_forward.proto @@ -0,0 +1,77 @@ +syntax = "proto3"; + +option go_package = "github.com/bootjp/elastickv/proto"; + +// AdminForward is the SigV4-bypass RPC the admin dashboard uses to +// reach the Raft leader from a follower. Design Section 3.3. +// +// The admin's HTTP layer authenticates the caller with a JWT cookie, +// resolves an access-key + role principal, and — if the local node +// is not the verified Raft leader — calls Forward over this RPC. +// +// The leader does NOT trust the inbound principal: it re-evaluates +// access-key membership and role against its own access-key +// configuration before dispatching to the adapter (Section 3.3.1 +// "principal-aware internal RPC"). A follower whose configuration is +// mid-rolling-update can therefore not bypass the leader's view of +// authorisation. +// +// AdminForward is intentionally separate from the existing +// `Internal.Forward` RPC, which carries SigV4 traffic and has no +// principal field — sharing the same RPC would force the admin path +// to either smuggle a principal through reserved fields or strip +// the SigV4-only assumptions out of the existing Forward server. +service AdminForward { + // Forward dispatches an admin write operation on the leader's + // behalf. Returns a structured response carrying the same JSON + // payload the leader's handler would have written to the wire. + rpc Forward(AdminForwardRequest) returns (AdminForwardResponse) {} +} + +// AdminPrincipal captures the access-key + role the follower +// resolved from the inbound JWT. The leader re-validates this +// against its own access-key map before acting on it; the follower +// could be operating with a stale role assignment. +message AdminPrincipal { + string access_key = 1; + string role = 2; // "read_only" | "full" +} + +// AdminOperation enumerates the operations the dashboard can issue. +// Adding a new operation requires both ends of the cluster to +// understand it; mismatches are caught by the version compatibility +// flag (`admin.leader_forward_v2`) before the call goes out. +enum AdminOperation { + ADMIN_OP_UNSPECIFIED = 0; + ADMIN_OP_CREATE_TABLE = 1; + ADMIN_OP_DELETE_TABLE = 2; +} + +message AdminForwardRequest { + AdminPrincipal principal = 1; + AdminOperation operation = 2; + // payload is the JSON-encoded operation-specific request body — + // the same bytes the SPA sent to the follower's HTTP endpoint. + // The leader decodes this with the operation's known schema. + bytes payload = 3; + // forwarded_from is the follower's node id; the leader echoes it + // into the audit log line for the operation so operators can + // trace which follower carried the request (design Section 3.3.2 + // acceptance criteria 6). + string forwarded_from = 4; +} + +message AdminForwardResponse { + // status_code is the HTTP status the leader's handler would have + // written. The follower returns this verbatim to the SPA so the + // failure semantics (404 vs 409 vs 400) match a leader-direct + // call exactly. + int32 status_code = 1; + // payload is the JSON-encoded response body. Empty when + // status_code is 204 No Content (DELETE success). + bytes payload = 2; + // content_type is the response content type the leader chose; in + // practice always "application/json; charset=utf-8" but echoed + // explicitly so the follower does not have to guess. + string content_type = 3; +} diff --git a/proto/admin_forward_grpc.pb.go b/proto/admin_forward_grpc.pb.go new file mode 100644 index 000000000..9b9169893 --- /dev/null +++ b/proto/admin_forward_grpc.pb.go @@ -0,0 +1,167 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.1 +// - protoc v7.34.0 +// source: admin_forward.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + AdminForward_Forward_FullMethodName = "/AdminForward/Forward" +) + +// AdminForwardClient is the client API for AdminForward service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// AdminForward is the SigV4-bypass RPC the admin dashboard uses to +// reach the Raft leader from a follower. Design Section 3.3. +// +// The admin's HTTP layer authenticates the caller with a JWT cookie, +// resolves an access-key + role principal, and — if the local node +// is not the verified Raft leader — calls Forward over this RPC. +// +// The leader does NOT trust the inbound principal: it re-evaluates +// access-key membership and role against its own access-key +// configuration before dispatching to the adapter (Section 3.3.1 +// "principal-aware internal RPC"). A follower whose configuration is +// mid-rolling-update can therefore not bypass the leader's view of +// authorisation. +// +// AdminForward is intentionally separate from the existing +// `Internal.Forward` RPC, which carries SigV4 traffic and has no +// principal field — sharing the same RPC would force the admin path +// to either smuggle a principal through reserved fields or strip +// the SigV4-only assumptions out of the existing Forward server. +type AdminForwardClient interface { + // Forward dispatches an admin write operation on the leader's + // behalf. Returns a structured response carrying the same JSON + // payload the leader's handler would have written to the wire. + Forward(ctx context.Context, in *AdminForwardRequest, opts ...grpc.CallOption) (*AdminForwardResponse, error) +} + +type adminForwardClient struct { + cc grpc.ClientConnInterface +} + +func NewAdminForwardClient(cc grpc.ClientConnInterface) AdminForwardClient { + return &adminForwardClient{cc} +} + +func (c *adminForwardClient) Forward(ctx context.Context, in *AdminForwardRequest, opts ...grpc.CallOption) (*AdminForwardResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(AdminForwardResponse) + err := c.cc.Invoke(ctx, AdminForward_Forward_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AdminForwardServer is the server API for AdminForward service. +// All implementations must embed UnimplementedAdminForwardServer +// for forward compatibility. +// +// AdminForward is the SigV4-bypass RPC the admin dashboard uses to +// reach the Raft leader from a follower. Design Section 3.3. +// +// The admin's HTTP layer authenticates the caller with a JWT cookie, +// resolves an access-key + role principal, and — if the local node +// is not the verified Raft leader — calls Forward over this RPC. +// +// The leader does NOT trust the inbound principal: it re-evaluates +// access-key membership and role against its own access-key +// configuration before dispatching to the adapter (Section 3.3.1 +// "principal-aware internal RPC"). A follower whose configuration is +// mid-rolling-update can therefore not bypass the leader's view of +// authorisation. +// +// AdminForward is intentionally separate from the existing +// `Internal.Forward` RPC, which carries SigV4 traffic and has no +// principal field — sharing the same RPC would force the admin path +// to either smuggle a principal through reserved fields or strip +// the SigV4-only assumptions out of the existing Forward server. +type AdminForwardServer interface { + // Forward dispatches an admin write operation on the leader's + // behalf. Returns a structured response carrying the same JSON + // payload the leader's handler would have written to the wire. + Forward(context.Context, *AdminForwardRequest) (*AdminForwardResponse, error) + mustEmbedUnimplementedAdminForwardServer() +} + +// UnimplementedAdminForwardServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedAdminForwardServer struct{} + +func (UnimplementedAdminForwardServer) Forward(context.Context, *AdminForwardRequest) (*AdminForwardResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Forward not implemented") +} +func (UnimplementedAdminForwardServer) mustEmbedUnimplementedAdminForwardServer() {} +func (UnimplementedAdminForwardServer) testEmbeddedByValue() {} + +// UnsafeAdminForwardServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AdminForwardServer will +// result in compilation errors. +type UnsafeAdminForwardServer interface { + mustEmbedUnimplementedAdminForwardServer() +} + +func RegisterAdminForwardServer(s grpc.ServiceRegistrar, srv AdminForwardServer) { + // If the following call panics, it indicates UnimplementedAdminForwardServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&AdminForward_ServiceDesc, srv) +} + +func _AdminForward_Forward_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AdminForwardRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminForwardServer).Forward(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AdminForward_Forward_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminForwardServer).Forward(ctx, req.(*AdminForwardRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// AdminForward_ServiceDesc is the grpc.ServiceDesc for AdminForward service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AdminForward_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "AdminForward", + HandlerType: (*AdminForwardServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Forward", + Handler: _AdminForward_Forward_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "admin_forward.proto", +} From 9b411227ed0fc4cb775a32576e2634ceda2f5d7c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 23:22:57 +0900 Subject: [PATCH 2/7] admin: tighten Forward decoding (Gemini security + Codex P1) Two findings on the leader-side AdminForward handler: - Gemini security-medium: handleDelete unmarshalled the raw payload with no size cap, so a hostile follower could push a multi-MiB body and consume memory before the JSON parser noticed. Apply a 64 KiB cap (mirrors the HTTP path defaultBodyLimit) on both handleCreate and handleDelete; payloads past the cap return 413 payload_too_large without ever touching json.Unmarshal. - Codex P1: handleCreate decoded with plain json.Unmarshal, bypassing the strict checks the HTTP path runs through decodeCreateTableRequest (DisallowUnknownFields, dec.More() trailing-token rejection, slash-in-name validation, the rest of validateCreateTableRequest). Reuse decodeCreateTableRequest so a forwarded create cannot smuggle past validations a leader-direct create would have caught. handleDelete also gains DisallowUnknownFields + dec.More() so its payload contract matches the create path. Tests cover the new 413 paths, the unknown-field rejection, and the slash-in-name rejection on the forwarded create path. --- internal/admin/forward_server.go | 42 ++++++++++++++-- internal/admin/forward_server_test.go | 69 +++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 24e0b2cdb..76e1ed074 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -1,6 +1,7 @@ package admin import ( + "bytes" "context" "errors" "log/slog" @@ -10,6 +11,15 @@ import ( "github.com/goccy/go-json" ) +// adminForwardPayloadLimit caps the JSON payload the leader will +// decode for any Forward operation. Mirrors defaultBodyLimit on the +// HTTP path (64 KiB) so a single Forward call cannot consume more +// memory than the same operation would over /admin/api/v1/dynamo/. +// gRPC has its own 4 MiB max-message default, but that is way too +// permissive for admin: a follower-forwarded request must obey the +// same 64 KiB ceiling we promise on the public API surface. +const adminForwardPayloadLimit = 64 << 10 + // ForwardServer is the leader-side gRPC handler for the AdminForward // RPC (design Section 3.3). The follower's admin HTTP layer calls it // when the local node is not the Raft leader; this server then @@ -122,9 +132,20 @@ func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, } func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { - var body CreateTableRequest - if err := json.Unmarshal(req.GetPayload(), &body); err != nil { - return rejectForward(http.StatusBadRequest, "invalid_body", "request body is not valid JSON") + payload := req.GetPayload() + if len(payload) > adminForwardPayloadLimit { + return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", + "forwarded payload exceeds the 64 KiB admin limit") + } + // Reuse the HTTP handler's strict decoder so the forwarded + // path enforces the same shape contract — DisallowUnknownFields, + // trailing-token rejection, slash-in-name rejection, and the + // rest of validateCreateTableRequest. Bypassing it here would + // let a hostile follower (or a misbehaving SPA on the follower + // side) sneak past validations the leader-direct path enforces. + body, err := decodeCreateTableRequest(bytes.NewReader(payload)) + if err != nil { + return rejectForward(http.StatusBadRequest, "invalid_body", err.Error()) } summary, err := s.source.AdminCreateTable(ctx, principal, body) if err != nil { @@ -145,10 +166,23 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa // proto stays operation-agnostic — there is no operation-specific // field in AdminForwardRequest, by design (adding one per op // would couple every new admin endpoint to the proto schema). + payload := req.GetPayload() + if len(payload) > adminForwardPayloadLimit { + return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", + "forwarded payload exceeds the 64 KiB admin limit") + } + dec := json.NewDecoder(bytes.NewReader(payload)) + dec.DisallowUnknownFields() var body struct { Name string `json:"name"` } - if err := json.Unmarshal(req.GetPayload(), &body); err != nil || body.Name == "" { + if err := dec.Decode(&body); err != nil { + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload is not valid JSON") + } + if dec.More() { + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload has trailing data") + } + if body.Name == "" { return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name") } if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index e9e1cd9f6..a94425eed 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -163,6 +163,75 @@ func TestForwardServer_UnknownOperationRejected(t *testing.T) { require.Contains(t, string(resp.GetPayload()), "unknown admin operation") } +// TestForwardServer_CreateTable_PayloadTooLargeReturns413 exercises +// the size cap added in response to the Gemini security-medium +// finding. The leader must refuse to decode payloads bigger than +// the HTTP path's 64 KiB limit; gRPC's own 4 MiB default is too +// permissive for the admin surface. +func TestForwardServer_CreateTable_PayloadTooLargeReturns413(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + oversize := make([]byte, adminForwardPayloadLimit+1) + for i := range oversize { + oversize[i] = 'x' + } + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: oversize, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusRequestEntityTooLarge), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "payload_too_large") +} + +// TestForwardServer_CreateTable_RejectsUnknownFields confirms the +// decode path now reuses decodeCreateTableRequest's +// DisallowUnknownFields setting (Codex P1). Without this, a +// follower could smuggle silently-ignored fields the leader-direct +// HTTP path would have rejected. +func TestForwardServer_CreateTable_RejectsUnknownFields(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: []byte(`{"table_name":"u","partition_key":{"name":"id","type":"S"},"unknown_field":1}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "invalid_body") +} + +// TestForwardServer_CreateTable_RejectsSlashInName confirms the +// strict validation pulled in via decodeCreateTableRequest also +// catches the slash-rejection rule the HTTP path enforces. +func TestForwardServer_CreateTable_RejectsSlashInName(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: []byte(`{"table_name":"foo/bar","partition_key":{"name":"id","type":"S"}}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "must not contain") +} + +// TestForwardServer_DeleteTable_PayloadTooLargeReturns413 mirrors +// the create-side cap: an oversized delete payload must be refused +// before the JSON decoder runs, regardless of how the gRPC layer +// configures its own message limits. +func TestForwardServer_DeleteTable_PayloadTooLargeReturns413(t *testing.T) { + srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) + oversize := make([]byte, adminForwardPayloadLimit+1) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: oversize, + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusRequestEntityTooLarge), resp.GetStatusCode()) +} + func TestForwardServer_CreateTable_GenericErrorReturns500(t *testing.T) { // A non-sentinel error from the source must surface as 500 with // a sanitised message. The leader logs the raw error; nothing From 1de2e398487a5d5db765c9aff113da8a8f8ffa69 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 00:14:56 +0900 Subject: [PATCH 3/7] =?UTF-8?q?admin:=20harden=20forwarded=20delete=20(NUL?= =?UTF-8?q?=20+=20slash)=20(Codex=20P2=20=C3=972)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related findings on PR #635 / forward_server.go's handleDelete: - NUL-byte smuggling: handleDelete decoded with goccy/go-json but skipped the explicit NUL scan that decodeCreateTableRequest applies. Same vector as the #634 fix — `{"name":"users"}\x00{"extra":1}` passes dec.More() because goccy treats NUL as end-of-input. Add the same pre-decode NUL rejection. - Slash-in-name divergence: the HTTP handleDelete and handleDescribe both reject `/` in the table name with 404, but the forwarded delete just passed body.Name straight through to AdminDeleteTable. A forwarded call could therefore act on slash-bearing tables that a leader-direct call would 404. Reject symmetrically before invoking the source. Tests: two new ForwardServer cases — NUL payload + slash name. Both confirm the source is never invoked when the precondition fails (defence in depth — an asymmetric stub source would still make the test green if we only checked the response code). --- internal/admin/forward_server.go | 17 ++++++++++++ internal/admin/forward_server_test.go | 37 +++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 76e1ed074..4bb566d5d 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -6,6 +6,7 @@ import ( "errors" "log/slog" "net/http" + "strings" pb "github.com/bootjp/elastickv/proto" "github.com/goccy/go-json" @@ -171,6 +172,14 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", "forwarded payload exceeds the 64 KiB admin limit") } + // Mirror decodeCreateTableRequest's NUL-byte guard: goccy/go-json + // treats raw NUL as end-of-input so dec.More() would otherwise + // miss `{"name":"users"}\x00{"extra":1}` payloads. Codex P2 on + // PR #635 flagged this as the same smuggling vector that the + // HTTP create path already covers. + if bytes.IndexByte(payload, 0) >= 0 { + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload contains a NUL byte") + } dec := json.NewDecoder(bytes.NewReader(payload)) dec.DisallowUnknownFields() var body struct { @@ -185,6 +194,14 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa if body.Name == "" { return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name") } + // Reject slash-bearing names symmetrically with the HTTP + // handleDelete and handleDescribe paths. Without this, a + // forwarded call could act on `foo/bar` while a leader-direct + // call would 404 — divergent behaviour Codex P2 flagged on + // PR #635. + if strings.ContainsRune(body.Name, '/') { + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload name must not contain '/'") + } if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { return forwardErrorResponse(err), nil } diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index a94425eed..daef2a2a1 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -151,6 +151,43 @@ func TestForwardServer_DeleteTable_MissingReturns404(t *testing.T) { require.Equal(t, int32(http.StatusNotFound), resp.GetStatusCode()) } +// TestForwardServer_DeleteTable_RejectsNULBytePayload exercises +// the same Codex P2 vector that the create-table path covers: +// goccy/go-json treats raw NUL as end-of-input, so a body like +// `{"name":"users"}\x00{"extra":1}` would otherwise pass dec.More() +// undetected. The handler now scans for NUL before decoding. +func TestForwardServer_DeleteTable_RejectsNULBytePayload(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{"users": {Name: "users"}}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte("{\"name\":\"users\"}\x00{\"extra\":1}"), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "NUL byte") + // Source must not be reached — the table still exists in the + // stub map after the rejection. + require.Contains(t, src.tables, "users") +} + +// TestForwardServer_DeleteTable_RejectsSlashInName mirrors the +// HTTP path's slash rejection on forwarded delete requests so the +// two surfaces cannot diverge (Codex P2). +func TestForwardServer_DeleteTable_RejectsSlashInName(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte(`{"name":"foo/bar"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusBadRequest), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "must not contain") +} + func TestForwardServer_UnknownOperationRejected(t *testing.T) { srv := newForwardServerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}, fullPrincipalRoleStore()) resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ From 02143bc613da14558d826249124aa225bf74b247 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 00:50:23 +0900 Subject: [PATCH 4/7] =?UTF-8?q?admin:=20forward=20500=20parity=20+=20log?= =?UTF-8?q?=20unexpected=20source=20errors=20(Codex=20P2=20=C3=972)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related findings on the leader-side AdminForward dispatcher. - Operation-specific 500 codes: forwardErrorResponse mapped any unmapped failure to {"error":"internal", ...}, but the HTTP handler's writeTablesError emits dynamo_create_failed / dynamo_delete_failed. A forwarded write that hit the same backend/storage error therefore returned a different code than a leader-direct write — clients branching on those codes saw divergent behaviour. Fix: forwardErrorResponse now takes an `op` argument ("create" or "delete") so the fallthrough emits dynamo__failed, matching leader-direct parity exactly. - Log unexpected source errors: handleCreate / handleDelete on the forward path returned 500 with "see leader logs" in the body but never logged anything for non-sentinel source failures, leaving operators with no breadcrumb for forwarded 500s. Add logUnexpectedSourceError that emits an admin_forward__failed line at LevelError carrying the table, forwarded_from, and raw error. Sentinels (forbidden, not-found, already-exists, validation) skip the log — they're routine client-side outcomes, and logging at LevelError would drown the operational signal (same policy as the HTTP path's writeTablesError). Tests: - TestForwardServer_CreateTable_GenericErrorReturns500 now also asserts the response body contains dynamo_create_failed. - New TestForwardServer_DeleteTable_GenericErrorUsesOperationSpecificCode pins the delete-side code parity. - New TestForwardServer_LogsUnexpectedSourceError captures the slog output and confirms the table, forwarded_from, and raw error all appear. - New TestForwardServer_DoesNotLogStructuredSourceErrors sweeps the four sentinel cases and confirms the LevelError handler never emits anything. --- internal/admin/forward_server.go | 55 +++++++++++++++-- internal/admin/forward_server_test.go | 85 +++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 4 deletions(-) diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 4bb566d5d..7113d1628 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -150,7 +150,8 @@ func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipa } summary, err := s.source.AdminCreateTable(ctx, principal, body) if err != nil { - return forwardErrorResponse(err), nil + s.logUnexpectedSourceError(ctx, "create_table", body.TableName, req.GetForwardedFrom(), err) + return forwardErrorResponse("create", err), nil } s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", slog.String("actor", principal.AccessKey), @@ -203,7 +204,8 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload name must not contain '/'") } if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { - return forwardErrorResponse(err), nil + s.logUnexpectedSourceError(ctx, "delete_table", body.Name, req.GetForwardedFrom(), err) + return forwardErrorResponse("delete", err), nil } s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", slog.String("actor", principal.AccessKey), @@ -220,7 +222,14 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa // is the leader-side counterpart of writeTablesError: every status / // JSON code the HTTP handler chooses is mirrored here so a forwarded // call is indistinguishable to the SPA from a leader-direct call. -func forwardErrorResponse(err error) *pb.AdminForwardResponse { +// +// op is "create" or "delete" so the unmapped 500 fallthrough emits +// dynamo_create_failed / dynamo_delete_failed — the same +// operation-specific codes the leader-direct HTTP path produces in +// writeTablesError. Without this, forwarded write failures showed +// up to clients as a generic "internal" code, breaking parity with +// the leader-direct path (Codex P2 on PR #635). +func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { switch { case errors.Is(err, ErrTablesForbidden): return mustForwardJSON(http.StatusForbidden, errorBody{Error: "forbidden", Message: "this endpoint requires a full-access role"}) @@ -238,7 +247,45 @@ func forwardErrorResponse(err error) *pb.AdminForwardResponse { if errors.As(err, &verr) { return mustForwardJSON(http.StatusBadRequest, errorBody{Error: "invalid_request", Message: verr.Error()}) } - return mustForwardJSON(http.StatusInternalServerError, errorBody{Error: "internal", Message: "internal error; see leader logs"}) + return mustForwardJSON(http.StatusInternalServerError, errorBody{ + Error: "dynamo_" + op + "_failed", + Message: "failed to " + op + " table; see leader logs", + }) +} + +// logUnexpectedSourceError emits an error log for non-sentinel +// source failures so operators have a breadcrumb when forwarded +// writes 500. Sentinel errors that map to specific HTTP statuses +// (forbidden, not-found, validation, ...) are deliberately +// silent: those are routine client-side failures, not server +// regressions, and logging them at LevelError would drown the +// operational signal. The HTTP path's writeTablesError applies +// the same policy (Codex P2 on PR #635 flagged the silent path). +func (s *ForwardServer) logUnexpectedSourceError(ctx context.Context, op, table, forwardedFrom string, err error) { + if isStructuredSourceError(err) { + return + } + s.logger.LogAttrs(ctx, slog.LevelError, "admin_forward_"+op+"_failed", + slog.String("table", table), + slog.String("forwarded_from", forwardedFrom), + slog.String("error", err.Error()), + ) +} + +// isStructuredSourceError reports whether err is one of the +// admin-package sentinels or a ValidationError — i.e., a known +// failure mode the handler maps to a non-500 status. These are +// expected and not log-worthy. +func isStructuredSourceError(err error) bool { + switch { + case errors.Is(err, ErrTablesForbidden), + errors.Is(err, ErrTablesNotLeader), + errors.Is(err, ErrTablesNotFound), + errors.Is(err, ErrTablesAlreadyExists): + return true + } + var verr *ValidationError + return errors.As(err, &verr) } // errorBody is the shared JSON shape for both the HTTP handler's diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index daef2a2a1..a6d044a24 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -1,8 +1,10 @@ package admin import ( + "bytes" "context" "errors" + "log/slog" "net/http" "testing" @@ -285,4 +287,87 @@ func TestForwardServer_CreateTable_GenericErrorReturns500(t *testing.T) { require.NoError(t, err) require.Equal(t, int32(http.StatusInternalServerError), resp.GetStatusCode()) require.NotContains(t, string(resp.GetPayload()), "ZQ-993") + // Error code parity with leader-direct path: must use + // dynamo_create_failed, not the previous generic "internal" + // (Codex P2 on PR #635). + require.Contains(t, string(resp.GetPayload()), `"error":"dynamo_create_failed"`) +} + +// TestForwardServer_DeleteTable_GenericErrorUsesOperationSpecificCode +// is the delete-side counterpart: 500 fallthrough must use +// dynamo_delete_failed so client retry/branching logic that +// already handles the leader-direct path works unchanged. +func TestForwardServer_DeleteTable_GenericErrorUsesOperationSpecificCode(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{"users": {Name: "users"}}, + deleteErr: errors.New("storage backing sentinel DEL-1"), + } + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte(`{"name":"users"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusInternalServerError), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), `"error":"dynamo_delete_failed"`) + require.NotContains(t, string(resp.GetPayload()), "DEL-1") +} + +// TestForwardServer_LogsUnexpectedSourceError confirms the leader +// emits an error log line for non-sentinel source failures so +// operators can investigate forwarded write 500s without +// reverse-engineering the response body. The HTTP path's +// writeTablesError already logs; without the symmetric emit on +// the forward path, forwarded failures were operationally invisible +// (Codex P2 on PR #635). +func TestForwardServer_LogsUnexpectedSourceError(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelError})) + src := &stubTablesSource{createErr: errors.New("storage sentinel SENT-42")} + srv := NewForwardServer(src, fullPrincipalRoleStore(), logger) + + _, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "u", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + ForwardedFrom: "follower-9", + }) + require.NoError(t, err) + logged := buf.String() + require.Contains(t, logged, "admin_forward_create_table_failed") + require.Contains(t, logged, "SENT-42") + require.Contains(t, logged, "follower-9") +} + +// TestForwardServer_DoesNotLogStructuredSourceErrors confirms the +// "expected" sentinels (forbidden, not-found, already-exists, +// validation) do NOT emit error-level logs. They are routine +// client-side outcomes, not server regressions, so logging them +// at LevelError would drown the operational signal. +func TestForwardServer_DoesNotLogStructuredSourceErrors(t *testing.T) { + cases := []struct { + name string + err error + }{ + {"forbidden", ErrTablesForbidden}, + {"not_found", ErrTablesNotFound}, + {"already_exists", ErrTablesAlreadyExists}, + {"validation", &ValidationError{Message: "field foo invalid"}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelError})) + src := &stubTablesSource{createErr: tc.err} + srv := NewForwardServer(src, fullPrincipalRoleStore(), logger) + _, _ = srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "u", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NotContains(t, buf.String(), "admin_forward_create_table_failed", + "sentinel error %q must not produce an error log", tc.name) + }) + } } From b65d5bfda08a96c59b157f0e863f1b91c15f073e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:08:46 +0900 Subject: [PATCH 5/7] admin: cover forwarded ErrTablesNotLeader race-window (Claude review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit forwardErrorResponse already maps ErrTablesNotLeader to 503 leader_unavailable for the case where the leader passes its isVerifiedDynamoLeader guard but then drops leadership mid- Dispatch — the comment at forward_server.go:248 documents the race. No test exercised that path though, which Claude's review on PR #635 called out as a gap. Add two tests that pin both the create and delete sides: TestForwardServer_CreateTable_LeaderSteppedDownReturns503 and TestForwardServer_DeleteTable_LeaderSteppedDownReturns503. Both inject ErrTablesNotLeader through the stub source and assert 503 + leader_unavailable in the response body. These are pure stub-driven unit tests so they run alongside the existing 17 forward-server cases without spinning up a real Raft cluster. --- internal/admin/forward_server_test.go | 40 +++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index a6d044a24..d2934506d 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -314,6 +314,46 @@ func TestForwardServer_DeleteTable_GenericErrorUsesOperationSpecificCode(t *test require.NotContains(t, string(resp.GetPayload()), "DEL-1") } +// TestForwardServer_CreateTable_LeaderSteppedDownReturns503 covers +// the race-window comment in forwardErrorResponse: even though the +// leader passed isVerifiedDynamoLeader at the top of +// AdminCreateTable, leadership can still drop mid-dispatch and the +// adapter then returns ErrTablesNotLeader. The forward server must +// surface that as a 503 leader_unavailable response so the +// follower's bridge can re-emit the exact 503 contract clients +// already know — Claude review on PR #635 flagged the gap (no test +// exercised this code path despite the comment). +func TestForwardServer_CreateTable_LeaderSteppedDownReturns503(t *testing.T) { + src := &stubTablesSource{createErr: ErrTablesNotLeader} + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusServiceUnavailable), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "leader_unavailable") +} + +// TestForwardServer_DeleteTable_LeaderSteppedDownReturns503 mirrors +// the create-side coverage on the delete path. +func TestForwardServer_DeleteTable_LeaderSteppedDownReturns503(t *testing.T) { + src := &stubTablesSource{ + tables: map[string]*DynamoTableSummary{"users": {Name: "users"}}, + deleteErr: ErrTablesNotLeader, + } + srv := newForwardServerForTest(src, fullPrincipalRoleStore()) + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_DELETE_TABLE, + Payload: []byte(`{"name":"users"}`), + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusServiceUnavailable), resp.GetStatusCode()) + require.Contains(t, string(resp.GetPayload()), "leader_unavailable") +} + // TestForwardServer_LogsUnexpectedSourceError confirms the leader // emits an error log line for non-sentinel source failures so // operators can investigate forwarded write 500s without From b5dc48d23ae58bbb17a317a746d8c081f8e8ea2a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:17:07 +0900 Subject: [PATCH 6/7] admin: drop dup RoleStore + Retry-After in forward 503 (Codex P2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two changes on the AdminForward leader-side dispatcher: - Codex P2 on PR #635: forwarded 503 leader_unavailable lost the Retry-After: 1 header that the leader-direct HTTP path emits. Add retry_after_seconds to AdminForwardResponse so the follower's bridge can rebuild the same HTTP header (the bridge side wires it in the next PR; the proto + leader populate it here). Direct HTTP path is unaffected — the proto field is only consumed during forwarding. - Drop the duplicate RoleStore/MapRoleStore declarations from forward_server.go. They now live in role_store.go (added on PR #634 for HTTP-side role revalidation) so both surfaces share one definition. Tests: extend TestForwardServer_CreateTable_LeaderSteppedDownReturns503 to pin the retry_after_seconds=1 hint. --- internal/admin/forward_server.go | 33 +++++++-------------------- internal/admin/forward_server_test.go | 6 +++++ proto/admin_forward.pb.go | 28 +++++++++++++++++++---- proto/admin_forward.proto | 10 ++++++++ 4 files changed, 47 insertions(+), 30 deletions(-) diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 7113d1628..0caaefff1 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -42,30 +42,6 @@ type ForwardServer struct { logger *slog.Logger } -// RoleStore is the access-key → role lookup the leader uses to -// re-validate the inbound principal. Implementations should mirror -// the admin server's `Roles` map; production passes a typed wrapper -// around that map so tests can swap in an in-memory stub. -type RoleStore interface { - // LookupRole returns the role for an access key as understood - // by the leader's view of cluster configuration. The bool is - // false when the access key is not in the admin role index — a - // follower that forwarded the principal should not be able to - // "make up" an admin identity. - LookupRole(accessKey string) (Role, bool) -} - -// MapRoleStore is the trivial in-memory implementation, sufficient -// for tests and for the production wiring (which already keeps the -// role map in memory). -type MapRoleStore map[string]Role - -// LookupRole implements RoleStore. -func (m MapRoleStore) LookupRole(accessKey string) (Role, bool) { - r, ok := m[accessKey] - return r, ok -} - // NewForwardServer wires a TablesSource and a RoleStore behind the // gRPC AdminForward service. logger may be nil; defaults to // slog.Default(). @@ -237,7 +213,14 @@ func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { // Should never happen on the leader path — the leader // just verified itself — but if a leadership transfer // races with the dispatch, surface it consistently. - return mustForwardJSON(http.StatusServiceUnavailable, errorBody{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) + // Carry retry_after_seconds=1 so the follower's bridge + // translates it back into the same HTTP Retry-After + // header the leader-direct path emits (Codex P2 on + // PR #635 — without this the forwarded 503 would lose + // its retry timing). + resp := mustForwardJSON(http.StatusServiceUnavailable, errorBody{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) + resp.RetryAfterSeconds = 1 + return resp case errors.Is(err, ErrTablesNotFound): return mustForwardJSON(http.StatusNotFound, errorBody{Error: "not_found", Message: "table does not exist"}) case errors.Is(err, ErrTablesAlreadyExists): diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index d2934506d..b156d094e 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -334,6 +334,12 @@ func TestForwardServer_CreateTable_LeaderSteppedDownReturns503(t *testing.T) { require.NoError(t, err) require.Equal(t, int32(http.StatusServiceUnavailable), resp.GetStatusCode()) require.Contains(t, string(resp.GetPayload()), "leader_unavailable") + // Codex P2 on PR #635: the leader's 503 must carry the + // retry-after hint over the wire so the follower's bridge can + // rebuild the same HTTP Retry-After header a leader-direct + // 503 would have produced. Without this the forwarded 503 + // silently strips the retry timing. + require.Equal(t, int32(1), resp.GetRetryAfterSeconds()) } // TestForwardServer_DeleteTable_LeaderSteppedDownReturns503 mirrors diff --git a/proto/admin_forward.pb.go b/proto/admin_forward.pb.go index 8578279af..bdedffe68 100644 --- a/proto/admin_forward.pb.go +++ b/proto/admin_forward.pb.go @@ -218,9 +218,19 @@ type AdminForwardResponse struct { // content_type is the response content type the leader chose; in // practice always "application/json; charset=utf-8" but echoed // explicitly so the follower does not have to guess. - ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` + // retry_after_seconds carries a Retry-After hint the follower's + // bridge translates back into an HTTP Retry-After header. The + // direct HTTP path sets Retry-After: 1 on 503 leader_unavailable + // (mid-dispatch leadership churn); without this field the same + // response shape would lose its retry timing once forwarded — + // clients would see 503 with no Retry-After and fall back to + // their default backoff. Codex P2 on PR #635 flagged the gap. + // Zero means "no Retry-After header"; the field is only meaningful + // for status codes that traditionally carry one (typically 503). + RetryAfterSeconds int32 `protobuf:"varint,4,opt,name=retry_after_seconds,json=retryAfterSeconds,proto3" json:"retry_after_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *AdminForwardResponse) Reset() { @@ -274,6 +284,13 @@ func (x *AdminForwardResponse) GetContentType() string { return "" } +func (x *AdminForwardResponse) GetRetryAfterSeconds() int32 { + if x != nil { + return x.RetryAfterSeconds + } + return 0 +} + var File_admin_forward_proto protoreflect.FileDescriptor const file_admin_forward_proto_rawDesc = "" + @@ -287,12 +304,13 @@ const file_admin_forward_proto_rawDesc = "" + "\tprincipal\x18\x01 \x01(\v2\x0f.AdminPrincipalR\tprincipal\x12-\n" + "\toperation\x18\x02 \x01(\x0e2\x0f.AdminOperationR\toperation\x12\x18\n" + "\apayload\x18\x03 \x01(\fR\apayload\x12%\n" + - "\x0eforwarded_from\x18\x04 \x01(\tR\rforwardedFrom\"t\n" + + "\x0eforwarded_from\x18\x04 \x01(\tR\rforwardedFrom\"\xa4\x01\n" + "\x14AdminForwardResponse\x12\x1f\n" + "\vstatus_code\x18\x01 \x01(\x05R\n" + "statusCode\x12\x18\n" + "\apayload\x18\x02 \x01(\fR\apayload\x12!\n" + - "\fcontent_type\x18\x03 \x01(\tR\vcontentType*`\n" + + "\fcontent_type\x18\x03 \x01(\tR\vcontentType\x12.\n" + + "\x13retry_after_seconds\x18\x04 \x01(\x05R\x11retryAfterSeconds*`\n" + "\x0eAdminOperation\x12\x18\n" + "\x14ADMIN_OP_UNSPECIFIED\x10\x00\x12\x19\n" + "\x15ADMIN_OP_CREATE_TABLE\x10\x01\x12\x19\n" + diff --git a/proto/admin_forward.proto b/proto/admin_forward.proto index c0ddf5cab..99d2d47f8 100644 --- a/proto/admin_forward.proto +++ b/proto/admin_forward.proto @@ -74,4 +74,14 @@ message AdminForwardResponse { // practice always "application/json; charset=utf-8" but echoed // explicitly so the follower does not have to guess. string content_type = 3; + // retry_after_seconds carries a Retry-After hint the follower's + // bridge translates back into an HTTP Retry-After header. The + // direct HTTP path sets Retry-After: 1 on 503 leader_unavailable + // (mid-dispatch leadership churn); without this field the same + // response shape would lose its retry timing once forwarded — + // clients would see 503 with no Retry-After and fall back to + // their default backoff. Codex P2 on PR #635 flagged the gap. + // Zero means "no Retry-After header"; the field is only meaningful + // for status codes that traditionally carry one (typically 503). + int32 retry_after_seconds = 4; } From e6e35964e188aa36aac67fb15b8d5e641fff6725 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:32:00 +0900 Subject: [PATCH 7/7] admin: dedupe errorBody + sanitise forwarded_from (Claude review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two carry-overs from Claude's review on PR #635: - forward_server.go defined errorBody{Error, Message} which was byte-for-byte identical to errorResponse in router.go (same package, same JSON tags). Drop errorBody and use the existing errorResponse type — no functional change, eliminates drift. - ForwardedFrom from the gRPC request was written into slog's LogAttrs verbatim. With JSON output the encoder escapes \n/\r for us, but a text-format handler would let a malicious follower-supplied node id split one audit line into two — defeating log-aggregation parsing or spoofing a synthetic log entry. Sanitise once at the RPC entry point in Forward() and thread the cleaned string through handleCreate / handleDelete / the warning-log paths. Test: TestForwardServer_SanitisesForwardedFromInLog uses a real slog text-format handler (where JSON's auto-escape doesn't help) and confirms a forwarded_from value containing "\n" comes out as spaces in the log line. --- internal/admin/forward_server.go | 65 +++++++++++++++++---------- internal/admin/forward_server_test.go | 28 ++++++++++++ 2 files changed, 70 insertions(+), 23 deletions(-) diff --git a/internal/admin/forward_server.go b/internal/admin/forward_server.go index 0caaefff1..068f3bee6 100644 --- a/internal/admin/forward_server.go +++ b/internal/admin/forward_server.go @@ -62,6 +62,16 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest if req == nil || req.GetPrincipal() == nil { return rejectForward(http.StatusBadRequest, "invalid_request", "missing principal") } + // Sanitise forwarded_from before it ever reaches a slog + // handler. With JSON output the encoder escapes newlines on + // our behalf, but with a text-format handler an attacker who + // controlled the follower side could embed `\n` in the value + // and split a single audit line into two — defeating + // log-aggregation parsing or spoofing a synthetic entry. + // Replacing CR/LF with spaces at the entry point keeps every + // downstream call site on the leader trivially safe (Claude + // review on PR #635). + forwardedFrom := sanitiseForwardedFrom(req.GetForwardedFrom()) principal, ok := s.validatePrincipal(req.GetPrincipal()) if !ok { // Don't leak why the principal failed — the follower may @@ -69,7 +79,7 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest // we want operators to investigate from the audit log on // the leader, not the follower's response body. s.logger.LogAttrs(ctx, slog.LevelWarn, "admin_forward_principal_rejected", - slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("forwarded_from", forwardedFrom), slog.String("claimed_access_key", req.GetPrincipal().GetAccessKey()), slog.String("claimed_role", req.GetPrincipal().GetRole()), ) @@ -78,9 +88,9 @@ func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest } switch req.GetOperation() { case pb.AdminOperation_ADMIN_OP_CREATE_TABLE: - return s.handleCreate(ctx, principal, req) + return s.handleCreate(ctx, principal, forwardedFrom, req) case pb.AdminOperation_ADMIN_OP_DELETE_TABLE: - return s.handleDelete(ctx, principal, req) + return s.handleDelete(ctx, principal, forwardedFrom, req) case pb.AdminOperation_ADMIN_OP_UNSPECIFIED: return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") default: @@ -108,7 +118,7 @@ func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, return AuthPrincipal{AccessKey: accessKey, Role: role}, true } -func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { +func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { payload := req.GetPayload() if len(payload) > adminForwardPayloadLimit { return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", @@ -126,20 +136,20 @@ func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipa } summary, err := s.source.AdminCreateTable(ctx, principal, body) if err != nil { - s.logUnexpectedSourceError(ctx, "create_table", body.TableName, req.GetForwardedFrom(), err) + s.logUnexpectedSourceError(ctx, "create_table", body.TableName, forwardedFrom, err) return forwardErrorResponse("create", err), nil } s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", slog.String("actor", principal.AccessKey), slog.String("role", string(principal.Role)), - slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("forwarded_from", forwardedFrom), slog.String("operation", "create_table"), slog.String("table", body.TableName), ) return jsonForwardResponse(http.StatusCreated, summary) } -func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipal, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { +func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { // Delete carries the table name in the payload as JSON so the // proto stays operation-agnostic — there is no operation-specific // field in AdminForwardRequest, by design (adding one per op @@ -180,19 +190,35 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload name must not contain '/'") } if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { - s.logUnexpectedSourceError(ctx, "delete_table", body.Name, req.GetForwardedFrom(), err) + s.logUnexpectedSourceError(ctx, "delete_table", body.Name, forwardedFrom, err) return forwardErrorResponse("delete", err), nil } s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", slog.String("actor", principal.AccessKey), slog.String("role", string(principal.Role)), - slog.String("forwarded_from", req.GetForwardedFrom()), + slog.String("forwarded_from", forwardedFrom), slog.String("operation", "delete_table"), slog.String("table", body.Name), ) return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil } +// sanitiseForwardedFrom strips CR/LF from a follower-supplied +// node id so a malicious value cannot split a single audit log +// line into two when slog is using a text-format handler. JSON +// handlers escape these characters automatically; this is a +// defence-in-depth pass for handler-format-agnostic safety. +// Other control characters are deliberately preserved — only the +// line-splitting characters matter for log spoofing. +func sanitiseForwardedFrom(s string) string { + return strings.Map(func(r rune) rune { + if r == '\n' || r == '\r' { + return ' ' + } + return r + }, s) +} + // forwardErrorResponse re-encodes a TablesSource error in the // structured shape the follower's handler can re-emit verbatim. This // is the leader-side counterpart of writeTablesError: every status / @@ -208,7 +234,7 @@ func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipa func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { switch { case errors.Is(err, ErrTablesForbidden): - return mustForwardJSON(http.StatusForbidden, errorBody{Error: "forbidden", Message: "this endpoint requires a full-access role"}) + return mustForwardJSON(http.StatusForbidden, errorResponse{Error: "forbidden", Message: "this endpoint requires a full-access role"}) case errors.Is(err, ErrTablesNotLeader): // Should never happen on the leader path — the leader // just verified itself — but if a leadership transfer @@ -218,19 +244,19 @@ func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { // header the leader-direct path emits (Codex P2 on // PR #635 — without this the forwarded 503 would lose // its retry timing). - resp := mustForwardJSON(http.StatusServiceUnavailable, errorBody{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) + resp := mustForwardJSON(http.StatusServiceUnavailable, errorResponse{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) resp.RetryAfterSeconds = 1 return resp case errors.Is(err, ErrTablesNotFound): - return mustForwardJSON(http.StatusNotFound, errorBody{Error: "not_found", Message: "table does not exist"}) + return mustForwardJSON(http.StatusNotFound, errorResponse{Error: "not_found", Message: "table does not exist"}) case errors.Is(err, ErrTablesAlreadyExists): - return mustForwardJSON(http.StatusConflict, errorBody{Error: "already_exists", Message: "table already exists"}) + return mustForwardJSON(http.StatusConflict, errorResponse{Error: "already_exists", Message: "table already exists"}) } var verr *ValidationError if errors.As(err, &verr) { - return mustForwardJSON(http.StatusBadRequest, errorBody{Error: "invalid_request", Message: verr.Error()}) + return mustForwardJSON(http.StatusBadRequest, errorResponse{Error: "invalid_request", Message: verr.Error()}) } - return mustForwardJSON(http.StatusInternalServerError, errorBody{ + return mustForwardJSON(http.StatusInternalServerError, errorResponse{ Error: "dynamo_" + op + "_failed", Message: "failed to " + op + " table; see leader logs", }) @@ -271,15 +297,8 @@ func isStructuredSourceError(err error) bool { return errors.As(err, &verr) } -// errorBody is the shared JSON shape for both the HTTP handler's -// writeJSONError and the forward server's encoded responses. -type errorBody struct { - Error string `json:"error"` - Message string `json:"message,omitempty"` -} - func rejectForward(status int, code, msg string) (*pb.AdminForwardResponse, error) { - return mustForwardJSON(status, errorBody{Error: code, Message: msg}), nil + return mustForwardJSON(status, errorResponse{Error: code, Message: msg}), nil } func mustForwardJSON(status int, body any) *pb.AdminForwardResponse { diff --git a/internal/admin/forward_server_test.go b/internal/admin/forward_server_test.go index b156d094e..72a6bcf72 100644 --- a/internal/admin/forward_server_test.go +++ b/internal/admin/forward_server_test.go @@ -360,6 +360,34 @@ func TestForwardServer_DeleteTable_LeaderSteppedDownReturns503(t *testing.T) { require.Contains(t, string(resp.GetPayload()), "leader_unavailable") } +// TestForwardServer_SanitisesForwardedFromInLog confirms the +// CR/LF stripping pass at the RPC entry point: a malicious +// follower-supplied node id with embedded newlines must not be +// able to split a single audit/error line into two when slog is +// using a text-format handler. Claude review on PR #635. +func TestForwardServer_SanitisesForwardedFromInLog(t *testing.T) { + var buf bytes.Buffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo})) + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{}} + srv := NewForwardServer(src, fullPrincipalRoleStore(), logger) + + resp, err := srv.Forward(context.Background(), &pb.AdminForwardRequest{ + Principal: &pb.AdminPrincipal{AccessKey: "AKIA_FULL", Role: "full"}, + Operation: pb.AdminOperation_ADMIN_OP_CREATE_TABLE, + Payload: mustJSON(t, CreateTableRequest{TableName: "users", PartitionKey: CreateTableAttribute{Name: "id", Type: "S"}}), + ForwardedFrom: "follower\nfake_actor=evil\nrole=full", + }) + require.NoError(t, err) + require.Equal(t, int32(http.StatusCreated), resp.GetStatusCode()) + logged := buf.String() + // Sanitised value (newlines replaced with spaces) must be + // present. + require.Contains(t, logged, "follower fake_actor=evil role=full") + // Raw newline-bearing value must NOT be in the log — that + // would mean the sanitisation did not run. + require.NotContains(t, logged, "follower\nfake_actor=evil") +} + // TestForwardServer_LogsUnexpectedSourceError confirms the leader // emits an error log line for non-sentinel source failures so // operators can investigate forwarded write 500s without