diff --git a/adapter/dynamodb_admin.go b/adapter/dynamodb_admin.go new file mode 100644 index 000000000..efb730cd7 --- /dev/null +++ b/adapter/dynamodb_admin.go @@ -0,0 +1,92 @@ +package adapter + +import ( + "context" + "sort" +) + +// AdminTableSummary is the table-level information the admin dashboard +// surfaces for a single Dynamo-compatible table. It deliberately +// projects only the fields the dashboard needs so the package's +// wire-format types (dynamoTableSchema and friends) stay internal. +type AdminTableSummary struct { + Name string + PartitionKey string + SortKey string + Generation uint64 + GlobalSecondaryIndexes []AdminGSISummary +} + +// AdminGSISummary mirrors AdminTableSummary for a single GSI. +type AdminGSISummary struct { + Name string + PartitionKey string + SortKey string + ProjectionType string +} + +// AdminListTables returns every Dynamo-style table this server knows +// about, in the lexicographic order the metadata index produces. +// Intended for the in-process admin listener as the SigV4-free +// counterpart to the listTables HTTP handler; both share the same +// underlying lookup so the two views cannot drift. +func (d *DynamoDBServer) AdminListTables(ctx context.Context) ([]string, error) { + return d.listTableNames(ctx) +} + +// AdminDescribeTable returns a schema snapshot for name. The triple +// (result, present, error) lets admin callers distinguish a genuine +// "not found" from a storage error without sniffing sentinels: when +// the table is missing the function returns (nil, false, nil). +// +// Unlike the SigV4 describeTable handler, AdminDescribeTable does +// NOT invoke ensureLegacyTableMigration. The admin dashboard is a +// strictly read-only surface (Gemini medium review on PR #633), so +// triggering Raft-coordinated key-encoding migrations as a side +// effect of routine polling would (a) violate the read-only +// contract and (b) cause every dashboard refresh to write to the +// cluster. Migration still runs lazily on the next SigV4 read or +// write of the same table — the schema we return here is just a +// snapshot for display, not a guarantee that the table is +// up-to-date for serving. +func (d *DynamoDBServer) AdminDescribeTable(ctx context.Context, name string) (*AdminTableSummary, bool, error) { + schema, exists, err := d.loadTableSchema(ctx, name) + if err != nil { + return nil, false, err + } + if !exists { + return nil, false, nil + } + return summaryFromSchema(schema), true, nil +} + +func summaryFromSchema(s *dynamoTableSchema) *AdminTableSummary { + out := &AdminTableSummary{ + Name: s.TableName, + PartitionKey: s.PrimaryKey.HashKey, + SortKey: s.PrimaryKey.RangeKey, + Generation: s.Generation, + } + if len(s.GlobalSecondaryIndexes) == 0 { + return out + } + names := make([]string, 0, len(s.GlobalSecondaryIndexes)) + for n := range s.GlobalSecondaryIndexes { + names = append(names, n) + } + // Sort so the JSON the admin handler emits is deterministic; map + // iteration order would otherwise produce an unstable output that + // breaks both UI diffing and snapshot tests. + sort.Strings(names) + out.GlobalSecondaryIndexes = make([]AdminGSISummary, 0, len(names)) + for _, name := range names { + gsi := s.GlobalSecondaryIndexes[name] + out.GlobalSecondaryIndexes = append(out.GlobalSecondaryIndexes, AdminGSISummary{ + Name: name, + PartitionKey: gsi.KeySchema.HashKey, + SortKey: gsi.KeySchema.RangeKey, + ProjectionType: gsi.Projection.ProjectionType, + }) + } + return out +} diff --git a/adapter/dynamodb_admin_test.go b/adapter/dynamodb_admin_test.go new file mode 100644 index 000000000..126d125b0 --- /dev/null +++ b/adapter/dynamodb_admin_test.go @@ -0,0 +1,189 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbTypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/require" +) + +// TestDynamoDB_AdminListTables_Empty exercises the SigV4-bypass admin +// entrypoint on a server that has no Dynamo tables. The expected shape +// is an empty (non-nil) slice so the admin JSON response stays a valid +// array rather than `null`, matching the design doc 4.3 contract. +func TestDynamoDB_AdminListTables_Empty(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + got, err := nodes[0].dynamoServer.AdminListTables(context.Background()) + require.NoError(t, err) + require.Empty(t, got) +} + +// TestDynamoDB_AdminListTables_Sorted verifies that the admin entrypoint +// returns table names in lexicographic order, matching the listTables +// HTTP handler so the two admin views (SigV4 and bypass) cannot drift. +func TestDynamoDB_AdminListTables_Sorted(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + client := newDynamoClient(t, nodes[0].dynamoAddress) + ctx := context.Background() + + for _, name := range []string{"zeta", "alpha", "mu"} { + _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{ + TableName: aws.String(name), + BillingMode: ddbTypes.BillingModePayPerRequest, + AttributeDefinitions: []ddbTypes.AttributeDefinition{ + {AttributeName: aws.String("pk"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + }, + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("pk"), KeyType: ddbTypes.KeyTypeHash}, + }, + }) + require.NoError(t, err) + } + + got, err := nodes[0].dynamoServer.AdminListTables(ctx) + require.NoError(t, err) + require.Equal(t, []string{"alpha", "mu", "zeta"}, got) +} + +// TestDynamoDB_AdminDescribeTable_Missing checks the (nil, false, nil) +// "not found" contract — admin callers must be able to tell a missing +// table apart from a storage error without sniffing sentinels. +func TestDynamoDB_AdminDescribeTable_Missing(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(context.Background(), "absent") + require.NoError(t, err) + require.False(t, exists) + require.Nil(t, summary) +} + +// TestDynamoDB_AdminDescribeTable_Composite covers the simple-key happy +// path: a table with hash + range key and no GSIs. The admin summary +// must mirror the schema's primary key fields exactly. +func TestDynamoDB_AdminDescribeTable_Composite(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + client := newDynamoClient(t, nodes[0].dynamoAddress) + ctx := context.Background() + + _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{ + TableName: aws.String("orders"), + BillingMode: ddbTypes.BillingModePayPerRequest, + AttributeDefinitions: []ddbTypes.AttributeDefinition{ + {AttributeName: aws.String("customer"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("orderID"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + }, + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("customer"), KeyType: ddbTypes.KeyTypeHash}, + {AttributeName: aws.String("orderID"), KeyType: ddbTypes.KeyTypeRange}, + }, + }) + require.NoError(t, err) + + summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(ctx, "orders") + require.NoError(t, err) + require.True(t, exists) + require.NotNil(t, summary) + require.Equal(t, "orders", summary.Name) + require.Equal(t, "customer", summary.PartitionKey) + require.Equal(t, "orderID", summary.SortKey) + require.NotZero(t, summary.Generation) + require.Empty(t, summary.GlobalSecondaryIndexes) +} + +// TestDynamoDB_AdminDescribeTable_GSI_SortedDeterministic exercises the +// GSI projection path. Two indexes are added in deliberately reversed +// alphabetical order to confirm summaryFromSchema's Sort.Strings call — +// without it, map iteration order would produce a flaky output. +func TestDynamoDB_AdminDescribeTable_GSI_SortedDeterministic(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + client := newDynamoClient(t, nodes[0].dynamoAddress) + ctx := context.Background() + + _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{ + TableName: aws.String("threads"), + BillingMode: ddbTypes.BillingModePayPerRequest, + AttributeDefinitions: []ddbTypes.AttributeDefinition{ + {AttributeName: aws.String("threadId"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("status"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("owner"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + {AttributeName: aws.String("createdAt"), AttributeType: ddbTypes.ScalarAttributeTypeS}, + }, + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("threadId"), KeyType: ddbTypes.KeyTypeHash}, + }, + GlobalSecondaryIndexes: []ddbTypes.GlobalSecondaryIndex{ + { + IndexName: aws.String("zStatusIndex"), + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("status"), KeyType: ddbTypes.KeyTypeHash}, + {AttributeName: aws.String("createdAt"), KeyType: ddbTypes.KeyTypeRange}, + }, + Projection: &ddbTypes.Projection{ProjectionType: ddbTypes.ProjectionTypeAll}, + }, + { + IndexName: aws.String("aOwnerIndex"), + KeySchema: []ddbTypes.KeySchemaElement{ + {AttributeName: aws.String("owner"), KeyType: ddbTypes.KeyTypeHash}, + }, + Projection: &ddbTypes.Projection{ProjectionType: ddbTypes.ProjectionTypeKeysOnly}, + }, + }, + }) + require.NoError(t, err) + + summary, exists, err := nodes[0].dynamoServer.AdminDescribeTable(ctx, "threads") + require.NoError(t, err) + require.True(t, exists) + require.NotNil(t, summary) + require.Equal(t, "threadId", summary.PartitionKey) + require.Empty(t, summary.SortKey) + + require.Len(t, summary.GlobalSecondaryIndexes, 2) + // Names sorted lexicographically: "aOwnerIndex" < "zStatusIndex". + require.Equal(t, "aOwnerIndex", summary.GlobalSecondaryIndexes[0].Name) + require.Equal(t, "owner", summary.GlobalSecondaryIndexes[0].PartitionKey) + require.Empty(t, summary.GlobalSecondaryIndexes[0].SortKey) + require.Equal(t, string(ddbTypes.ProjectionTypeKeysOnly), summary.GlobalSecondaryIndexes[0].ProjectionType) + + require.Equal(t, "zStatusIndex", summary.GlobalSecondaryIndexes[1].Name) + require.Equal(t, "status", summary.GlobalSecondaryIndexes[1].PartitionKey) + require.Equal(t, "createdAt", summary.GlobalSecondaryIndexes[1].SortKey) + require.Equal(t, string(ddbTypes.ProjectionTypeAll), summary.GlobalSecondaryIndexes[1].ProjectionType) +} + +func newDynamoClient(t *testing.T, address string) *dynamodb.Client { + t.Helper() + // Region is intentionally arbitrary here. The test DynamoDB + // server does not enforce a region match in its SigV4 path — + // every existing adapter test uses "us-west-2" as a placeholder + // for the same reason. If the server later starts requiring a + // specific region, source it from the same constant the server + // reads instead of hardcoding it on each side independently. + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) + require.NoError(t, err) + return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + address) + }) +} diff --git a/internal/admin/dynamo_handler.go b/internal/admin/dynamo_handler.go new file mode 100644 index 000000000..9d1fe19a3 --- /dev/null +++ b/internal/admin/dynamo_handler.go @@ -0,0 +1,311 @@ +package admin + +import ( + "context" + "encoding/base64" + "errors" + "log/slog" + "net/http" + "sort" + "strconv" + "strings" + + "github.com/goccy/go-json" +) + +// Pagination knobs for the read-only Dynamo table list endpoint. +// +// defaultDynamoListLimit matches the design doc Section 4.3 default +// (100). dynamoListLimitMax is the hard ceiling; oversized client +// requests are clamped silently rather than rejected so the SPA can +// pass through an opaque "max" without a round-trip on validation. +const ( + defaultDynamoListLimit = 100 + dynamoListLimitMax = 1000 +) + +// pathPrefixDynamoTables is the URL prefix the dynamo handler owns. +// "" + suffix "/tables" produces /admin/api/v1/dynamo/tables; the +// trailing slash variant routes to the per-table sub-handler. +const ( + pathDynamoTables = "/admin/api/v1/dynamo/tables" + pathPrefixDynamoTables = pathDynamoTables + "/" +) + +// DynamoTableSummary is the JSON shape the admin dashboard consumes. +// Defined in the admin package — rather than reusing the adapter's +// AdminTableSummary directly — so the admin HTTP layer does not pull +// in the heavyweight adapter dependency tree (gRPC, Raft, etc.) and +// remains testable in isolation. main_admin.go translates between +// adapter.AdminTableSummary and this type. +type DynamoTableSummary struct { + Name string `json:"name"` + PartitionKey string `json:"partition_key"` + SortKey string `json:"sort_key,omitempty"` + Generation uint64 `json:"generation"` + GlobalSecondaryIndexes []DynamoGSISummary `json:"global_secondary_indexes,omitempty"` +} + +// DynamoGSISummary mirrors DynamoTableSummary for a single GSI. +type DynamoGSISummary struct { + Name string `json:"name"` + PartitionKey string `json:"partition_key"` + SortKey string `json:"sort_key,omitempty"` + ProjectionType string `json:"projection_type"` +} + +// TablesSource is the contract the dynamo handler depends on. Wired in +// production to *adapter.DynamoDBServer via a small bridge in +// main_admin.go; tests use a stub. +// +// AdminDescribeTable returns (nil, false, nil) for a missing table so +// callers can distinguish "not found" from a storage error without +// sniffing sentinels. This mirrors the adapter signature exactly so +// the bridge remains a thin pass-through. +type TablesSource interface { + AdminListTables(ctx context.Context) ([]string, error) + AdminDescribeTable(ctx context.Context, name string) (*DynamoTableSummary, bool, error) +} + +// DynamoHandler serves /admin/api/v1/dynamo/tables and +// /admin/api/v1/dynamo/tables/{name}. Only GET is supported here — +// table creation and deletion live behind the protected write chain +// in a follow-up handler. +type DynamoHandler struct { + source TablesSource + logger *slog.Logger +} + +// NewDynamoHandler binds the source and seeds logging with +// slog.Default(). Use WithLogger to attach a tagged logger. +func NewDynamoHandler(source TablesSource) *DynamoHandler { + return &DynamoHandler{source: source, logger: slog.Default()} +} + +// WithLogger overrides the default slog destination. +func (h *DynamoHandler) WithLogger(l *slog.Logger) *DynamoHandler { + if l == nil { + return h + } + h.logger = l + return h +} + +// ServeHTTP routes /tables and /tables/{name}. We do not use +// http.ServeMux because the admin router already guards the +// /admin/api/v1/* prefix — adding another mux here would just +// duplicate the path-parsing logic. +func (h *DynamoHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET is implemented") + return + } + switch { + case r.URL.Path == pathDynamoTables: + h.handleList(w, r) + case strings.HasPrefix(r.URL.Path, pathPrefixDynamoTables): + name := strings.TrimPrefix(r.URL.Path, pathPrefixDynamoTables) + h.handleDescribe(w, r, name) + default: + writeJSONError(w, http.StatusNotFound, "not_found", "") + } +} + +// dynamoListResponse is the JSON shape returned by GET /tables. +// NextToken is omitted when there is no further page so the client +// can use a presence check rather than parsing an empty string. +type dynamoListResponse struct { + Tables []string `json:"tables"` + NextToken string `json:"next_token,omitempty"` +} + +func (h *DynamoHandler) handleList(w http.ResponseWriter, r *http.Request) { + limit, err := parseDynamoListLimit(r.URL.Query().Get("limit")) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_limit", err.Error()) + return + } + startAfter, err := decodeDynamoNextToken(r.URL.Query().Get("next_token")) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_next_token", err.Error()) + return + } + + // AdminListTables returns the full lex-sorted name list that + // the adapter's metadata prefix scan produces; we then slice + // to the requested page. The adapter's listTableNames already + // materialises the same list for the SigV4 listTables path + // (adapter/dynamodb.go:1146), which has been in production + // since DynamoDB-compat shipped — admin's memory profile is + // strictly the SigV4 path's, not a regression on top of it. + // + // Worst-case bound: a Dynamo table name caps at 255 bytes, so + // 1k tables ≈ 256 KiB and 10k tables ≈ 2.5 MiB of name + // strings on the heap during a single list call. That is well + // inside the per-request budget the admin listener targets. + // Beyond that scale the right fix is to teach the adapter to + // stream the metadata scan via a callback (and plumb it + // through here), not to bolt a streaming layer on top of the + // already-materialised slice. Tracked separately; this + // endpoint is not the limiting factor. + names, err := h.source.AdminListTables(r.Context()) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo list tables failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "dynamo_list_failed", + "failed to list tables; see server logs") + return + } + + page, next := paginateDynamoTableNames(names, startAfter, limit) + resp := dynamoListResponse{Tables: page} + if next != "" { + resp.NextToken = encodeDynamoNextToken(next) + } + // paginateDynamoTableNames is total over its input — it always + // returns a non-nil slice (an empty []string{} on the + // "cursor past end" branch, a real sub-slice otherwise) so the + // JSON shape is always `"tables": []` rather than `null` even + // without an explicit nil-check here. The Tables array + // contract is enforced at the producer. + writeAdminJSON(w, r.Context(), h.logger, resp) +} + +func (h *DynamoHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) { + if name == "" || strings.ContainsRune(name, '/') { + writeJSONError(w, http.StatusNotFound, "not_found", "") + return + } + summary, exists, err := h.source.AdminDescribeTable(r.Context(), name) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin dynamo describe table failed", + slog.String("table", name), + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "dynamo_describe_failed", + "failed to describe table; see server logs") + return + } + if !exists { + writeJSONError(w, http.StatusNotFound, "not_found", "table does not exist") + return + } + writeAdminJSON(w, r.Context(), h.logger, summary) +} + +// parseDynamoListLimit translates the ?limit= query parameter into a +// concrete page size. Empty falls back to the design-doc default; +// negatives or non-numerics are an outright client error; values past +// the ceiling are silently clamped (not an error) so the SPA's +// "request the maximum" pattern works without a probe round-trip. +func parseDynamoListLimit(raw string) (int, error) { + if raw == "" { + return defaultDynamoListLimit, nil + } + n, err := strconv.Atoi(raw) + if err != nil { + return 0, errors.New("limit must be an integer") + } + if n <= 0 { + return 0, errors.New("limit must be positive") + } + if n > dynamoListLimitMax { + return dynamoListLimitMax, nil + } + return n, nil +} + +// decodeDynamoNextToken reverses encodeDynamoNextToken. We base64-wrap +// the raw last-table-name so the wire token is opaque from the +// client's perspective and we can change the cursor representation +// later without breaking the API contract. +func decodeDynamoNextToken(raw string) (string, error) { + if raw == "" { + return "", nil + } + decoded, err := base64.RawURLEncoding.DecodeString(raw) + if err != nil { + return "", errors.New("next_token is not valid base64url") + } + return string(decoded), nil +} + +func encodeDynamoNextToken(name string) string { + return base64.RawURLEncoding.EncodeToString([]byte(name)) +} + +// paginateDynamoTableNames slices `names` (already lex-sorted by the +// adapter) into a single page starting strictly after `startAfter`. +// The second return is the opaque cursor the client should pass back +// for the next call, or "" if this is the last page. +func paginateDynamoTableNames(names []string, startAfter string, limit int) ([]string, string) { + start := 0 + if startAfter != "" { + // sort.SearchStrings returns the first index >= startAfter; + // adding 1 only when the entry equals startAfter gives us + // "strictly after" semantics. A startAfter that no longer + // exists in the sorted list still produces a sane resume + // (we pick up at the first name greater than the cursor). + idx := sort.SearchStrings(names, startAfter) + switch { + case idx >= len(names): + return []string{}, "" + case names[idx] == startAfter: + start = idx + 1 + default: + start = idx + } + } + end := start + limit + if end > len(names) { + end = len(names) + } + page := names[start:end] + if end < len(names) && len(page) > 0 { + return page, page[len(page)-1] + } + return page, "" +} + +// writeAdminJSON marshals `body` to a buffer first, *then* writes +// status + body — never streaming an encoder directly to the +// ResponseWriter. The streaming form would commit a 200 header and +// then truncate mid-body if json.Marshal failed on a value deep in +// the struct (an unsupported type, a Marshaler returning an error, +// etc.), leaving a malformed JSON object on the wire that the SPA +// has no way to recover from. Marshalling first lets us upgrade the +// encode failure to a 500 with a well-formed error envelope. +func writeAdminJSON(w http.ResponseWriter, ctx context.Context, logger *slog.Logger, body any) { + payload, err := json.Marshal(body) + if err != nil { + if logger == nil { + logger = slog.Default() + } + logger.LogAttrs(ctx, slog.LevelError, "admin response marshal failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "internal", "failed to encode response") + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + // Defence-in-depth: tell the browser not to MIME-sniff the + // response body. The admin surface is JSON-only, so a sniffed + // "this might be HTML" guess is never useful and could enable + // XSS-via-sniffing on a hostile payload that somehow reached + // here. Cookie-gated admin endpoints + a single static + // Content-Type make this cheap and standard. + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if _, werr := w.Write(payload); werr != nil { + // Status is already on the wire, so we can only log. Write + // failures here usually mean the client closed the connection. + if logger == nil { + logger = slog.Default() + } + logger.LogAttrs(ctx, slog.LevelWarn, "admin response write failed", + slog.String("error", werr.Error()), + ) + } +} diff --git a/internal/admin/dynamo_handler_test.go b/internal/admin/dynamo_handler_test.go new file mode 100644 index 000000000..2b3981848 --- /dev/null +++ b/internal/admin/dynamo_handler_test.go @@ -0,0 +1,327 @@ +package admin + +import ( + "context" + "encoding/base64" + "errors" + "net/http" + "net/http/httptest" + "sort" + "strings" + "testing" + + "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// stubTablesSource is the in-memory test double the dynamo handler +// tests use. AdminListTables returns names in lex order, matching +// the adapter's contract. +type stubTablesSource struct { + tables map[string]*DynamoTableSummary + listErr error + descErr error +} + +func (s *stubTablesSource) AdminListTables(_ context.Context) ([]string, error) { + if s.listErr != nil { + return nil, s.listErr + } + out := make([]string, 0, len(s.tables)) + for k := range s.tables { + out = append(out, k) + } + sort.Strings(out) + return out, nil +} + +func (s *stubTablesSource) AdminDescribeTable(_ context.Context, name string) (*DynamoTableSummary, bool, error) { + if s.descErr != nil { + return nil, false, s.descErr + } + t, ok := s.tables[name] + if !ok { + return nil, false, nil + } + return t, true, nil +} + +func newDynamoHandlerForTest(src TablesSource) *DynamoHandler { + return NewDynamoHandler(src) +} + +func TestDynamoHandler_ListTables_EmptyArrayNotNull(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"tables":[]`) + require.NotContains(t, rec.Body.String(), `"next_token"`) +} + +func TestDynamoHandler_ListTables_DefaultLimitAppliesAt100(t *testing.T) { + tables := make(map[string]*DynamoTableSummary, 250) + for i := 0; i < 250; i++ { + name := tableNameForIndex(i) + tables[name] = &DynamoTableSummary{Name: name} + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Len(t, resp.Tables, 100) + require.NotEmpty(t, resp.NextToken) + + // next_token must round-trip through base64url back to the last + // page entry — opaque to the client but stable enough that the + // SPA's "next page" call resumes deterministically. + decoded, err := base64.RawURLEncoding.DecodeString(resp.NextToken) + require.NoError(t, err) + require.Equal(t, resp.Tables[len(resp.Tables)-1], string(decoded)) +} + +func TestDynamoHandler_ListTables_LimitClampedToMax(t *testing.T) { + tables := make(map[string]*DynamoTableSummary, 1500) + for i := 0; i < 1500; i++ { + name := tableNameForIndex(i) + tables[name] = &DynamoTableSummary{Name: name} + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=99999", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Len(t, resp.Tables, dynamoListLimitMax) +} + +func TestDynamoHandler_ListTables_PaginationResumesAfterCursor(t *testing.T) { + tables := map[string]*DynamoTableSummary{ + "alpha": {Name: "alpha"}, + "bravo": {Name: "bravo"}, + "charlie": {Name: "charlie"}, + "delta": {Name: "delta"}, + "echo": {Name: "echo"}, + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + + // First page: limit=2 → ["alpha", "bravo"], next_token=base64("bravo"). + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + var page1 dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &page1)) + require.Equal(t, []string{"alpha", "bravo"}, page1.Tables) + require.NotEmpty(t, page1.NextToken) + + // Second page: forward the opaque token verbatim — contract + // must not require the client to URL-escape it again. + req2 := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2&next_token="+page1.NextToken, nil) + rec2 := httptest.NewRecorder() + h.ServeHTTP(rec2, req2) + var page2 dynamoListResponse + require.NoError(t, json.Unmarshal(rec2.Body.Bytes(), &page2)) + require.Equal(t, []string{"charlie", "delta"}, page2.Tables) + require.NotEmpty(t, page2.NextToken) + + // Third page exhausts the list and must omit next_token. + req3 := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=2&next_token="+page2.NextToken, nil) + rec3 := httptest.NewRecorder() + h.ServeHTTP(rec3, req3) + var page3 dynamoListResponse + require.NoError(t, json.Unmarshal(rec3.Body.Bytes(), &page3)) + require.Equal(t, []string{"echo"}, page3.Tables) + require.Empty(t, page3.NextToken) +} + +func TestDynamoHandler_ListTables_NextTokenForVanishedNameFastForwards(t *testing.T) { + // A cursor for a name that was deleted between pages must resume + // at the next surviving entry, not silently swallow the page. + tables := map[string]*DynamoTableSummary{ + "alpha": {Name: "alpha"}, + "delta": {Name: "delta"}, + } + h := newDynamoHandlerForTest(&stubTablesSource{tables: tables}) + + cursor := base64.RawURLEncoding.EncodeToString([]byte("bravo")) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit=10&next_token="+cursor, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + var resp dynamoListResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.Equal(t, []string{"delta"}, resp.Tables) + require.Empty(t, resp.NextToken) +} + +func TestDynamoHandler_ListTables_RejectsBadLimit(t *testing.T) { + cases := []struct { + raw string + expect string + }{ + {"abc", "invalid_limit"}, + {"-5", "invalid_limit"}, + {"0", "invalid_limit"}, + } + for _, tc := range cases { + t.Run(tc.raw, func(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?limit="+tc.raw, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), tc.expect) + }) + } +} + +func TestDynamoHandler_ListTables_RejectsBadNextToken(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"?next_token=!!!not-base64!!!", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "invalid_next_token") +} + +func TestDynamoHandler_ListTables_SourceErrorIsHidden(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{listErr: errors.New("kv backing sentinel ZZZ-471")}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "dynamo_list_failed") + require.NotContains(t, rec.Body.String(), "ZZZ-471") + require.NotContains(t, rec.Body.String(), "kv backing sentinel") +} + +func TestDynamoHandler_DescribeTable_HappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": { + Name: "orders", + PartitionKey: "customer", + SortKey: "orderID", + Generation: 42, + GlobalSecondaryIndexes: []DynamoGSISummary{ + {Name: "by-status", PartitionKey: "status", SortKey: "createdAt", ProjectionType: "ALL"}, + }, + }, + }} + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var got DynamoTableSummary + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + require.Equal(t, "orders", got.Name) + require.Equal(t, "customer", got.PartitionKey) + require.Equal(t, "orderID", got.SortKey) + require.EqualValues(t, 42, got.Generation) + require.Len(t, got.GlobalSecondaryIndexes, 1) + require.Equal(t, "by-status", got.GlobalSecondaryIndexes[0].Name) + require.Equal(t, "ALL", got.GlobalSecondaryIndexes[0].ProjectionType) +} + +func TestDynamoHandler_DescribeTable_MissingReturns404(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/absent", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusNotFound, rec.Code) + require.Contains(t, rec.Body.String(), "not_found") +} + +func TestDynamoHandler_DescribeTable_RejectsSlashInName(t *testing.T) { + // /admin/api/v1/dynamo/tables/foo/bar must not let the handler + // call AdminDescribeTable with a "/"-bearing name. Returning 404 + // is preferable to 400 here because the URL itself is the only + // way to express the table name; an embedded "/" simply does not + // route to a real table. + src := &stubTablesSource{ + descErr: errors.New("must not be invoked with slash-bearing name"), + } + h := newDynamoHandlerForTest(src) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/foo/bar", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +func TestDynamoHandler_DescribeTable_SourceErrorIsHidden(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{descErr: errors.New("storage sentinel QQ-808")}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/orders", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusInternalServerError, rec.Code) + require.Contains(t, rec.Body.String(), "dynamo_describe_failed") + require.NotContains(t, rec.Body.String(), "QQ-808") +} + +func TestDynamoHandler_OnlyGET(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + for _, m := range []string{http.MethodPost, http.MethodPut, http.MethodDelete, http.MethodPatch} { + req := httptest.NewRequest(m, pathDynamoTables, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusMethodNotAllowed, rec.Code, "method %s", m) + } +} + +func TestDynamoHandler_UnknownSubpathReturns404(t *testing.T) { + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + // /admin/api/v1/dynamo/something-else falls outside the prefix the + // handler owns; the handler must answer 404 so the admin router's + // "API took it" prefix routing stays correct. + req := httptest.NewRequest(http.MethodGet, "/admin/api/v1/dynamo/things", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +func TestDynamoHandler_DescribeTable_TrailingSlashIsRejected(t *testing.T) { + // /admin/api/v1/dynamo/tables/ with an empty trailing component + // would otherwise pass an empty name down to the source. + h := newDynamoHandlerForTest(&stubTablesSource{tables: map[string]*DynamoTableSummary{}}) + req := httptest.NewRequest(http.MethodGet, pathDynamoTables+"/", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +// tableNameForIndex generates lex-sortable monotonically increasing +// names so list-pagination tests can assert deterministic ordering. +func tableNameForIndex(i int) string { + const width = 4 + digits := []byte("0000") + for k := width - 1; k >= 0 && i > 0; k-- { + digits[k] = byte('0' + i%10) + i /= 10 + } + return "tbl-" + string(digits) +} + +// Sanity check on the helper itself so the pagination assertions +// have a stable backing dataset. +func TestTableNameForIndex_LexSortable(t *testing.T) { + prev := "" + for i := 0; i < 20; i++ { + cur := tableNameForIndex(i) + if prev != "" { + require.True(t, strings.Compare(prev, cur) < 0, "non-monotonic at %d: %s !< %s", i, prev, cur) + } + prev = cur + } +} diff --git a/internal/admin/router.go b/internal/admin/router.go index 658d41f45..8b5a5b3c9 100644 --- a/internal/admin/router.go +++ b/internal/admin/router.go @@ -277,6 +277,11 @@ func writeJSONNotFound(w http.ResponseWriter, _ *http.Request) { func writeJSONError(w http.ResponseWriter, status int, code, msg string) { w.Header().Set("Content-Type", "application/json; charset=utf-8") + // Defence-in-depth header: the admin surface is JSON-only, so + // declare nosniff to prevent a misbehaving browser from + // content-sniffing an error body into something executable. + // Cheap and standard for cookie-gated admin endpoints. + w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(errorResponse{Error: code, Message: msg}) diff --git a/internal/admin/server.go b/internal/admin/server.go index 93b23c78e..132ce6b4c 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -5,6 +5,7 @@ import ( "log/slog" "net/http" "reflect" + "strings" ) // ServerDeps bundles the collaborators the admin HTTP surface needs. All @@ -31,6 +32,12 @@ type ServerDeps struct { // ClusterInfo describes the local node's Raft state. ClusterInfo ClusterInfoSource + // Tables is the read-only DynamoDB admin source. Optional: a nil + // value disables /admin/api/v1/dynamo/tables{,/{name}} (the mux + // answers them with 404). This lets a build that ships only the + // cluster page deploy without standing up the dynamo bridge. + Tables TablesSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -92,7 +99,11 @@ func NewServer(deps ServerDeps) (*Server, error) { } auth := NewAuthService(deps.Signer, deps.Credentials, deps.Roles, authOpts) cluster := NewClusterHandler(deps.ClusterInfo).WithLogger(logger) - mux := buildAPIMux(auth, deps.Verifier, cluster, logger) + var dynamo http.Handler + if deps.Tables != nil { + dynamo = NewDynamoHandler(deps.Tables).WithLogger(logger) + } + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } @@ -119,15 +130,21 @@ func (s *Server) APIHandler() http.Handler { // // Layout: // -// POST /admin/api/v1/auth/login (no auth, rate-limited) -// POST /admin/api/v1/auth/logout (no auth required) -// GET /admin/api/v1/cluster (auth required) +// POST /admin/api/v1/auth/login (no auth, rate-limited) +// POST /admin/api/v1/auth/logout (auth required) +// GET /admin/api/v1/cluster (auth required) +// GET /admin/api/v1/dynamo/tables (auth required, read-only) +// GET /admin/api/v1/dynamo/tables/{name} (auth required, read-only) // // Body limit applies uniformly. CSRF and Audit middleware apply to // write-capable protected endpoints; login and logout carry their own // audit path inside AuthService because the generic Audit middleware // cannot see the claimed actor at that point in the chain. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler http.Handler, logger *slog.Logger) http.Handler { +// +// dynamoHandler may be nil; in that case the dynamo paths fall through +// to the unknown-endpoint 404, matching the behaviour of any other +// unregistered admin path. +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -177,15 +194,27 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler http.Hand loginChain := publicAuth(loginHandler) logoutChain := protectNoAudit(logoutHandler) clusterChain := protect(clusterHandler) + // Read-only endpoints share the protect chain so a missing + // session or CSRF token still 401s/403s the same way as a write. + // The Audit middleware is a no-op for GET (it only logs state- + // changing methods) so we get the consistent guards without the + // noise of an audit line per dashboard poll. + var dynamoChain http.Handler + if dynamoHandler != nil { + dynamoChain = protect(dynamoHandler) + } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/admin/api/v1/auth/login": + switch { + case r.URL.Path == "/admin/api/v1/auth/login": loginChain.ServeHTTP(w, r) - case "/admin/api/v1/auth/logout": + case r.URL.Path == "/admin/api/v1/auth/logout": logoutChain.ServeHTTP(w, r) - case "/admin/api/v1/cluster": + case r.URL.Path == "/admin/api/v1/cluster": clusterChain.ServeHTTP(w, r) + case dynamoChain != nil && (r.URL.Path == pathDynamoTables || + strings.HasPrefix(r.URL.Path, pathPrefixDynamoTables)): + dynamoChain.ServeHTTP(w, r) default: writeJSONError(w, http.StatusNotFound, "unknown_endpoint", "no admin API handler is registered for this path") diff --git a/internal/admin/server_test.go b/internal/admin/server_test.go index 59344e0a5..ef6bbbdc0 100644 --- a/internal/admin/server_test.go +++ b/internal/admin/server_test.go @@ -3,6 +3,7 @@ package admin import ( "bytes" "context" + "io" "log/slog" "net/http" "net/http/httptest" @@ -169,6 +170,163 @@ func TestServer_APIHandlerReturnsBareMux(t *testing.T) { require.Equal(t, http.StatusNotFound, rec.Code) } +// newServerWithTablesForTest mirrors newServerForTest but also wires +// in a stub TablesSource so the dynamo paths are reachable. The same +// test setup pattern (single fixed clock, two access keys, JSON +// logger) keeps the assertion surface compact. +func newServerWithTablesForTest(t *testing.T, src TablesSource) *Server { + t.Helper() + clk := fixedClock(time.Unix(1_700_000_000, 0).UTC()) + signer := newSignerForTest(t, 1, clk) + verifier := newVerifierForTest(t, []byte{1}, clk) + creds := MapCredentialStore{ + "AKIA_ADMIN": "ADMIN_SECRET", + "AKIA_RO": "RO_SECRET", + } + roles := map[string]Role{ + "AKIA_ADMIN": RoleFull, + "AKIA_RO": RoleReadOnly, + } + cluster := ClusterInfoFunc(func(_ context.Context) (ClusterInfo, error) { + return ClusterInfo{NodeID: "node-1", Version: "0.1.0"}, nil + }) + buf := &bytes.Buffer{} + logger := slog.New(slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelInfo})) + srv, err := NewServer(ServerDeps{ + Signer: signer, + Verifier: verifier, + Credentials: creds, + Roles: roles, + ClusterInfo: cluster, + Tables: src, + AuthOpts: AuthServiceOpts{Clock: clk}, + Logger: logger, + }) + require.NoError(t, err) + return srv +} + +// loginAndCookies completes a successful login and returns the +// session + CSRF cookies the SPA would carry on follow-up requests. +// Tests that exercise protected GET endpoints reuse this helper to +// avoid copy-pasting the login dance everywhere. +func loginAndCookies(t *testing.T, ts *httptest.Server) []*http.Cookie { + t.Helper() + body, _ := json.Marshal(loginRequest{AccessKey: "AKIA_RO", SecretKey: "RO_SECRET"}) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + ts.URL+"/admin/api/v1/auth/login", strings.NewReader(string(body))) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + cookies := resp.Cookies() + _ = resp.Body.Close() + return cookies +} + +func TestServer_DynamoTables_RequiresSession(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": {Name: "orders", PartitionKey: "id"}, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusUnauthorized, resp.StatusCode) + _ = resp.Body.Close() +} + +func TestServer_DynamoTables_ReadOnlyRoleAllowed(t *testing.T) { + // Tables list is a GET — the read-only role must succeed without + // any extra opt-in. This guards against accidentally bolting + // RequireWriteRole onto the read chain. + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": {Name: "orders", PartitionKey: "id"}, + "products": {Name: "products", PartitionKey: "sku"}, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + var body dynamoListResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + require.Equal(t, []string{"orders", "products"}, body.Tables) + _ = resp.Body.Close() +} + +func TestServer_DynamoDescribeTable_AuthenticatedHappyPath(t *testing.T) { + src := &stubTablesSource{tables: map[string]*DynamoTableSummary{ + "orders": { + Name: "orders", + PartitionKey: "id", + SortKey: "ts", + Generation: 7, + }, + }} + srv := newServerWithTablesForTest(t, src) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables/orders", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + var got DynamoTableSummary + require.NoError(t, json.NewDecoder(resp.Body).Decode(&got)) + require.Equal(t, "orders", got.Name) + require.Equal(t, "id", got.PartitionKey) + require.Equal(t, "ts", got.SortKey) + require.EqualValues(t, 7, got.Generation) + _ = resp.Body.Close() +} + +func TestServer_DynamoTables_NilSourceFallsThroughTo404(t *testing.T) { + // A build that ships only the cluster page (Tables nil) must keep + // the dynamo paths off the wire entirely. The expected response is + // the standard "unknown_endpoint" JSON 404 — same as any other + // unregistered admin URL — so the SPA can detect the feature is + // disabled by HTTP status alone. + srv := newServerForTest(t) // built without Tables + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + cookies := loginAndCookies(t, ts) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + ts.URL+"/admin/api/v1/dynamo/tables", nil) + require.NoError(t, err) + for _, c := range cookies { + req.AddCookie(c) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + require.Contains(t, string(body), "unknown_endpoint") + _ = resp.Body.Close() +} + func TestServer_WriteRejectsMissingCSRF(t *testing.T) { // Login to obtain a session, then hit cluster with POST to trigger // CSRF on what the router normally rejects as method_not_allowed. diff --git a/main.go b/main.go index 85b51e137..f6e38cc89 100644 --- a/main.go +++ b/main.go @@ -688,7 +688,12 @@ func startServers(in serversInput) error { if err := runner.start(); err != nil { return err } - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes); err != nil { + // runner.start() populates runner.dynamoServer for the admin + // listener's SigV4-bypass entrypoints (see adapter/dynamodb_admin.go). + // Passing nil here would leave the admin dashboard with no + // access to table metadata; the admin handler answers + // /admin/api/v1/dynamo/* with 404 in that case. + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil @@ -1037,10 +1042,10 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr return nil } -func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, leaderDynamo map[string]string, metricsRegistry *monitoring.Registry, readTracker *kv.ActiveTimestampTracker) error { +func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, leaderDynamo map[string]string, metricsRegistry *monitoring.Registry, readTracker *kv.ActiveTimestampTracker) (*adapter.DynamoDBServer, error) { dynamoL, err := lc.Listen(ctx, "tcp", dynamoAddr) if err != nil { - return errors.Wrapf(err, "failed to listen on %s", dynamoAddr) + return nil, errors.Wrapf(err, "failed to listen on %s", dynamoAddr) } dynamoServer := adapter.NewDynamoDBServer( dynamoL, @@ -1067,7 +1072,7 @@ func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup } return errors.WithStack(err) }) - return nil + return dynamoServer, nil } func startPprofServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, pprofAddr string, pprofToken string) error { @@ -1207,9 +1212,17 @@ type runtimeServerRunner struct { pprofAddress string pprofToken string metricsRegistry *monitoring.Registry + + // dynamoServer is populated by start() and made available to + // startAdminFromFlags in this package so the admin listener can + // call SigV4-bypass admin entrypoints (see + // adapter/dynamodb_admin.go) without going through HTTP. The + // field is unexported on purpose — it is package-private state, + // not a public API. Nil until start() reaches the dynamo step. + dynamoServer *adapter.DynamoDBServer } -func (r runtimeServerRunner) start() error { +func (r *runtimeServerRunner) start() error { if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis, r.pubsubRelay, r.metricsRegistry, r.readTracker); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } @@ -1230,9 +1243,11 @@ func (r runtimeServerRunner) start() error { ); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker); err != nil { + dynamoServer, err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.leaderDynamo, r.metricsRegistry, r.readTracker) + if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } + r.dynamoServer = dynamoServer if err := startS3Server(r.ctx, r.lc, r.eg, r.s3Address, r.shardStore, r.coordinate, r.leaderS3, r.s3Region, r.s3CredsFile, r.s3PathStyleOnly, r.readTracker); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin.go b/main_admin.go index abd4bb09f..8295bd5eb 100644 --- a/main_admin.go +++ b/main_admin.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/internal/admin" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" @@ -66,7 +67,7 @@ type adminListenerConfig struct { // without touching --s3CredentialsFile: pulling the admin feature into // a hard dependency on that file would break deployments that never // intended to use it. -func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime) error { +func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer) error { if !*adminEnabled { return nil } @@ -106,10 +107,74 @@ func startAdminFromFlags(ctx context.Context, lc *net.ListenConfig, eg *errgroup fullAccessKeys: parseCSV(*adminFullAccessKeys), } clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes) - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, buildVersion()) + tablesSrc := newDynamoTablesSource(dynamoServer) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, buildVersion()) return err } +// newDynamoTablesSource adapts *adapter.DynamoDBServer to the +// admin.TablesSource interface. The bridge stays in this file (rather +// than internal/admin) so the admin package stays free of the heavy +// adapter-package dependency tree (gRPC, Raft, store). +// +// Returns nil when dynamoServer is nil; admin.NewServer handles a nil +// Tables field by leaving the dynamo paths off the wire entirely, +// which is the right behaviour for builds that ship without the +// Dynamo adapter. +func newDynamoTablesSource(dynamoServer *adapter.DynamoDBServer) admin.TablesSource { + if dynamoServer == nil { + return nil + } + return &dynamoTablesBridge{server: dynamoServer} +} + +// dynamoTablesBridge is the thin adapter that re-shapes the adapter's +// AdminTableSummary DTO into the admin package's DynamoTableSummary. +// The two structs are deliberately isomorphic so this translation +// does no allocation more than necessary; if a future GSI field is +// added on one side, the build breaks here, which is exactly the +// drift signal we want. +type dynamoTablesBridge struct { + server *adapter.DynamoDBServer +} + +func (b *dynamoTablesBridge) AdminListTables(ctx context.Context) ([]string, error) { + return b.server.AdminListTables(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context. +} + +func (b *dynamoTablesBridge) AdminDescribeTable(ctx context.Context, name string) (*admin.DynamoTableSummary, bool, error) { + summary, exists, err := b.server.AdminDescribeTable(ctx, name) + if err != nil { + return nil, false, err //nolint:wrapcheck // adapter wraps internally. + } + if !exists { + return nil, false, nil + } + return convertAdminTableSummary(summary), true, nil +} + +func convertAdminTableSummary(in *adapter.AdminTableSummary) *admin.DynamoTableSummary { + out := &admin.DynamoTableSummary{ + Name: in.Name, + PartitionKey: in.PartitionKey, + SortKey: in.SortKey, + Generation: in.Generation, + } + if len(in.GlobalSecondaryIndexes) == 0 { + return out + } + out.GlobalSecondaryIndexes = make([]admin.DynamoGSISummary, len(in.GlobalSecondaryIndexes)) + for i, g := range in.GlobalSecondaryIndexes { + out.GlobalSecondaryIndexes[i] = admin.DynamoGSISummary{ + Name: g.Name, + PartitionKey: g.PartitionKey, + SortKey: g.SortKey, + ProjectionType: g.ProjectionType, + } + } + return out +} + // buildAdminConfig translates flag values into an admin.Config. func buildAdminConfig(in adminListenerConfig) admin.Config { return admin.Config{ @@ -144,6 +209,7 @@ func startAdminServer( cfg adminListenerConfig, creds map[string]string, cluster admin.ClusterInfoSource, + tables admin.TablesSource, version string, ) (string, error) { adminCfg := buildAdminConfig(cfg) @@ -151,7 +217,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables) if err != nil { return "", err } @@ -191,7 +257,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -210,6 +276,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust Credentials: admin.MapCredentialStore(creds), Roles: adminCfg.RoleIndex(), ClusterInfo: cluster, + Tables: tables, StaticFS: nil, AuthOpts: admin.AuthServiceOpts{ InsecureCookie: adminCfg.AllowInsecureDevCookie, diff --git a/main_admin_test.go b/main_admin_test.go index eb961311c..fc3d21ade 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -196,7 +196,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, "") require.NoError(t, err) } @@ -209,7 +209,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) } @@ -222,7 +222,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -236,7 +236,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -259,7 +259,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. @@ -302,7 +302,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{