From ec66090525db1e2d279b428cf91e92986038f08a Mon Sep 17 00:00:00 2001 From: Simon Carr Date: Fri, 8 May 2026 08:16:02 +0100 Subject: [PATCH] feat(ctops): add CT Ops inventory connector --- README.md | 44 +- cmd/ct-cve/main.go | 6 + compose.yaml | 1 + internal/config/config.go | 121 ++++++ internal/config/ctops_config_test.go | 74 ++++ internal/ctops/handler.go | 333 ++++++++++++++ internal/ctops/handler_test.go | 284 ++++++++++++ internal/ctops/service_token.go | 242 +++++++++++ internal/ctops/service_token_test.go | 206 +++++++++ internal/ctops/types.go | 157 +++++++ internal/store/ctops.go | 627 +++++++++++++++++++++++++++ migrations/0005_ct_ops_connector.sql | 105 +++++ 12 files changed, 2198 insertions(+), 2 deletions(-) create mode 100644 internal/config/ctops_config_test.go create mode 100644 internal/ctops/handler.go create mode 100644 internal/ctops/handler_test.go create mode 100644 internal/ctops/service_token.go create mode 100644 internal/ctops/service_token_test.go create mode 100644 internal/ctops/types.go create mode 100644 internal/store/ctops.go create mode 100644 migrations/0005_ct_ops_connector.sql diff --git a/README.md b/README.md index 0d5b6f6..9b15749 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,8 @@ increment provides: when releases are created. - A built-in operational status GUI, editable source configuration forms, and a JSON status endpoint with recent feed/API activity logs. +- A CT Ops connector API that accepts signed inventory snapshots, matches + installed packages, and delivers signed finding batches back to CT Ops. ## Local Development @@ -54,6 +56,13 @@ Recent feed sync outcomes and source configuration API changes are recorded as operational logs and shown on the status page and JSON endpoint. Logs report whether an NVD API key is configured but never include the key value. +CT Ops pushes inventory to CT-CVE at +`POST /api/v1/ct-ops/inventory-snapshots`. CT-CVE verifies the signed service +request, stores the org-scoped host and software inventory, immediately matches +active packages against the persisted vulnerability catalog, and posts finding +batches back to CT Ops at +`/api/integrations/ct-cve/v1/finding-batches`. + ## Container Images Release-please manages CT-CVE GitHub releases from Conventional Commit history. @@ -84,14 +93,45 @@ published release. | `CT_CVE_NVD_REQUEST_DELAY` | No | `6s` without an API key, `600ms` with an API key | Minimum delay between NVD API requests. | | `CT_CVE_CISA_KEV_ENABLED` | No | `true` | Enables the CISA Known Exploited Vulnerabilities source. | | `CT_CVE_CISA_KEV_BASE_URL` | No | `https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json` | CISA KEV JSON feed URL. | +| `CT_CVE_CT_OPS_CONNECTIONS` | No | `[]` | JSON array of CT Ops connector definitions with org-scoped inbound inventory tokens and outbound CT Ops callback token. | The status page reports whether the NVD API key is configured, but it never returns the key value in HTML or JSON responses. +Example CT Ops connector configuration: + +```json +[ + { + "name": "Primary CT Ops", + "orgId": "org_123", + "ctOpsBaseUrl": "https://ctops.example.com", + "inventoryTokens": [ + { + "id": "ctops-inventory", + "secret": "replace-with-at-least-32-bytes-of-secret", + "scopes": ["inventory:write", "connection:read"] + } + ], + "ctOpsToken": { + "id": "ctcve-outbound", + "secret": "replace-with-at-least-32-bytes-of-secret", + "scopes": ["findings:write", "connection:read"] + } + } +] +``` + +The same HMAC request-signing contract is used in both directions: +`Authorization: CT-ServiceToken `, `X-CT-Timestamp`, `X-CT-Nonce`, +`X-CT-Content-SHA256`, and `X-CT-Signature: v1=`. + ## Migration Status This bootstrap now includes feed sync workers for the NVD and CISA KEV catalogs, including persistence of CVE metadata, known-exploited CVE metadata, and source status, plus the first CT-CVE status GUI/API slice with editable source -configuration and feed/API activity logs. CT-CVE subscription status from CT Ops -and the CT Ops connector remain outstanding migration work. +configuration and feed/API activity logs. The first CT Ops connector path is +implemented for signed inventory ingestion, immediate matching, and signed +finding delivery. CT-CVE subscription status from CT Ops remains outstanding +migration work. diff --git a/cmd/ct-cve/main.go b/cmd/ct-cve/main.go index eaafc2d..4edc561 100644 --- a/cmd/ct-cve/main.go +++ b/cmd/ct-cve/main.go @@ -11,6 +11,7 @@ import ( "time" "github.com/carrtech-dev/ct-cve/internal/config" + "github.com/carrtech-dev/ct-cve/internal/ctops" "github.com/carrtech-dev/ct-cve/internal/feed" "github.com/carrtech-dev/ct-cve/internal/gui" "github.com/carrtech-dev/ct-cve/internal/store" @@ -49,6 +50,11 @@ func main() { mux := http.NewServeMux() gui.NewHandler(cfg, db).Register(mux) + ctops.NewHandler(ctops.HandlerOptions{ + Connections: cfg.CTOpsConnections, + Store: db, + HTTPClient: &http.Client{Timeout: cfg.FeedHTTPTimeout}, + }).Register(mux) mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/compose.yaml b/compose.yaml index 1443a60..db21839 100644 --- a/compose.yaml +++ b/compose.yaml @@ -10,6 +10,7 @@ services: CT_CVE_NVD_ENABLED: "true" CT_CVE_NVD_REQUEST_DELAY: 6s CT_CVE_CISA_KEV_ENABLED: "true" + CT_CVE_CT_OPS_CONNECTIONS: "[]" ports: - "8080:8080" depends_on: diff --git a/internal/config/config.go b/internal/config/config.go index 7f4998f..49d6668 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/json" "errors" "fmt" "net/url" @@ -8,8 +9,27 @@ import ( "strconv" "strings" "time" + + "github.com/carrtech-dev/ct-cve/internal/ctops" ) +const serviceTokenMinBytes = 32 + +type rawCTOpsConnection struct { + Name string `json:"name"` + OrgID string `json:"orgId"` + CTOpsBaseURL string `json:"ctOpsBaseUrl"` + InventoryTokens []rawCTOpsServiceToken `json:"inventoryTokens"` + CTOpsToken rawCTOpsServiceToken `json:"ctOpsToken"` +} + +type rawCTOpsServiceToken struct { + ID string `json:"id"` + Secret string `json:"secret"` + Scopes []string `json:"scopes"` + Revoked bool `json:"revoked"` +} + type Config struct { HTTPAddr string DatabaseURL string @@ -17,6 +37,7 @@ type Config struct { FeedSyncOnStartup bool FeedHTTPTimeout time.Duration Sources SourceConfig + CTOpsConnections []ctops.Connection } type SourceConfig struct { @@ -83,6 +104,10 @@ func Load() (Config, error) { if err != nil { return Config{}, err } + ctOpsConnections, err := ctOpsConnectionsFromEnv(os.Getenv("CT_CVE_CT_OPS_CONNECTIONS")) + if err != nil { + return Config{}, err + } cfg := Config{ HTTPAddr: valueOrDefault(os.Getenv("CT_CVE_HTTP_ADDR"), ":8080"), @@ -90,6 +115,7 @@ func Load() (Config, error) { FeedSyncInterval: feedSyncInterval, FeedSyncOnStartup: feedSyncOnStartup, FeedHTTPTimeout: feedHTTPTimeout, + CTOpsConnections: ctOpsConnections, Sources: SourceConfig{ NVD: NVDSourceConfig{ Enabled: nvdEnabled, @@ -109,6 +135,101 @@ func Load() (Config, error) { return cfg, nil } +func ctOpsConnectionsFromEnv(value string) ([]ctops.Connection, error) { + if strings.TrimSpace(value) == "" { + return nil, nil + } + var raw []rawCTOpsConnection + if err := json.Unmarshal([]byte(value), &raw); err != nil { + return nil, fmt.Errorf("CT_CVE_CT_OPS_CONNECTIONS must be valid JSON: %w", err) + } + connections := make([]ctops.Connection, 0, len(raw)) + for index, entry := range raw { + path := fmt.Sprintf("CT_CVE_CT_OPS_CONNECTIONS[%d]", index) + name := strings.TrimSpace(entry.Name) + orgID := strings.TrimSpace(entry.OrgID) + if name == "" || orgID == "" { + return nil, fmt.Errorf("%s must include name and orgId", path) + } + baseURL, err := normalizeBaseURL(entry.CTOpsBaseURL, path+".ctOpsBaseUrl") + if err != nil { + return nil, err + } + inventoryTokens := make([]ctops.ServiceToken, 0, len(entry.InventoryTokens)) + for tokenIndex, rawToken := range entry.InventoryTokens { + token, err := parseServiceToken(rawToken, orgID, fmt.Sprintf("%s.inventoryTokens[%d]", path, tokenIndex)) + if err != nil { + return nil, err + } + inventoryTokens = append(inventoryTokens, token) + } + if len(inventoryTokens) == 0 { + return nil, fmt.Errorf("%s.inventoryTokens must include at least one token", path) + } + ctOpsToken, err := parseServiceToken(entry.CTOpsToken, orgID, path+".ctOpsToken") + if err != nil { + return nil, err + } + connections = append(connections, ctops.Connection{ + Name: name, + OrgID: orgID, + CTOpsBaseURL: baseURL, + InventoryTokens: inventoryTokens, + CTOpsToken: ctOpsToken, + }) + } + return connections, nil +} + +func parseServiceToken(raw rawCTOpsServiceToken, orgID, path string) (ctops.ServiceToken, error) { + id := strings.TrimSpace(raw.ID) + if id == "" || raw.Secret == "" { + return ctops.ServiceToken{}, fmt.Errorf("%s must include id and secret", path) + } + if !secretHasEnoughEntropy(raw.Secret) { + return ctops.ServiceToken{}, fmt.Errorf("%s.secret must contain at least 32 bytes of entropy", path) + } + scopes := make([]ctops.ServiceTokenScope, 0, len(raw.Scopes)) + for _, rawScope := range raw.Scopes { + scope := ctops.ServiceTokenScope(strings.TrimSpace(rawScope)) + switch scope { + case ctops.ScopeInventoryWrite, ctops.ScopeFindingsWrite, ctops.ScopeConnectionRead: + scopes = append(scopes, scope) + default: + return ctops.ServiceToken{}, fmt.Errorf("%s.scopes contains unsupported scope %q", path, rawScope) + } + } + if len(scopes) == 0 { + return ctops.ServiceToken{}, fmt.Errorf("%s.scopes must include at least one scope", path) + } + return ctops.ServiceToken{ + ID: id, + Secret: raw.Secret, + OrgID: orgID, + Scopes: scopes, + Revoked: raw.Revoked, + }, nil +} + +func normalizeBaseURL(value, name string) (string, error) { + parsedURL, err := validateHTTPURL(name, strings.TrimSpace(value)) + if err != nil { + return "", err + } + parsed, err := url.Parse(parsedURL) + if err != nil { + return "", err + } + parsed.Path = strings.TrimRight(parsed.Path, "/") + parsed.RawQuery = "" + parsed.Fragment = "" + return strings.TrimRight(parsed.String(), "/"), nil +} + +func secretHasEnoughEntropy(value string) bool { + return len(value) >= serviceTokenMinBytes +} + func (cfg Config) ApplySourceSettings(settings []SourceSettings) Config { next := cfg for _, setting := range settings { diff --git a/internal/config/ctops_config_test.go b/internal/config/ctops_config_test.go new file mode 100644 index 0000000..7cb6fe6 --- /dev/null +++ b/internal/config/ctops_config_test.go @@ -0,0 +1,74 @@ +package config + +import ( + "testing" + + "github.com/carrtech-dev/ct-cve/internal/ctops" +) + +func TestLoadParsesCTOpsConnections(t *testing.T) { + t.Setenv("CT_CVE_DATABASE_URL", "postgres://ct_cve:ct_cve@localhost:5432/ct_cve?sslmode=disable") + t.Setenv("CT_CVE_CT_OPS_CONNECTIONS", `[ + { + "name": "Primary CT Ops", + "orgId": "org_123", + "ctOpsBaseUrl": "https://ctops.example.test/", + "inventoryTokens": [ + { + "id": "ctops-inventory", + "secret": "ctops inventory signing secret 12345", + "scopes": ["inventory:write"] + } + ], + "ctOpsToken": { + "id": "ctcve-outbound", + "secret": "ctcve outbound signing secret 12345", + "scopes": ["findings:write", "connection:read"] + } + } + ]`) + + cfg, err := Load() + if err != nil { + t.Fatalf("Load: %v", err) + } + if len(cfg.CTOpsConnections) != 1 { + t.Fatalf("connections = %d, want 1", len(cfg.CTOpsConnections)) + } + connection := cfg.CTOpsConnections[0] + if connection.Name != "Primary CT Ops" { + t.Fatalf("Name = %q", connection.Name) + } + if connection.CTOpsBaseURL != "https://ctops.example.test" { + t.Fatalf("CTOpsBaseURL = %q, want normalized URL", connection.CTOpsBaseURL) + } + if len(connection.InventoryTokens) != 1 || connection.InventoryTokens[0].Scopes[0] != ctops.ScopeInventoryWrite { + t.Fatalf("InventoryTokens = %#v", connection.InventoryTokens) + } + if connection.CTOpsToken.ID != "ctcve-outbound" || len(connection.CTOpsToken.Scopes) != 2 { + t.Fatalf("CTOpsToken = %#v", connection.CTOpsToken) + } +} + +func TestLoadRejectsInvalidCTOpsConnectionConfig(t *testing.T) { + tests := []struct { + name string + value string + }{ + {name: "not json", value: `{nope`}, + {name: "missing org", value: `[{"name":"x","ctOpsBaseUrl":"https://ctops.example.test"}]`}, + {name: "non http url", value: `[{"name":"x","orgId":"org_123","ctOpsBaseUrl":"file:///tmp/x"}]`}, + {name: "weak token", value: `[{"name":"x","orgId":"org_123","ctOpsBaseUrl":"https://ctops.example.test","inventoryTokens":[{"id":"t","secret":"short","scopes":["inventory:write"]}]}]`}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("CT_CVE_DATABASE_URL", "postgres://ct_cve:ct_cve@localhost:5432/ct_cve?sslmode=disable") + t.Setenv("CT_CVE_CT_OPS_CONNECTIONS", tt.value) + + if _, err := Load(); err == nil { + t.Fatal("Load returned nil error, want invalid CT Ops connection error") + } + }) + } +} diff --git a/internal/ctops/handler.go b/internal/ctops/handler.go new file mode 100644 index 0000000..8654212 --- /dev/null +++ b/internal/ctops/handler.go @@ -0,0 +1,333 @@ +package ctops + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +type Store interface { + NonceStore + ApplyInventorySnapshot(context.Context, InventorySnapshot, time.Time) (InventorySnapshotApplyResult, error) + RecordFindingDelivery(context.Context, string, string, int) error + RecordConnectionError(context.Context, string, string) error + UpdateInventorySnapshotResponse(context.Context, string, string, InventorySnapshotResult) error + ConnectionHealth(context.Context, string) (ConnectionHealth, error) + RecordConnectionHealth(context.Context, string) error +} + +type HandlerOptions struct { + Connections []Connection + Store Store + HTTPClient *http.Client + Now func() time.Time +} + +type Handler struct { + connections []Connection + store Store + httpClient *http.Client + now func() time.Time +} + +const ( + inventoryPath = "/api/v1/ct-ops/inventory-snapshots" + connectionHealthPath = "/api/v1/ct-ops/connection-health" + findingBatchPath = "/api/integrations/ct-cve/v1/finding-batches" + maxBodyBytes = 25 * 1024 * 1024 + maxHosts = 500 + maxPackages = 25_000 + maxFindingsPerBatch = 5_000 +) + +func NewHandler(opts HandlerOptions) Handler { + client := opts.HTTPClient + if client == nil { + client = &http.Client{Timeout: 30 * time.Second} + } + now := opts.Now + if now == nil { + now = time.Now + } + return Handler{ + connections: opts.Connections, + store: opts.Store, + httpClient: client, + now: now, + } +} + +func (h Handler) Register(mux *http.ServeMux) http.Handler { + mux.HandleFunc(inventoryPath, h.serveInventorySnapshot) + mux.HandleFunc(connectionHealthPath, h.serveConnectionHealth) + return mux +} + +func (h Handler) serveInventorySnapshot(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Allow", http.MethodPost) + errorResponse(w, http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed", false) + return + } + body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes+1)) + if err != nil { + errorResponse(w, http.StatusRequestEntityTooLarge, "payload_too_large", "CT-CVE inventory snapshot payload exceeds the 25 MiB limit.", false) + return + } + if len(body) > maxBodyBytes { + errorResponse(w, http.StatusRequestEntityTooLarge, "payload_too_large", "CT-CVE inventory snapshot payload exceeds the 25 MiB limit.", false) + return + } + + var snapshot InventorySnapshot + if err := json.Unmarshal(body, &snapshot); err != nil { + errorResponse(w, http.StatusBadRequest, "invalid_json", "CT-CVE inventory snapshot payload must be valid JSON.", false) + return + } + if err := validateSnapshot(snapshot); err != nil { + errorResponse(w, http.StatusBadRequest, "invalid_payload", err.Error(), false) + return + } + connection, ok := h.connectionForOrg(snapshot.OrgID) + if !ok || len(connection.InventoryTokens) == 0 { + errorResponse(w, http.StatusForbidden, "unknown_org", "No CT Ops connection is configured for this organisation.", false) + return + } + if _, err := VerifyServiceRequest(r.Context(), VerifyServiceRequestOptions{ + Method: r.Method, + Path: r.URL.Path, + Body: body, + Headers: r.Header, + RequiredScope: ScopeInventoryWrite, + OrgID: snapshot.OrgID, + Tokens: connection.InventoryTokens, + NonceStore: h.store, + Now: h.now(), + }); err != nil { + serviceError(w, err) + return + } + + applied, err := h.store.ApplyInventorySnapshot(r.Context(), snapshot, h.now().UTC()) + if err != nil { + errorResponse(w, http.StatusInternalServerError, "inventory_snapshot_failed", "Failed to process CT Ops inventory snapshot.", true) + return + } + result := applied.Response + if result.NextAction == "" { + result.NextAction = "none" + } + + if len(applied.Findings) > 0 && !applied.Replayed { + if err := h.deliverFindings(r.Context(), connection, applied.Findings); err != nil { + _ = h.store.RecordConnectionError(r.Context(), snapshot.OrgID, "finding_delivery_failed") + result.NextAction = "retry_findings" + } + _ = h.store.UpdateInventorySnapshotResponse(r.Context(), snapshot.OrgID, snapshot.SnapshotID, result) + } + + writeJSON(w, http.StatusAccepted, result) +} + +func (h Handler) serveConnectionHealth(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet && r.Method != http.MethodHead { + w.Header().Set("Allow", http.MethodGet+", "+http.MethodHead) + errorResponse(w, http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed", false) + return + } + orgID := strings.TrimSpace(r.URL.Query().Get("orgId")) + if orgID == "" { + errorResponse(w, http.StatusBadRequest, "missing_org_id", "orgId query parameter is required.", false) + return + } + connection, ok := h.connectionForOrg(orgID) + if !ok || len(connection.InventoryTokens) == 0 { + errorResponse(w, http.StatusForbidden, "unknown_org", "No CT Ops connection is configured for this organisation.", false) + return + } + if _, err := VerifyServiceRequest(r.Context(), VerifyServiceRequestOptions{ + Method: r.Method, + Path: r.URL.Path, + Body: nil, + Headers: r.Header, + RequiredScope: ScopeConnectionRead, + OrgID: orgID, + Tokens: connection.InventoryTokens, + NonceStore: h.store, + Now: h.now(), + }); err != nil { + serviceError(w, err) + return + } + if err := h.store.RecordConnectionHealth(r.Context(), orgID); err != nil { + errorResponse(w, http.StatusInternalServerError, "connection_health_failed", "Failed to record CT Ops connection health.", true) + return + } + health, err := h.store.ConnectionHealth(r.Context(), orgID) + if err != nil { + errorResponse(w, http.StatusInternalServerError, "connection_health_failed", "Failed to load CT Ops connection health.", true) + return + } + health.Configured = true + health.ContractVersion = ContractVersion + if r.Method == http.MethodHead { + w.WriteHeader(http.StatusOK) + return + } + writeJSON(w, http.StatusOK, health) +} + +func (h Handler) connectionForOrg(orgID string) (Connection, bool) { + for _, connection := range h.connections { + if connection.OrgID == orgID { + return connection, true + } + } + return Connection{}, false +} + +func (h Handler) deliverFindings(ctx context.Context, connection Connection, findings []Finding) error { + if connection.CTOpsBaseURL == "" || connection.CTOpsToken.ID == "" { + return fmt.Errorf("CT Ops callback token is not configured") + } + for start := 0; start < len(findings); start += maxFindingsPerBatch { + end := start + maxFindingsPerBatch + if end > len(findings) { + end = len(findings) + } + batch := FindingBatch{ + ContractVersion: ContractVersion, + OrgID: connection.OrgID, + BatchID: batchID(connection.OrgID, h.now().UTC(), start/maxFindingsPerBatch), + GeneratedAt: h.now().UTC(), + Findings: findings[start:end], + } + body, err := json.Marshal(batch) + if err != nil { + return err + } + endpoint, err := callbackURL(connection.CTOpsBaseURL) + if err != nil { + return err + } + request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return err + } + headers := SignServiceRequest(SignServiceRequestOptions{ + Method: http.MethodPost, + Path: findingBatchPath, + Body: body, + Token: connection.CTOpsToken, + Timestamp: h.now().UTC(), + }) + for key, values := range headers { + for _, value := range values { + request.Header.Add(key, value) + } + } + request.Header.Set("Content-Type", "application/json") + response, err := h.httpClient.Do(request) + if err != nil { + return err + } + _, _ = io.Copy(io.Discard, response.Body) + _ = response.Body.Close() + if response.StatusCode < 200 || response.StatusCode >= 300 { + return fmt.Errorf("CT Ops finding callback returned HTTP %d", response.StatusCode) + } + if err := h.store.RecordFindingDelivery(ctx, connection.OrgID, batch.BatchID, len(batch.Findings)); err != nil { + return err + } + } + return nil +} + +func validateSnapshot(snapshot InventorySnapshot) error { + if snapshot.ContractVersion != ContractVersion { + return fmt.Errorf("unsupported contractVersion") + } + if strings.TrimSpace(snapshot.OrgID) == "" { + return fmt.Errorf("orgId is required") + } + if strings.TrimSpace(snapshot.SnapshotID) == "" { + return fmt.Errorf("snapshotId is required") + } + if snapshot.SnapshotType != "full" && snapshot.SnapshotType != "incremental" && snapshot.SnapshotType != "tombstone" { + return fmt.Errorf("snapshotType must be full, incremental, or tombstone") + } + if snapshot.GeneratedAt.IsZero() { + return fmt.Errorf("generatedAt is required") + } + if len(snapshot.Hosts) > maxHosts { + return fmt.Errorf("hosts must contain at most 500 rows") + } + if len(snapshot.Packages) > maxPackages { + return fmt.Errorf("packages must contain at most 25000 rows") + } + for _, host := range snapshot.Hosts { + if strings.TrimSpace(host.HostID) == "" || strings.TrimSpace(host.Hostname) == "" { + return fmt.Errorf("hostId and hostname are required for every host") + } + } + for _, pkg := range snapshot.Packages { + if strings.TrimSpace(pkg.SoftwarePackageID) == "" || strings.TrimSpace(pkg.HostID) == "" || strings.TrimSpace(pkg.Name) == "" || strings.TrimSpace(pkg.Version) == "" || strings.TrimSpace(pkg.Source) == "" { + return fmt.Errorf("softwarePackageId, hostId, name, version, and source are required for every package") + } + if pkg.FirstSeenAt.IsZero() || pkg.LastSeenAt.IsZero() { + return fmt.Errorf("firstSeenAt and lastSeenAt are required for every package") + } + } + return nil +} + +func callbackURL(baseURL string) (string, error) { + parsed, err := url.Parse(baseURL) + if err != nil { + return "", err + } + parsed.Path = strings.TrimRight(parsed.Path, "/") + findingBatchPath + parsed.RawQuery = "" + parsed.Fragment = "" + return parsed.String(), nil +} + +func batchID(orgID string, now time.Time, page int) string { + compact := now.UTC().Format("20060102_150405") + return fmt.Sprintf("findings_%s_%s_%d", compact, orgID, page) +} + +func serviceError(w http.ResponseWriter, err error) { + var authErr *ServiceAuthError + if errorsAsServiceAuth(err, &authErr) { + errorResponse(w, authErr.Status, authErr.Code, authErr.Message, authErr.Retryable) + return + } + errorResponse(w, http.StatusInternalServerError, "service_auth_failed", "Failed to verify CT-CVE service request.", true) +} + +func errorsAsServiceAuth(err error, target **ServiceAuthError) bool { + return AsServiceAuthError(err, target) +} + +func errorResponse(w http.ResponseWriter, status int, code, message string, retryable bool) { + writeJSON(w, status, map[string]any{ + "error": map[string]any{ + "code": code, + "message": message, + "retryable": retryable, + }, + }) +} + +func writeJSON(w http.ResponseWriter, status int, value any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(value) +} diff --git a/internal/ctops/handler_test.go b/internal/ctops/handler_test.go new file mode 100644 index 0000000..e7341ee --- /dev/null +++ b/internal/ctops/handler_test.go @@ -0,0 +1,284 @@ +package ctops + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" +) + +func TestInventorySnapshotEndpointMatchesAndDeliversFindings(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + inbound := ServiceToken{ID: "ctops-inventory", Secret: "ctops inventory signing secret 12345", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeInventoryWrite}} + outbound := ServiceToken{ID: "ctcve-outbound", Secret: "ctcve outbound signing secret 12345", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeFindingsWrite, ScopeConnectionRead}} + + var delivered FindingBatch + ctopsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/integrations/ct-cve/v1/finding-batches" { + t.Fatalf("unexpected callback path %s", r.URL.Path) + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + if _, err := VerifyServiceRequest(r.Context(), VerifyServiceRequestOptions{ + Method: r.Method, + Path: r.URL.Path, + Body: body, + Headers: r.Header, + RequiredScope: ScopeFindingsWrite, + OrgID: "org_123", + Tokens: []ServiceToken{outbound}, + NonceStore: NewMemoryNonceStore(), + Now: now, + }); err != nil { + t.Fatalf("callback signature verification failed: %v", err) + } + if err := json.Unmarshal(body, &delivered); err != nil { + t.Fatalf("Unmarshal callback: %v", err) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte(`{"accepted":true,"batchId":"` + delivered.BatchID + `","findingsAccepted":1,"findingsRejected":0,"findingsSkipped":0}`)) + })) + defer ctopsServer.Close() + + store := &fakeStore{ + result: InventorySnapshotResult{ + Accepted: true, + SnapshotID: "inv_1", + HostsAccepted: 1, + PackagesAccepted: 1, + RowsRejected: 0, + NextAction: "none", + }, + findings: []Finding{{ + FindingID: "ctcve_find_1", + HostID: "host_1", + SoftwarePackageID: "pkg_1", + CVEID: "CVE-2026-12345", + Status: FindingStatusOpen, + PackageName: "openssl", + InstalledVersion: "3.0.13-0ubuntu3.2", + FixedVersion: "3.0.13-0ubuntu3.3", + Source: "ubuntu-osv", + Severity: "high", + CVSSScore: ptrFloat(8.1), + KnownExploited: false, + Confidence: "confirmed", + MatchReason: "installed version is below fixed version", + FirstSeenAt: now, + LastSeenAt: now, + CVE: CVESummary{ + Title: "OpenSSL vulnerability", + Description: "Short normalized summary.", + PublishedAt: &now, + ModifiedAt: &now, + }, + }}, + } + handler := NewHandler(HandlerOptions{ + Connections: []Connection{{ + Name: "Primary CT Ops", + OrgID: "org_123", + CTOpsBaseURL: ctopsServer.URL, + InventoryTokens: []ServiceToken{inbound}, + CTOpsToken: outbound, + }}, + Store: store, + HTTPClient: ctopsServer.Client(), + Now: func() time.Time { return now }, + }) + + body := `{ + "contractVersion": "2026-04-30", + "orgId": "org_123", + "orgSlug": "acme", + "snapshotId": "inv_1", + "snapshotType": "full", + "generatedAt": "2026-05-08T12:00:00Z", + "cursor": null, + "hosts": [{"hostId":"host_1","hostname":"web-01","status":"online","updatedAt":"2026-05-08T12:00:00Z"}], + "packages": [{"softwarePackageId":"pkg_1","hostId":"host_1","name":"openssl","version":"3.0.13-0ubuntu3.2","source":"dpkg","firstSeenAt":"2026-05-08T11:00:00Z","lastSeenAt":"2026-05-08T12:00:00Z"}] + }` + request := httptest.NewRequest(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", strings.NewReader(body)) + request.Header = signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, inbound.ID, inbound.Secret, "snapshot-nonce", now) + request.Header.Set("Content-Type", "application/json") + response := httptest.NewRecorder() + + handler.Register(http.NewServeMux()).ServeHTTP(response, request) + + if response.Code != http.StatusAccepted { + t.Fatalf("status = %d, want 202; body=%s", response.Code, response.Body.String()) + } + var result InventorySnapshotResult + if err := json.Unmarshal(response.Body.Bytes(), &result); err != nil { + t.Fatalf("Unmarshal response: %v", err) + } + if !result.Accepted || result.NextAction != "none" { + t.Fatalf("response = %#v", result) + } + if len(store.snapshots) != 1 || store.snapshots[0].SnapshotID != "inv_1" { + t.Fatalf("stored snapshots = %#v", store.snapshots) + } + if delivered.OrgID != "org_123" || len(delivered.Findings) != 1 { + t.Fatalf("delivered batch = %#v", delivered) + } + if delivered.Findings[0].CVEID != "CVE-2026-12345" || delivered.Findings[0].HostID != "host_1" { + t.Fatalf("delivered finding = %#v", delivered.Findings[0]) + } +} + +func TestInventorySnapshotEndpointReturnsRetryActionWhenCallbackFails(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + inbound := ServiceToken{ID: "ctops-inventory", Secret: "ctops inventory signing secret 12345", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeInventoryWrite}} + outbound := ServiceToken{ID: "ctcve-outbound", Secret: "ctcve outbound signing secret 12345", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeFindingsWrite}} + ctopsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "down", http.StatusBadGateway) + })) + defer ctopsServer.Close() + + store := &fakeStore{ + result: InventorySnapshotResult{Accepted: true, SnapshotID: "inv_1", HostsAccepted: 1, PackagesAccepted: 1, NextAction: "none"}, + findings: []Finding{{ + FindingID: "ctcve_find_1", HostID: "host_1", SoftwarePackageID: "pkg_1", CVEID: "CVE-2026-12345", + Status: FindingStatusOpen, PackageName: "openssl", InstalledVersion: "1", Source: "ubuntu-osv", + Severity: "high", Confidence: "confirmed", FirstSeenAt: now, LastSeenAt: now, + }}, + } + handler := NewHandler(HandlerOptions{ + Connections: []Connection{{Name: "Primary CT Ops", OrgID: "org_123", CTOpsBaseURL: ctopsServer.URL, InventoryTokens: []ServiceToken{inbound}, CTOpsToken: outbound}}, + Store: store, + HTTPClient: ctopsServer.Client(), + Now: func() time.Time { return now }, + }) + body := `{"contractVersion":"2026-04-30","orgId":"org_123","snapshotId":"inv_1","snapshotType":"full","generatedAt":"2026-05-08T12:00:00Z","hosts":[],"packages":[]}` + request := httptest.NewRequest(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", strings.NewReader(body)) + request.Header = signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, inbound.ID, inbound.Secret, "snapshot-nonce", now) + response := httptest.NewRecorder() + + handler.Register(http.NewServeMux()).ServeHTTP(response, request) + + if response.Code != http.StatusAccepted { + t.Fatalf("status = %d, want 202; body=%s", response.Code, response.Body.String()) + } + var result InventorySnapshotResult + if err := json.Unmarshal(response.Body.Bytes(), &result); err != nil { + t.Fatalf("Unmarshal response: %v", err) + } + if result.NextAction != "retry_findings" { + t.Fatalf("NextAction = %q, want retry_findings", result.NextAction) + } + if store.lastErrorCode != "finding_delivery_failed" { + t.Fatalf("lastErrorCode = %q", store.lastErrorCode) + } +} + +func TestInventorySnapshotEndpointRejectsHostRowLimit(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + inbound := ServiceToken{ID: "ctops-inventory", Secret: "ctops inventory signing secret 12345", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeInventoryWrite}} + hosts := make([]string, 0, maxHosts+1) + for i := 0; i < maxHosts+1; i++ { + hosts = append(hosts, `{"hostId":"host_`+strconv.Itoa(i)+`","hostname":"web-`+strconv.Itoa(i)+`","status":"online","updatedAt":"2026-05-08T12:00:00Z"}`) + } + body := `{"contractVersion":"2026-04-30","orgId":"org_123","snapshotId":"inv_1","snapshotType":"full","generatedAt":"2026-05-08T12:00:00Z","hosts":[` + strings.Join(hosts, ",") + `],"packages":[]}` + handler := NewHandler(HandlerOptions{ + Connections: []Connection{{Name: "Primary CT Ops", OrgID: "org_123", InventoryTokens: []ServiceToken{inbound}}}, + Store: &fakeStore{}, + Now: func() time.Time { return now }, + }) + request := httptest.NewRequest(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", strings.NewReader(body)) + request.Header = signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, inbound.ID, inbound.Secret, "limit-nonce", now) + response := httptest.NewRecorder() + + handler.Register(http.NewServeMux()).ServeHTTP(response, request) + + if response.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400; body=%s", response.Code, response.Body.String()) + } + if !strings.Contains(response.Body.String(), "hosts must contain at most 500 rows") { + t.Fatalf("body = %s", response.Body.String()) + } +} + +func TestConnectionHealthEndpointIsOrgScopedAndSigned(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + inbound := ServiceToken{ID: "ctops-health", Secret: "ctops health signing secret 123456789", OrgID: "org_123", Scopes: []ServiceTokenScope{ScopeConnectionRead}} + store := &fakeStore{health: ConnectionHealth{Configured: true, Enabled: true, ContractVersion: ContractVersion}} + handler := NewHandler(HandlerOptions{ + Connections: []Connection{{Name: "Primary CT Ops", OrgID: "org_123", InventoryTokens: []ServiceToken{inbound}}}, + Store: store, + Now: func() time.Time { return now }, + }) + body := "" + request := httptest.NewRequest(http.MethodGet, "/api/v1/ct-ops/connection-health?orgId=org_123", nil) + request.Header = signedHeaders(http.MethodGet, "/api/v1/ct-ops/connection-health", body, inbound.ID, inbound.Secret, "health-nonce", now) + response := httptest.NewRecorder() + + handler.Register(http.NewServeMux()).ServeHTTP(response, request) + + if response.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", response.Code, response.Body.String()) + } + if !store.healthRecorded { + t.Fatal("health check was not recorded") + } +} + +type fakeStore struct { + snapshots []InventorySnapshot + result InventorySnapshotResult + findings []Finding + health ConnectionHealth + lastErrorCode string + healthRecorded bool +} + +func (s *fakeStore) RememberNonce(_ context.Context, _ string, _ string, _ time.Time, _ time.Time) (bool, error) { + return true, nil +} + +func (s *fakeStore) ApplyInventorySnapshot(_ context.Context, snapshot InventorySnapshot, _ time.Time) (InventorySnapshotApplyResult, error) { + s.snapshots = append(s.snapshots, snapshot) + return InventorySnapshotApplyResult{Response: s.result, Findings: s.findings}, nil +} + +func (s *fakeStore) RecordFindingDelivery(_ context.Context, _ string, _ string, _ int) error { + return nil +} + +func (s *fakeStore) RecordConnectionError(_ context.Context, _ string, code string) error { + s.lastErrorCode = code + return nil +} + +func (s *fakeStore) UpdateInventorySnapshotResponse(_ context.Context, _ string, _ string, result InventorySnapshotResult) error { + s.result = result + return nil +} + +func (s *fakeStore) ConnectionHealth(_ context.Context, _ string) (ConnectionHealth, error) { + return s.health, nil +} + +func (s *fakeStore) RecordConnectionHealth(_ context.Context, _ string) error { + s.healthRecorded = true + return nil +} + +func ptrFloat(value float64) *float64 { + return &value +} diff --git a/internal/ctops/service_token.go b/internal/ctops/service_token.go new file mode 100644 index 0000000..8f1e222 --- /dev/null +++ b/internal/ctops/service_token.go @@ -0,0 +1,242 @@ +package ctops + +import ( + "context" + "crypto/hmac" + "crypto/rand" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "errors" + "fmt" + "net/http" + "strings" + "sync" + "time" +) + +type NonceStore interface { + RememberNonce(ctx context.Context, tokenID, nonce string, expiresAt, now time.Time) (bool, error) +} + +type ServiceAuthError struct { + Code string + Message string + Retryable bool + Status int +} + +func (e *ServiceAuthError) Error() string { + return e.Message +} + +type VerifyServiceRequestOptions struct { + Method string + Path string + Body []byte + Headers http.Header + RequiredScope ServiceTokenScope + OrgID string + Tokens []ServiceToken + NonceStore NonceStore + Now time.Time +} + +type VerifyServiceRequestResult struct { + Token ServiceToken +} + +type SignServiceRequestOptions struct { + Method string + Path string + Body []byte + Token ServiceToken + Nonce string + Timestamp time.Time +} + +const ( + authScheme = "CT-ServiceToken" + signaturePrefix = "v1=" + maxClockSkew = 5 * time.Minute + nonceTTL = 10 * time.Minute +) + +func AsServiceAuthError(err error, target **ServiceAuthError) bool { + return errors.As(err, target) +} + +func NewMemoryNonceStore() NonceStore { + return &memoryNonceStore{seen: map[string]time.Time{}} +} + +type memoryNonceStore struct { + mu sync.Mutex + seen map[string]time.Time +} + +func (s *memoryNonceStore) RememberNonce(_ context.Context, tokenID, nonce string, expiresAt, now time.Time) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + + for key, expiry := range s.seen { + if !expiry.After(now) { + delete(s.seen, key) + } + } + key := tokenID + ":" + nonce + if _, ok := s.seen[key]; ok { + return false, nil + } + s.seen[key] = expiresAt + return true, nil +} + +func VerifyServiceRequest(ctx context.Context, opts VerifyServiceRequestOptions) (VerifyServiceRequestResult, error) { + auth := strings.TrimSpace(opts.Headers.Get("Authorization")) + if auth == "" { + return VerifyServiceRequestResult{}, authError("missing_authorization", http.StatusUnauthorized, "Missing CT-CVE service token authorization header.", false) + } + parts := strings.Fields(auth) + if len(parts) != 2 || parts[0] != authScheme { + return VerifyServiceRequestResult{}, authError("invalid_authorization", http.StatusUnauthorized, "Invalid CT-CVE service token authorization header.", false) + } + + tokenID := parts[1] + token, ok := findToken(opts.Tokens, tokenID) + if !ok { + return VerifyServiceRequestResult{}, authError("unknown_token", http.StatusUnauthorized, "CT-CVE service token was not found.", false) + } + if token.Revoked { + return VerifyServiceRequestResult{}, authError("revoked_token", http.StatusUnauthorized, "CT-CVE service token is revoked.", false) + } + if !hasScope(token, opts.RequiredScope) { + return VerifyServiceRequestResult{}, authError("insufficient_scope", http.StatusForbidden, "CT-CVE service token is not allowed to perform this action.", false) + } + if token.OrgID != opts.OrgID { + return VerifyServiceRequestResult{}, authError("org_scope_mismatch", http.StatusForbidden, "CT-CVE service token is not scoped to the requested organisation.", false) + } + + timestamp := opts.Headers.Get("X-CT-Timestamp") + nonce := opts.Headers.Get("X-CT-Nonce") + contentHash := opts.Headers.Get("X-CT-Content-SHA256") + signatureHeader := opts.Headers.Get("X-CT-Signature") + if timestamp == "" || nonce == "" || contentHash == "" || signatureHeader == "" { + return VerifyServiceRequestResult{}, authError("missing_header", http.StatusUnauthorized, "Missing required CT-CVE service signature header.", false) + } + + requestTime, err := time.Parse(time.RFC3339, timestamp) + if err != nil { + requestTime, err = time.Parse(time.RFC3339Nano, timestamp) + } + if err != nil { + return VerifyServiceRequestResult{}, authError("invalid_timestamp", http.StatusUnauthorized, "CT-CVE service timestamp is invalid.", false) + } + now := opts.Now + if now.IsZero() { + now = time.Now() + } + if absDuration(now.Sub(requestTime)) > maxClockSkew { + return VerifyServiceRequestResult{}, authError("timestamp_out_of_range", http.StatusUnauthorized, "CT-CVE service timestamp is outside the allowed replay window.", false) + } + + bodyHash := sha256Hex(opts.Body) + if len(contentHash) != 64 || !safeEqual(contentHash, bodyHash) { + return VerifyServiceRequestResult{}, authError("content_hash_mismatch", http.StatusUnauthorized, "CT-CVE service content hash does not match the request body.", false) + } + if !strings.HasPrefix(signatureHeader, signaturePrefix) { + return VerifyServiceRequestResult{}, authError("invalid_signature", http.StatusUnauthorized, "CT-CVE service signature could not be verified.", false) + } + actualSignature := strings.TrimPrefix(signatureHeader, signaturePrefix) + expectedSignature := signature(opts.Method, opts.Path, timestamp, nonce, bodyHash, token.Secret) + if !safeEqual(actualSignature, expectedSignature) { + return VerifyServiceRequestResult{}, authError("invalid_signature", http.StatusUnauthorized, "CT-CVE service signature could not be verified.", false) + } + + if opts.NonceStore == nil { + return VerifyServiceRequestResult{}, fmt.Errorf("nonce store is required") + } + remembered, err := opts.NonceStore.RememberNonce(ctx, token.ID, nonce, now.Add(nonceTTL), now) + if err != nil { + return VerifyServiceRequestResult{}, err + } + if !remembered { + return VerifyServiceRequestResult{}, authError("replayed_nonce", http.StatusUnauthorized, "CT-CVE service nonce has already been used.", false) + } + + return VerifyServiceRequestResult{Token: token}, nil +} + +func SignServiceRequest(opts SignServiceRequestOptions) http.Header { + timestamp := opts.Timestamp + if timestamp.IsZero() { + timestamp = time.Now() + } + nonce := opts.Nonce + if nonce == "" { + nonce = randomNonce() + } + bodyHash := sha256Hex(opts.Body) + headers := http.Header{} + headers.Set("Authorization", authScheme+" "+opts.Token.ID) + headers.Set("X-CT-Timestamp", timestamp.UTC().Format(time.RFC3339)) + headers.Set("X-CT-Nonce", nonce) + headers.Set("X-CT-Content-SHA256", bodyHash) + headers.Set("X-CT-Signature", signaturePrefix+signature(opts.Method, opts.Path, headers.Get("X-CT-Timestamp"), nonce, bodyHash, opts.Token.Secret)) + return headers +} + +func authError(code string, status int, message string, retryable bool) *ServiceAuthError { + return &ServiceAuthError{Code: code, Status: status, Message: message, Retryable: retryable} +} + +func findToken(tokens []ServiceToken, id string) (ServiceToken, bool) { + for _, token := range tokens { + if token.ID == id { + return token, true + } + } + return ServiceToken{}, false +} + +func hasScope(token ServiceToken, scope ServiceTokenScope) bool { + for _, candidate := range token.Scopes { + if candidate == scope { + return true + } + } + return false +} + +func sha256Hex(body []byte) string { + sum := sha256.Sum256(body) + return hex.EncodeToString(sum[:]) +} + +func signature(method, path, timestamp, nonce, bodyHash, secret string) string { + input := strings.Join([]string{strings.ToUpper(method), path, timestamp, nonce, bodyHash}, "\n") + mac := hmac.New(sha256.New, []byte(secret)) + _, _ = mac.Write([]byte(input)) + return base64.RawURLEncoding.EncodeToString(mac.Sum(nil)) +} + +func safeEqual(a, b string) bool { + left := []byte(a) + right := []byte(b) + return len(left) == len(right) && hmac.Equal(left, right) +} + +func absDuration(value time.Duration) time.Duration { + if value < 0 { + return -value + } + return value +} + +func randomNonce() string { + var raw [16]byte + if _, err := rand.Read(raw[:]); err != nil { + return fmt.Sprintf("%d", time.Now().UnixNano()) + } + return hex.EncodeToString(raw[:]) +} diff --git a/internal/ctops/service_token_test.go b/internal/ctops/service_token_test.go new file mode 100644 index 0000000..7fc0e64 --- /dev/null +++ b/internal/ctops/service_token_test.go @@ -0,0 +1,206 @@ +package ctops + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "net/http" + "strings" + "testing" + "time" +) + +func signedHeaders(method, path, body, tokenID, secret, nonce string, now time.Time) http.Header { + bodySum := sha256.Sum256([]byte(body)) + bodyHash := hex.EncodeToString(bodySum[:]) + timestamp := now.UTC().Format(time.RFC3339) + input := strings.Join([]string{method, path, timestamp, nonce, bodyHash}, "\n") + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write([]byte(input)) + + headers := http.Header{} + headers.Set("Authorization", "CT-ServiceToken "+tokenID) + headers.Set("X-CT-Timestamp", timestamp) + headers.Set("X-CT-Nonce", nonce) + headers.Set("X-CT-Content-SHA256", bodyHash) + headers.Set("X-CT-Signature", "v1="+base64.RawURLEncoding.EncodeToString(mac.Sum(nil))) + return headers +} + +func TestVerifyServiceRequestAcceptsSignedRequest(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + body := `{"orgId":"org_123"}` + token := ServiceToken{ + ID: "inventory-token", + Secret: "ct-cve unit test signing secret 123456", + OrgID: "org_123", + Scopes: []ServiceTokenScope{ScopeInventoryWrite}, + } + + result, err := VerifyServiceRequest(context.Background(), VerifyServiceRequestOptions{ + Method: http.MethodPost, + Path: "/api/v1/ct-ops/inventory-snapshots", + Body: []byte(body), + Headers: signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-1", now), + RequiredScope: ScopeInventoryWrite, + OrgID: "org_123", + Tokens: []ServiceToken{token}, + NonceStore: NewMemoryNonceStore(), + Now: now, + }) + + if err != nil { + t.Fatalf("VerifyServiceRequest returned error: %v", err) + } + if result.Token.ID != token.ID { + t.Fatalf("token id = %q, want %q", result.Token.ID, token.ID) + } +} + +func TestVerifyServiceRequestRejectsBadHashWrongOrgScopeAndReplay(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + body := `{"orgId":"org_123"}` + token := ServiceToken{ + ID: "inventory-token", + Secret: "ct-cve unit test signing secret 123456", + OrgID: "org_123", + Scopes: []ServiceTokenScope{ScopeInventoryWrite}, + } + nonceStore := NewMemoryNonceStore() + + tests := []struct { + name string + body []byte + orgID string + scope ServiceTokenScope + headers http.Header + code string + }{ + { + name: "bad body hash", + body: []byte(`{"orgId":"org_123","changed":true}`), + orgID: "org_123", + scope: ScopeInventoryWrite, + headers: signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-bad-hash", now), + code: "content_hash_mismatch", + }, + { + name: "wrong org", + body: []byte(body), + orgID: "org_other", + scope: ScopeInventoryWrite, + headers: signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-wrong-org", now), + code: "org_scope_mismatch", + }, + { + name: "wrong scope", + body: []byte(body), + orgID: "org_123", + scope: ScopeConnectionRead, + headers: signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-wrong-scope", now), + code: "insufficient_scope", + }, + { + name: "expired timestamp", + body: []byte(body), + orgID: "org_123", + scope: ScopeInventoryWrite, + headers: signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-expired", now.Add(-6*time.Minute)), + code: "timestamp_out_of_range", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := VerifyServiceRequest(context.Background(), VerifyServiceRequestOptions{ + Method: http.MethodPost, + Path: "/api/v1/ct-ops/inventory-snapshots", + Body: tt.body, + Headers: tt.headers, + RequiredScope: tt.scope, + OrgID: tt.orgID, + Tokens: []ServiceToken{token}, + NonceStore: nonceStore, + Now: now, + }) + var authErr *ServiceAuthError + if err == nil || !AsServiceAuthError(err, &authErr) { + t.Fatalf("error = %v, want ServiceAuthError", err) + } + if authErr.Code != tt.code { + t.Fatalf("code = %q, want %q", authErr.Code, tt.code) + } + }) + } + + headers := signedHeaders(http.MethodPost, "/api/v1/ct-ops/inventory-snapshots", body, token.ID, token.Secret, "nonce-replay", now) + for attempt := 0; attempt < 2; attempt++ { + _, err := VerifyServiceRequest(context.Background(), VerifyServiceRequestOptions{ + Method: http.MethodPost, + Path: "/api/v1/ct-ops/inventory-snapshots", + Body: []byte(body), + Headers: headers, + RequiredScope: ScopeInventoryWrite, + OrgID: "org_123", + Tokens: []ServiceToken{token}, + NonceStore: nonceStore, + Now: now, + }) + if attempt == 0 && err != nil { + t.Fatalf("first replay setup request returned error: %v", err) + } + if attempt == 1 { + var authErr *ServiceAuthError + if err == nil || !AsServiceAuthError(err, &authErr) || authErr.Code != "replayed_nonce" { + t.Fatalf("second replay error = %v, want replayed_nonce", err) + } + } + } +} + +func TestSignServiceRequestUsesCTOpsSignatureContract(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC) + body := []byte(`{"accepted":true}`) + token := ServiceToken{ + ID: "ct-cve-outbound", + Secret: "ct-cve outbound signing secret 12345", + OrgID: "org_123", + Scopes: []ServiceTokenScope{ScopeFindingsWrite, ScopeConnectionRead}, + } + + headers := SignServiceRequest(SignServiceRequestOptions{ + Method: http.MethodPost, + Path: "/api/integrations/ct-cve/v1/finding-batches", + Body: body, + Token: token, + Nonce: "nonce-sign", + Timestamp: now, + }) + + verified, err := VerifyServiceRequest(context.Background(), VerifyServiceRequestOptions{ + Method: http.MethodPost, + Path: "/api/integrations/ct-cve/v1/finding-batches", + Body: bytes.Clone(body), + Headers: headers, + RequiredScope: ScopeFindingsWrite, + OrgID: "org_123", + Tokens: []ServiceToken{token}, + NonceStore: NewMemoryNonceStore(), + Now: now, + }) + if err != nil { + t.Fatalf("VerifyServiceRequest returned error for signed headers: %v", err) + } + if verified.Token.ID != token.ID { + t.Fatalf("verified token = %q, want %q", verified.Token.ID, token.ID) + } +} diff --git a/internal/ctops/types.go b/internal/ctops/types.go new file mode 100644 index 0000000..4dfb63e --- /dev/null +++ b/internal/ctops/types.go @@ -0,0 +1,157 @@ +package ctops + +import "time" + +const ContractVersion = "2026-04-30" + +type ServiceTokenScope string + +const ( + ScopeInventoryWrite ServiceTokenScope = "inventory:write" + ScopeFindingsWrite ServiceTokenScope = "findings:write" + ScopeConnectionRead ServiceTokenScope = "connection:read" +) + +type ServiceToken struct { + ID string + Secret string + OrgID string + Scopes []ServiceTokenScope + Revoked bool +} + +type Connection struct { + Name string + OrgID string + CTOpsBaseURL string + InventoryTokens []ServiceToken + CTOpsToken ServiceToken +} + +type InventorySnapshot struct { + ContractVersion string `json:"contractVersion"` + OrgID string `json:"orgId"` + OrgSlug string `json:"orgSlug"` + SnapshotID string `json:"snapshotId"` + SnapshotType string `json:"snapshotType"` + GeneratedAt time.Time `json:"generatedAt"` + Cursor *string `json:"cursor"` + Hosts []InventoryHost `json:"hosts"` + Packages []InventoryPackage `json:"packages"` +} + +type InventoryHost struct { + HostID string `json:"hostId"` + AgentID *string `json:"agentId"` + Hostname string `json:"hostname"` + DisplayName *string `json:"displayName"` + OS *string `json:"os"` + OSVersion *string `json:"osVersion"` + Arch *string `json:"arch"` + IPAddresses []string `json:"ipAddresses"` + Status string `json:"status"` + LastSeenAt *time.Time `json:"lastSeenAt"` + UpdatedAt time.Time `json:"updatedAt"` + DeletedAt *time.Time `json:"deletedAt"` +} + +type InventoryPackage struct { + SoftwarePackageID string `json:"softwarePackageId"` + HostID string `json:"hostId"` + Name string `json:"name"` + Version string `json:"version"` + Architecture *string `json:"architecture"` + Source string `json:"source"` + Fingerprint string `json:"fingerprint"` + DistroID *string `json:"distroId"` + DistroVersionID *string `json:"distroVersionId"` + DistroCodename *string `json:"distroCodename"` + DistroIDLike []string `json:"distroIdLike"` + SourceName *string `json:"sourceName"` + SourceVersion *string `json:"sourceVersion"` + PackageEpoch *string `json:"packageEpoch"` + PackageRelease *string `json:"packageRelease"` + Repository *string `json:"repository"` + Origin *string `json:"origin"` + InstallDate *time.Time `json:"installDate"` + FirstSeenAt time.Time `json:"firstSeenAt"` + LastSeenAt time.Time `json:"lastSeenAt"` + RemovedAt *time.Time `json:"removedAt"` + DeletedAt *time.Time `json:"deletedAt"` +} + +type InventorySnapshotResult struct { + Accepted bool `json:"accepted"` + SnapshotID string `json:"snapshotId"` + HostsAccepted int `json:"hostsAccepted"` + PackagesAccepted int `json:"packagesAccepted"` + RowsRejected int `json:"rowsRejected"` + NextAction string `json:"nextAction"` +} + +type InventorySnapshotApplyResult struct { + Response InventorySnapshotResult + Findings []Finding + Replayed bool +} + +type FindingStatus string + +const ( + FindingStatusOpen FindingStatus = "open" + FindingStatusResolved FindingStatus = "resolved" +) + +type Finding struct { + FindingID string `json:"findingId"` + HostID string `json:"hostId"` + SoftwarePackageID string `json:"softwarePackageId"` + CVEID string `json:"cveId"` + Status FindingStatus `json:"status"` + PackageName string `json:"packageName"` + InstalledVersion string `json:"installedVersion"` + FixedVersion string `json:"fixedVersion,omitempty"` + Source string `json:"source"` + Severity string `json:"severity"` + CVSSScore *float64 `json:"cvssScore"` + KnownExploited bool `json:"knownExploited"` + Confidence string `json:"confidence"` + MatchReason string `json:"matchReason,omitempty"` + FirstSeenAt time.Time `json:"firstSeenAt"` + LastSeenAt time.Time `json:"lastSeenAt"` + ResolvedAt *time.Time `json:"resolvedAt"` + CVE CVESummary `json:"cve"` + References []string `json:"references,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +type CVESummary struct { + Title string `json:"title"` + Description string `json:"description"` + PublishedAt *time.Time `json:"publishedAt"` + ModifiedAt *time.Time `json:"modifiedAt"` + Rejected bool `json:"rejected"` + KEVDueDate *time.Time `json:"kevDueDate"` + KEVVendorProject string `json:"kevVendorProject"` + KEVProduct string `json:"kevProduct"` + KEVRequiredAction string `json:"kevRequiredAction"` +} + +type FindingBatch struct { + ContractVersion string `json:"contractVersion"` + OrgID string `json:"orgId"` + BatchID string `json:"batchId"` + GeneratedAt time.Time `json:"generatedAt"` + Findings []Finding `json:"findings"` +} + +type ConnectionHealth struct { + Configured bool `json:"configured"` + Enabled bool `json:"enabled"` + LastInventoryPushAt *time.Time `json:"lastInventoryPushAt"` + LastFindingIngestAt *time.Time `json:"lastFindingIngestAt"` + LastHealthCheckAt *time.Time `json:"lastHealthCheckAt"` + LastErrorCode string `json:"lastErrorCode"` + LastErrorAt *time.Time `json:"lastErrorAt"` + ContractVersion string `json:"contractVersion"` +} diff --git a/internal/store/ctops.go b/internal/store/ctops.go new file mode 100644 index 0000000..eca5538 --- /dev/null +++ b/internal/store/ctops.go @@ -0,0 +1,627 @@ +package store + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/carrtech-dev/ct-cve/internal/ctops" + "github.com/carrtech-dev/ct-cve/internal/vuln" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" +) + +type affectedRecord struct { + Affected vuln.AffectedPackage + CVE ctops.CVESummary + CVSS *float64 + KEV bool +} + +func (s *PostgresStore) RememberNonce(ctx context.Context, tokenID, nonce string, expiresAt, now time.Time) (bool, error) { + if _, err := s.pool.Exec(ctx, `DELETE FROM ct_cve_service_nonces WHERE expires_at <= $1`, now); err != nil { + return false, err + } + tag, err := s.pool.Exec(ctx, ` + INSERT INTO ct_cve_service_nonces (token_id, nonce, expires_at) + VALUES ($1,$2,$3) + ON CONFLICT DO NOTHING + `, tokenID, nonce, expiresAt) + if err != nil { + return false, err + } + return tag.RowsAffected() == 1, nil +} + +func (s *PostgresStore) ApplyInventorySnapshot(ctx context.Context, snapshot ctops.InventorySnapshot, now time.Time) (ctops.InventorySnapshotApplyResult, error) { + tx, err := s.pool.Begin(ctx) + if err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + defer tx.Rollback(ctx) + + if replayed, ok, err := getSnapshotResponse(ctx, tx, snapshot.OrgID, snapshot.SnapshotID); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } else if ok { + return ctops.InventorySnapshotApplyResult{Response: replayed, Replayed: true}, tx.Commit(ctx) + } + + for _, host := range snapshot.Hosts { + if err := upsertCTOpsHost(ctx, tx, snapshot.OrgID, host); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + } + for _, pkg := range snapshot.Packages { + if err := upsertCTOpsPackage(ctx, tx, snapshot.OrgID, pkg); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + } + + findings, err := matchSnapshotFindings(ctx, tx, snapshot, now) + if err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + + result := ctops.InventorySnapshotResult{ + Accepted: true, + SnapshotID: snapshot.SnapshotID, + HostsAccepted: len(snapshot.Hosts), + PackagesAccepted: len(snapshot.Packages), + RowsRejected: 0, + NextAction: "none", + } + if err := insertSnapshotResponse(ctx, tx, snapshot, result); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + if err := touchInventoryStatus(ctx, tx, snapshot.OrgID, now); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + if err := tx.Commit(ctx); err != nil { + return ctops.InventorySnapshotApplyResult{}, err + } + return ctops.InventorySnapshotApplyResult{Response: result, Findings: findings}, nil +} + +func (s *PostgresStore) RecordFindingDelivery(ctx context.Context, orgID, _ string, _ int) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO ct_ops_connection_status (org_id, configured, enabled, last_finding_ingest_at, last_error_code, last_error_at, updated_at) + VALUES ($1, true, true, NOW(), '', NULL, NOW()) + ON CONFLICT (org_id) DO UPDATE SET + configured = true, + enabled = true, + last_finding_ingest_at = EXCLUDED.last_finding_ingest_at, + last_error_code = '', + last_error_at = NULL, + updated_at = NOW() + `, orgID) + return err +} + +func (s *PostgresStore) RecordConnectionError(ctx context.Context, orgID, code string) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO ct_ops_connection_status (org_id, configured, enabled, last_error_code, last_error_at, updated_at) + VALUES ($1, true, true, $2, NOW(), NOW()) + ON CONFLICT (org_id) DO UPDATE SET + configured = true, + enabled = true, + last_error_code = EXCLUDED.last_error_code, + last_error_at = EXCLUDED.last_error_at, + updated_at = NOW() + `, orgID, code) + return err +} + +func (s *PostgresStore) UpdateInventorySnapshotResponse(ctx context.Context, orgID, snapshotID string, result ctops.InventorySnapshotResult) error { + raw, err := json.Marshal(result) + if err != nil { + return err + } + _, err = s.pool.Exec(ctx, ` + UPDATE ct_ops_inventory_snapshots + SET response_json = $3, updated_at = NOW() + WHERE org_id = $1 AND snapshot_id = $2 + `, orgID, snapshotID, raw) + return err +} + +func (s *PostgresStore) ConnectionHealth(ctx context.Context, orgID string) (ctops.ConnectionHealth, error) { + var health ctops.ConnectionHealth + var lastInventory, lastFinding, lastHealth, lastError pgtype.Timestamptz + err := s.pool.QueryRow(ctx, ` + SELECT configured, enabled, last_inventory_push_at, last_finding_ingest_at, + last_health_check_at, last_error_code, last_error_at + FROM ct_ops_connection_status + WHERE org_id = $1 + `, orgID).Scan( + &health.Configured, + &health.Enabled, + &lastInventory, + &lastFinding, + &lastHealth, + &health.LastErrorCode, + &lastError, + ) + if err == pgx.ErrNoRows { + return ctops.ConnectionHealth{Configured: true, Enabled: true, ContractVersion: ctops.ContractVersion}, nil + } + if err != nil { + return ctops.ConnectionHealth{}, err + } + if lastInventory.Valid { + health.LastInventoryPushAt = &lastInventory.Time + } + if lastFinding.Valid { + health.LastFindingIngestAt = &lastFinding.Time + } + if lastHealth.Valid { + health.LastHealthCheckAt = &lastHealth.Time + } + if lastError.Valid { + health.LastErrorAt = &lastError.Time + } + health.ContractVersion = ctops.ContractVersion + return health, nil +} + +func (s *PostgresStore) RecordConnectionHealth(ctx context.Context, orgID string) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO ct_ops_connection_status (org_id, configured, enabled, last_health_check_at, updated_at) + VALUES ($1, true, true, NOW(), NOW()) + ON CONFLICT (org_id) DO UPDATE SET + configured = true, + enabled = true, + last_health_check_at = EXCLUDED.last_health_check_at, + updated_at = NOW() + `, orgID) + return err +} + +func getSnapshotResponse(ctx context.Context, tx pgx.Tx, orgID, snapshotID string) (ctops.InventorySnapshotResult, bool, error) { + var raw []byte + err := tx.QueryRow(ctx, ` + SELECT response_json + FROM ct_ops_inventory_snapshots + WHERE org_id = $1 AND snapshot_id = $2 + `, orgID, snapshotID).Scan(&raw) + if err == pgx.ErrNoRows { + return ctops.InventorySnapshotResult{}, false, nil + } + if err != nil { + return ctops.InventorySnapshotResult{}, false, err + } + var result ctops.InventorySnapshotResult + if err := json.Unmarshal(raw, &result); err != nil { + return ctops.InventorySnapshotResult{}, false, err + } + return result, true, nil +} + +func insertSnapshotResponse(ctx context.Context, tx pgx.Tx, snapshot ctops.InventorySnapshot, result ctops.InventorySnapshotResult) error { + raw, err := json.Marshal(result) + if err != nil { + return err + } + _, err = tx.Exec(ctx, ` + INSERT INTO ct_ops_inventory_snapshots ( + org_id, snapshot_id, contract_version, snapshot_type, generated_at, cursor_value, + hosts_accepted, packages_accepted, rows_rejected, response_json, created_at, updated_at + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NOW(),NOW()) + `, snapshot.OrgID, snapshot.SnapshotID, snapshot.ContractVersion, snapshot.SnapshotType, snapshot.GeneratedAt, + snapshot.Cursor, result.HostsAccepted, result.PackagesAccepted, result.RowsRejected, raw) + return err +} + +func upsertCTOpsHost(ctx context.Context, tx pgx.Tx, orgID string, host ctops.InventoryHost) error { + ipAddresses, err := json.Marshal(host.IPAddresses) + if err != nil { + return err + } + status := strings.TrimSpace(host.Status) + if status == "" { + status = "unknown" + } + _, err = tx.Exec(ctx, ` + INSERT INTO ct_ops_hosts ( + org_id, host_id, agent_id, hostname, display_name, os, os_version, arch, + ip_addresses, status, last_seen_at, deleted_at, created_at, updated_at + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,NOW(),NOW()) + ON CONFLICT (org_id, host_id) DO UPDATE SET + agent_id = EXCLUDED.agent_id, + hostname = EXCLUDED.hostname, + display_name = EXCLUDED.display_name, + os = EXCLUDED.os, + os_version = EXCLUDED.os_version, + arch = EXCLUDED.arch, + ip_addresses = EXCLUDED.ip_addresses, + status = EXCLUDED.status, + last_seen_at = EXCLUDED.last_seen_at, + deleted_at = EXCLUDED.deleted_at, + updated_at = NOW() + `, orgID, host.HostID, host.AgentID, host.Hostname, host.DisplayName, host.OS, host.OSVersion, host.Arch, + ipAddresses, status, host.LastSeenAt, host.DeletedAt) + return err +} + +func upsertCTOpsPackage(ctx context.Context, tx pgx.Tx, orgID string, pkg ctops.InventoryPackage) error { + _, err := tx.Exec(ctx, ` + INSERT INTO ct_ops_software_packages ( + org_id, package_id, host_id, name, version, architecture, source, fingerprint, + distro_id, distro_version_id, distro_codename, distro_id_like, source_name, source_version, + package_epoch, package_release, repository, origin, install_date, first_seen_at, last_seen_at, + removed_at, deleted_at, created_at, updated_at + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,NOW(),NOW()) + ON CONFLICT (org_id, package_id) DO UPDATE SET + host_id = EXCLUDED.host_id, + name = EXCLUDED.name, + version = EXCLUDED.version, + architecture = EXCLUDED.architecture, + source = EXCLUDED.source, + fingerprint = EXCLUDED.fingerprint, + distro_id = EXCLUDED.distro_id, + distro_version_id = EXCLUDED.distro_version_id, + distro_codename = EXCLUDED.distro_codename, + distro_id_like = EXCLUDED.distro_id_like, + source_name = EXCLUDED.source_name, + source_version = EXCLUDED.source_version, + package_epoch = EXCLUDED.package_epoch, + package_release = EXCLUDED.package_release, + repository = EXCLUDED.repository, + origin = EXCLUDED.origin, + install_date = EXCLUDED.install_date, + first_seen_at = LEAST(ct_ops_software_packages.first_seen_at, EXCLUDED.first_seen_at), + last_seen_at = EXCLUDED.last_seen_at, + removed_at = EXCLUDED.removed_at, + deleted_at = EXCLUDED.deleted_at, + updated_at = NOW() + `, orgID, pkg.SoftwarePackageID, pkg.HostID, pkg.Name, pkg.Version, pkg.Architecture, pkg.Source, pkg.Fingerprint, + pkg.DistroID, pkg.DistroVersionID, pkg.DistroCodename, pkg.DistroIDLike, pkg.SourceName, pkg.SourceVersion, + pkg.PackageEpoch, pkg.PackageRelease, pkg.Repository, pkg.Origin, pkg.InstallDate, pkg.FirstSeenAt, + pkg.LastSeenAt, pkg.RemovedAt, pkg.DeletedAt) + return err +} + +func matchSnapshotFindings(ctx context.Context, tx pgx.Tx, snapshot ctops.InventorySnapshot, now time.Time) ([]ctops.Finding, error) { + candidates, err := affectedCandidates(ctx, tx, snapshot.Packages) + if err != nil { + return nil, err + } + hostDeleted := make(map[string]bool, len(snapshot.Hosts)) + for _, host := range snapshot.Hosts { + hostDeleted[host.HostID] = host.DeletedAt != nil + } + + touchedPackageIDs := make([]string, 0, len(snapshot.Packages)) + matchedKeys := map[string]bool{} + findings := make([]ctops.Finding, 0) + for _, pkg := range snapshot.Packages { + touchedPackageIDs = append(touchedPackageIDs, pkg.SoftwarePackageID) + if pkg.RemovedAt != nil || pkg.DeletedAt != nil || hostDeleted[pkg.HostID] { + continue + } + inventory := vuln.InventoryPackage{ + ID: pkg.SoftwarePackageID, + OrganisationID: snapshot.OrgID, + HostID: pkg.HostID, + Name: pkg.Name, + Version: pkg.Version, + Source: pkg.Source, + DistroID: stringPtrValue(pkg.DistroID), + DistroIDLike: pkg.DistroIDLike, + DistroVersionID: stringPtrValue(pkg.DistroVersionID), + DistroCodename: stringPtrValue(pkg.DistroCodename), + SourceName: stringPtrValue(pkg.SourceName), + SourceVersion: stringPtrValue(pkg.SourceVersion), + Repository: stringPtrValue(pkg.Repository), + } + for _, candidate := range candidates { + matched, reason := vuln.MatchPackage(inventory, candidate.Affected) + if !matched { + continue + } + finding := findingFromMatch(snapshot.OrgID, pkg, candidate, reason, now, nil) + matchedKeys[findingKey(finding.HostID, finding.SoftwarePackageID, finding.CVEID)] = true + if err := upsertOpenFinding(ctx, tx, snapshot.OrgID, finding, now); err != nil { + return nil, err + } + findings = append(findings, finding) + } + } + + resolved, err := resolveStaleFindings(ctx, tx, snapshot.OrgID, touchedPackageIDs, matchedKeys, now) + if err != nil { + return nil, err + } + findings = append(findings, resolved...) + return findings, nil +} + +func affectedCandidates(ctx context.Context, tx pgx.Tx, packages []ctops.InventoryPackage) ([]affectedRecord, error) { + names := make([]string, 0, len(packages)*2) + seen := map[string]bool{} + for _, pkg := range packages { + for _, name := range []string{pkg.Name, stringPtrValue(pkg.SourceName)} { + if name == "" || seen[name] { + continue + } + seen[name] = true + names = append(names, name) + } + } + if len(names) == 0 { + return nil, nil + } + rows, err := tx.Query(ctx, ` + SELECT ap.cve_id, ap.source, ap.distro_id, ap.distro_version_id, ap.distro_codename, + ap.package_name, ap.source_package_name, ap.fixed_version, ap.affected_versions, + ap.repository, ap.severity, ap.package_state, ap.metadata_json, + cr.title, cr.description, cr.cvss_score::float8, cr.published_at, cr.modified_at, + cr.rejected, cr.known_exploited, cr.kev_due_date::timestamptz, cr.kev_vendor_project, cr.kev_product, + cr.kev_required_action + FROM affected_packages ap + JOIN cve_records cr ON cr.cve_id = ap.cve_id + WHERE ap.package_name = ANY($1) OR ap.source_package_name = ANY($1) + `, names) + if err != nil { + return nil, err + } + defer rows.Close() + + var candidates []affectedRecord + for rows.Next() { + var row affectedRecord + var severity string + var cvss pgtype.Float8 + var published, modified, kevDue pgtype.Timestamptz + if err := rows.Scan( + &row.Affected.CVEID, + &row.Affected.Source, + &row.Affected.DistroID, + &row.Affected.DistroVersionID, + &row.Affected.DistroCodename, + &row.Affected.PackageName, + &row.Affected.SourcePackageName, + &row.Affected.FixedVersion, + &row.Affected.AffectedVersions, + &row.Affected.Repository, + &severity, + &row.Affected.PackageState, + &row.Affected.MetadataJSON, + &row.CVE.Title, + &row.CVE.Description, + &cvss, + &published, + &modified, + &row.CVE.Rejected, + &row.KEV, + &kevDue, + &row.CVE.KEVVendorProject, + &row.CVE.KEVProduct, + &row.CVE.KEVRequiredAction, + ); err != nil { + return nil, err + } + row.Affected.Severity = vuln.Severity(severity) + if cvss.Valid { + value := cvss.Float64 + row.CVSS = &value + } + if published.Valid { + row.CVE.PublishedAt = &published.Time + } + if modified.Valid { + row.CVE.ModifiedAt = &modified.Time + } + if kevDue.Valid { + row.CVE.KEVDueDate = &kevDue.Time + } + candidates = append(candidates, row) + } + return candidates, rows.Err() +} + +func findingFromMatch(orgID string, pkg ctops.InventoryPackage, candidate affectedRecord, reason string, now time.Time, resolvedAt *time.Time) ctops.Finding { + status := ctops.FindingStatusOpen + if resolvedAt != nil { + status = ctops.FindingStatusResolved + } + severity := string(candidate.Affected.Severity) + if severity == "" { + severity = "unknown" + } + return ctops.Finding{ + FindingID: stableFindingID(orgID, pkg.HostID, pkg.SoftwarePackageID, candidate.Affected.CVEID), + HostID: pkg.HostID, + SoftwarePackageID: pkg.SoftwarePackageID, + CVEID: candidate.Affected.CVEID, + Status: status, + PackageName: pkg.Name, + InstalledVersion: pkg.Version, + FixedVersion: candidate.Affected.FixedVersion, + Source: candidate.Affected.Source, + Severity: severity, + CVSSScore: candidate.CVSS, + KnownExploited: candidate.KEV, + Confidence: "confirmed", + MatchReason: reason, + FirstSeenAt: now, + LastSeenAt: now, + ResolvedAt: resolvedAt, + CVE: candidate.CVE, + } +} + +func upsertOpenFinding(ctx context.Context, tx pgx.Tx, orgID string, finding ctops.Finding, now time.Time) error { + metadata, err := json.Marshal(map[string]any{"ctCveFindingId": finding.FindingID}) + if err != nil { + return err + } + _, err = tx.Exec(ctx, ` + INSERT INTO integration_findings ( + id, finding_id, org_id, host_id, package_id, cve_id, confidence, reason, + first_seen_at, last_seen_at, resolved_at, package_name, installed_version, + fixed_version, source, severity, cvss_score, known_exploited, metadata_json, + created_at, updated_at + ) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,NULL,$11,$12,$13,$14,$15,$16,$17,$18,NOW(),NOW()) + ON CONFLICT (org_id, host_id, package_id, cve_id) DO UPDATE SET + finding_id = EXCLUDED.finding_id, + confidence = EXCLUDED.confidence, + reason = EXCLUDED.reason, + first_seen_at = LEAST(integration_findings.first_seen_at, EXCLUDED.first_seen_at), + last_seen_at = EXCLUDED.last_seen_at, + resolved_at = NULL, + package_name = EXCLUDED.package_name, + installed_version = EXCLUDED.installed_version, + fixed_version = EXCLUDED.fixed_version, + source = EXCLUDED.source, + severity = EXCLUDED.severity, + cvss_score = EXCLUDED.cvss_score, + known_exploited = EXCLUDED.known_exploited, + metadata_json = EXCLUDED.metadata_json, + updated_at = NOW() + `, randomUUID(), finding.FindingID, orgID, finding.HostID, finding.SoftwarePackageID, finding.CVEID, + finding.Confidence, finding.MatchReason, finding.FirstSeenAt, finding.LastSeenAt, finding.PackageName, + finding.InstalledVersion, finding.FixedVersion, finding.Source, finding.Severity, finding.CVSSScore, + finding.KnownExploited, metadata) + _ = now + return err +} + +func resolveStaleFindings(ctx context.Context, tx pgx.Tx, orgID string, packageIDs []string, matchedKeys map[string]bool, now time.Time) ([]ctops.Finding, error) { + if len(packageIDs) == 0 { + return nil, nil + } + rows, err := tx.Query(ctx, ` + SELECT f.finding_id, f.host_id, f.package_id, f.cve_id, f.package_name, f.installed_version, + COALESCE(f.fixed_version, ''), f.source, f.severity, f.cvss_score::float8, f.known_exploited, + f.confidence, f.reason, f.first_seen_at, f.last_seen_at, + cr.title, cr.description, cr.published_at, cr.modified_at, cr.rejected, cr.known_exploited, + cr.kev_due_date::timestamptz, cr.kev_vendor_project, cr.kev_product, cr.kev_required_action + FROM integration_findings f + JOIN cve_records cr ON cr.cve_id = f.cve_id + WHERE f.org_id = $1 AND f.package_id = ANY($2) AND f.resolved_at IS NULL + `, orgID, packageIDs) + if err != nil { + return nil, err + } + defer rows.Close() + + var resolved []ctops.Finding + for rows.Next() { + var finding ctops.Finding + var cvss pgtype.Float8 + var published, modified, kevDue pgtype.Timestamptz + var cveKnownExploited bool + if err := rows.Scan( + &finding.FindingID, + &finding.HostID, + &finding.SoftwarePackageID, + &finding.CVEID, + &finding.PackageName, + &finding.InstalledVersion, + &finding.FixedVersion, + &finding.Source, + &finding.Severity, + &cvss, + &finding.KnownExploited, + &finding.Confidence, + &finding.MatchReason, + &finding.FirstSeenAt, + &finding.LastSeenAt, + &finding.CVE.Title, + &finding.CVE.Description, + &published, + &modified, + &finding.CVE.Rejected, + &cveKnownExploited, + &kevDue, + &finding.CVE.KEVVendorProject, + &finding.CVE.KEVProduct, + &finding.CVE.KEVRequiredAction, + ); err != nil { + return nil, err + } + if matchedKeys[findingKey(finding.HostID, finding.SoftwarePackageID, finding.CVEID)] { + continue + } + if cvss.Valid { + value := cvss.Float64 + finding.CVSSScore = &value + } + if published.Valid { + finding.CVE.PublishedAt = &published.Time + } + if modified.Valid { + finding.CVE.ModifiedAt = &modified.Time + } + if kevDue.Valid { + finding.CVE.KEVDueDate = &kevDue.Time + } + resolvedAt := now + finding.Status = ctops.FindingStatusResolved + finding.ResolvedAt = &resolvedAt + finding.LastSeenAt = now + resolved = append(resolved, finding) + } + if err := rows.Err(); err != nil { + return nil, err + } + for _, finding := range resolved { + if _, err := tx.Exec(ctx, ` + UPDATE integration_findings + SET resolved_at = $5, last_seen_at = $5, updated_at = NOW() + WHERE org_id = $1 AND host_id = $2 AND package_id = $3 AND cve_id = $4 + `, orgID, finding.HostID, finding.SoftwarePackageID, finding.CVEID, now); err != nil { + return nil, err + } + } + return resolved, nil +} + +func touchInventoryStatus(ctx context.Context, tx pgx.Tx, orgID string, now time.Time) error { + _, err := tx.Exec(ctx, ` + INSERT INTO ct_ops_connection_status (org_id, configured, enabled, last_inventory_push_at, updated_at) + VALUES ($1, true, true, $2, NOW()) + ON CONFLICT (org_id) DO UPDATE SET + configured = true, + enabled = true, + last_inventory_push_at = EXCLUDED.last_inventory_push_at, + updated_at = NOW() + `, orgID, now) + return err +} + +func stableFindingID(orgID, hostID, packageID, cveID string) string { + sum := sha256.Sum256([]byte(strings.Join([]string{orgID, hostID, packageID, cveID}, "\x00"))) + return "ctcve_find_" + hex.EncodeToString(sum[:])[:24] +} + +func findingKey(hostID, packageID, cveID string) string { + return hostID + "\x00" + packageID + "\x00" + cveID +} + +func randomUUID() string { + var raw [16]byte + if _, err := rand.Read(raw[:]); err != nil { + sum := sha256.Sum256([]byte(fmt.Sprintf("%d", time.Now().UnixNano()))) + copy(raw[:], sum[:16]) + } + raw[6] = (raw[6] & 0x0f) | 0x40 + raw[8] = (raw[8] & 0x3f) | 0x80 + return fmt.Sprintf("%x-%x-%x-%x-%x", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:16]) +} + +func stringPtrValue(value *string) string { + if value == nil { + return "" + } + return *value +} diff --git a/migrations/0005_ct_ops_connector.sql b/migrations/0005_ct_ops_connector.sql new file mode 100644 index 0000000..24592c5 --- /dev/null +++ b/migrations/0005_ct_ops_connector.sql @@ -0,0 +1,105 @@ +CREATE TABLE ct_ops_hosts ( + org_id text NOT NULL, + host_id text NOT NULL, + agent_id text, + hostname text NOT NULL, + display_name text, + os text, + os_version text, + arch text, + ip_addresses jsonb NOT NULL DEFAULT '[]'::jsonb, + status text NOT NULL DEFAULT 'unknown', + last_seen_at timestamptz, + deleted_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (org_id, host_id) +); + +CREATE INDEX ct_ops_hosts_org_status_idx + ON ct_ops_hosts (org_id, status); + +CREATE TABLE ct_ops_software_packages ( + org_id text NOT NULL, + package_id text NOT NULL, + host_id text NOT NULL, + name text NOT NULL, + version text NOT NULL, + architecture text, + source text NOT NULL, + fingerprint text NOT NULL DEFAULT '', + distro_id text, + distro_version_id text, + distro_codename text, + distro_id_like text[] NOT NULL DEFAULT ARRAY[]::text[], + source_name text, + source_version text, + package_epoch text, + package_release text, + repository text, + origin text, + install_date timestamptz, + first_seen_at timestamptz NOT NULL, + last_seen_at timestamptz NOT NULL, + removed_at timestamptz, + deleted_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (org_id, package_id) +); + +CREATE INDEX ct_ops_packages_org_host_idx + ON ct_ops_software_packages (org_id, host_id); + +CREATE INDEX ct_ops_packages_match_idx + ON ct_ops_software_packages (source, distro_id, distro_codename, source_name, name); + +CREATE TABLE ct_ops_inventory_snapshots ( + org_id text NOT NULL, + snapshot_id text NOT NULL, + contract_version text NOT NULL, + snapshot_type text NOT NULL, + generated_at timestamptz NOT NULL, + cursor_value text, + hosts_accepted integer NOT NULL DEFAULT 0, + packages_accepted integer NOT NULL DEFAULT 0, + rows_rejected integer NOT NULL DEFAULT 0, + response_json jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (org_id, snapshot_id) +); + +CREATE TABLE ct_cve_service_nonces ( + token_id text NOT NULL, + nonce text NOT NULL, + expires_at timestamptz NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (token_id, nonce) +); + +CREATE INDEX ct_cve_service_nonces_expires_at_idx + ON ct_cve_service_nonces (expires_at); + +CREATE TABLE ct_ops_connection_status ( + org_id text PRIMARY KEY, + configured boolean NOT NULL DEFAULT true, + enabled boolean NOT NULL DEFAULT true, + last_inventory_push_at timestamptz, + last_finding_ingest_at timestamptz, + last_health_check_at timestamptz, + last_error_code text NOT NULL DEFAULT '', + last_error_at timestamptz, + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE integration_findings + ADD COLUMN finding_id text NOT NULL DEFAULT '', + ADD COLUMN package_name text NOT NULL DEFAULT '', + ADD COLUMN installed_version text NOT NULL DEFAULT '', + ADD COLUMN fixed_version text, + ADD COLUMN source text NOT NULL DEFAULT '', + ADD COLUMN severity text NOT NULL DEFAULT 'unknown', + ADD COLUMN cvss_score numeric(3,1), + ADD COLUMN known_exploited boolean NOT NULL DEFAULT false, + ADD COLUMN metadata_json jsonb NOT NULL DEFAULT '{}'::jsonb;