// projectionDriftDeps bundles the database calls the projection-drift report
// needs, so the report can be driven by the real db package or by test stubs.
type projectionDriftDeps struct {
	// CountLegacyProjectionDrift takes a context and two bucket bounds and
	// returns the number of drifting sites. Production wiring uses
	// db.CountLegacyProjectionDrift.
	CountLegacyProjectionDrift func(context.Context, int, int) (int, error)

	// ListLegacyProjectionDrift takes a context, two bucket bounds, and a row
	// limit, and returns individual drift rows. Production wiring uses
	// db.ListLegacyProjectionDrift.
	ListLegacyProjectionDrift func(context.Context, int, int, int) ([]db.ProjectionDriftRow, error)

	// SummarizeLegacyProjectionDrift takes a context, two bucket bounds, and a
	// group limit, and returns grouped drift summaries. Production wiring uses
	// db.SummarizeLegacyProjectionDrift.
	SummarizeLegacyProjectionDrift func(context.Context, int, int, int) ([]db.ProjectionDriftSummaryRow, error)
}
Projection: projectionDriftDeps{ - CountLegacyProjectionDrift: db.CountLegacyProjectionDrift, - ListLegacyProjectionDrift: db.ListLegacyProjectionDrift, + CountLegacyProjectionDrift: db.CountLegacyProjectionDrift, + ListLegacyProjectionDrift: db.ListLegacyProjectionDrift, + SummarizeLegacyProjectionDrift: db.SummarizeLegacyProjectionDrift, }, Status: dashboardStatus, } @@ -3080,6 +3084,8 @@ func formatHostRows(hosts []db.HostRow) string { return strings.Join(parts, ", ") } +const defaultProjectionDriftSummaryLimit = 20 + func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Config, bucketMin, bucketMax, limit int, deps projectionDriftDeps) error { if cfg == nil { return errors.New("config is not loaded") @@ -3106,6 +3112,29 @@ func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Co fmt.Fprintln(out, "PASS legacy_projection_drift=0") return nil } + fmt.Fprintf(out, "WARN legacy_projection_drift_requires_manual_review=%d\n", count) + fmt.Fprintln(out, `WARN projection_drift_next_step="review the summary first, then inspect listed event rows before making any site_status repair"`) + + if deps.SummarizeLegacyProjectionDrift == nil { + return errors.New("projection drift summarizer is not configured") + } + summaries, err := deps.SummarizeLegacyProjectionDrift(ctx, minBucket, maxBucket, count) + if err != nil { + return fmt.Errorf("summarize legacy projection drift in range %d-%d: %w", minBucket, maxBucket, err) + } + visibleSummaries := firstProjectionDriftSummaries(summaries, defaultProjectionDriftSummaryLimit) + fmt.Fprintln(out, "## projection drift summary") + printProjectionDriftSummaries(out, visibleSummaries) + visibleSummaryCount := sumProjectionDriftSummaryRows(visibleSummaries) + totalSummaryCount := sumProjectionDriftSummaryRows(summaries) + if len(summaries) > len(visibleSummaries) { + fmt.Fprintf(out, "INFO projection_drift_summary_groups_truncated=%d\n", len(summaries)-len(visibleSummaries)) + 
fmt.Fprintf(out, "INFO projection_drift_summary_rows_hidden=%d\n", totalSummaryCount-visibleSummaryCount) + } + if totalSummaryCount != count { + fmt.Fprintf(out, `WARN projection_drift_count_changed=%d summarized=%d note="drift changed while the report was running; rerun projection-drift before repair"`+"\n", count, totalSummaryCount) + } + printProjectionDriftCauseGuidance(out, summaries) if deps.ListLegacyProjectionDrift == nil { return errors.New("projection drift lister is not configured") @@ -3114,10 +3143,13 @@ func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Co if err != nil { return fmt.Errorf("list legacy projection drift in range %d-%d: %w", minBucket, maxBucket, err) } + fmt.Fprintln(out, "## projection drift rows") printProjectionDriftRows(out, rows) if count > len(rows) { fmt.Fprintf(out, "INFO projection_drift_rows_truncated=%d\n", count-len(rows)) } + fmt.Fprintln(out, "INFO projection_drift_repair=manual_confirmation_required") + fmt.Fprintln(out, "INFO projection_drift_repair_guidance=confirm the authoritative event rows first; then repair the legacy site_status projection inside a reviewed DB change or by rerunning the code path that writes the event and projection together") return fmt.Errorf("legacy projection drift=%d in range %d-%d", count, minBucket, maxBucket) } @@ -3177,20 +3209,157 @@ func resolveExplicitRolloutBucketRange(cfg *config.Config, bucketMin, bucketMax } func printProjectionDriftRows(out io.Writer, rows []db.ProjectionDriftRow) { - fmt.Fprintf(out, "%-12s %-8s %-11s %-9s %-10s %s\n", - "BLOG_ID", "BUCKET", "SITE_STATUS", "EXPECTED", "EVENT_ID", "EVENT_STATE") + fmt.Fprintf(out, "%-12s %-8s %-22s %-22s %-10s %-11s %-35s %s\n", + "BLOG_ID", "BUCKET", "SITE_STATUS", "EXPECTED", "EVENT_ID", "OPEN_EVENTS", "CAUSE", "EVENT_STATE") for _, row := range rows { - fmt.Fprintf(out, "%-12d %-8d %-11d %-9d %-10s %s\n", + cause := classifyProjectionDriftCause(row.SiteStatus, row.ExpectedStatus, row.EventState, 
// projectionDriftCause pairs a stable cause code (printed in the CAUSE column
// and in projection_drift_cause= lines) with the operator guidance for it.
type projectionDriftCause struct {
	Code   string
	Action string
}

// classifyProjectionDriftCause maps one drift shape to a likely cause.
//
// siteStatus is the legacy projected value, expectedStatus is the value derived
// from open HTTP events, eventState is the representative open event state
// (nil when there is none), and openEventCount is the number of open HTTP
// events behind the row or group.
//
// Cases are evaluated in order, so earlier checks win:
//  1. either status outside 0/1/2 -> unexpected_projection_value
//  2. more than one open event    -> multiple_open_http_events
//  3. the specific (expected, site) mismatch pairs
//  4. a non-empty event state     -> unexpected_open_event_projection
//  5. anything else               -> unexpected_projection_value
//
// Steps 4 and 5 are only reachable when siteStatus == expectedStatus, because
// every mismatching pair of known statuses is handled in step 3.
func classifyProjectionDriftCause(siteStatus, expectedStatus int, eventState *string, openEventCount int) projectionDriftCause {
	// Shared by the first case and the final fallthrough so the two cannot
	// drift apart.
	unexpectedValue := projectionDriftCause{
		Code:   "unexpected_projection_value",
		Action: "the legacy site_status value is outside the expected v1 projection shape; inspect and repair the site row manually",
	}

	switch {
	case !isKnownLegacySiteStatus(siteStatus) || !isKnownLegacySiteStatus(expectedStatus):
		return unexpectedValue
	case openEventCount > 1:
		return projectionDriftCause{
			Code:   "multiple_open_http_events",
			Action: "inspect duplicate open HTTP events before repairing the legacy projection; the site-level projection uses the worst open HTTP state",
		}
	case expectedStatus == 1 && siteStatus != 1:
		return projectionDriftCause{
			Code:   "stale_legacy_down_projection",
			Action: "the legacy site row still reports downtime even though no open HTTP downtime event requires it; inspect recent close transitions before setting the projection back to running",
		}
	case expectedStatus == 2 && siteStatus == 1:
		return projectionDriftCause{
			Code:   "missing_confirmed_down_projection",
			Action: "an open Down event exists but the legacy site row still reports running; inspect the eventstore transaction path before continuing rollout",
		}
	case expectedStatus == 0 && siteStatus == 1:
		return projectionDriftCause{
			Code:   "missing_seems_down_projection",
			Action: "an open Seems Down event exists but the legacy site row still reports running; inspect first-failure projection writes and local retry handling",
		}
	case expectedStatus == 2 && siteStatus == 0:
		return projectionDriftCause{
			Code:   "missing_confirmed_promotion",
			Action: "the event reached Down but the legacy projection stayed Seems Down; inspect verifier-confirmed promotion writes",
		}
	case expectedStatus == 0 && siteStatus == 2:
		return projectionDriftCause{
			Code:   "stale_confirmed_down_projection",
			Action: "the legacy site row reports confirmed down while the open event is only Seems Down; inspect recovery or false-alarm transition history",
		}
	case eventState != nil && *eventState != "":
		return projectionDriftCause{
			Code:   "unexpected_open_event_projection",
			Action: "the open event and legacy projection disagree in a non-standard way; inspect the event row, transition history, and site row before repair",
		}
	}
	return unexpectedValue
}

// isKnownLegacySiteStatus reports whether status is one of the three legacy
// site_status values this report understands (0, 1, or 2).
func isKnownLegacySiteStatus(status int) bool {
	return status >= 0 && status <= 2
}

// formatLegacySiteStatus renders a legacy site_status as "<number>/<label>" so
// table output is readable without losing the raw stored value.
func formatLegacySiteStatus(status int) string {
	switch status {
	case 0:
		return "0/SITE_DOWN"
	case 1:
		return "1/SITE_RUNNING"
	case 2:
		return "2/SITE_CONFIRMED_DOWN"
	default:
		return fmt.Sprintf("%d/UNKNOWN", status)
	}
}

// formatOptionalString renders a nullable string for table output: "-" for nil
// or empty, otherwise the value with control characters neutralized.
func formatOptionalString(v *string) string {
	if v == nil || *v == "" {
		return "-"
	}
	return sanitizeRolloutTableString(*v)
}

// sanitizeRolloutTableString replaces every control character in v with '?'
// so database-sourced text cannot break the table layout or inject terminal
// escape sequences.
//
// The replaced set is C0 (U+0000-U+001F), DEL (U+007F), and C1
// (U+0080-U+009F). C1 matters because it contains single-character
// equivalents of ESC-prefixed sequences (for example U+009B, CSI) that some
// terminals honor.
func sanitizeRolloutTableString(v string) string {
	return strings.Map(func(r rune) rune {
		if r < 0x20 || (r >= 0x7f && r <= 0x9f) {
			return '?'
		}
		return r
	}, v)
}
%q, want drift count", err.Error()) } for _, want := range []string{ + "WARN legacy_projection_drift_requires_manual_review=2", + "projection_drift_next_step=", + "SAMPLE_BLOG", + "missing_confirmed_down_projection", + "projection_drift_cause=missing_confirmed_down_projection count=2", "BLOG_ID", "42", "Down", "INFO projection_drift_rows_truncated=1", + "INFO projection_drift_repair=manual_confirmation_required", } { if !strings.Contains(out.String(), want) { t.Fatalf("output missing %q:\n%s", want, out.String()) @@ -2787,6 +2806,94 @@ func TestRunProjectionDriftReportListsRowsAndFails(t *testing.T) { } } +func TestRunProjectionDriftReportUsesAllSummariesForCauseGuidance(t *testing.T) { + cfg := dynamicRolloutTestConfig() + deps := projectionDriftDeps{ + CountLegacyProjectionDrift: func(context.Context, int, int) (int, error) { + return defaultProjectionDriftSummaryLimit + 1, nil + }, + SummarizeLegacyProjectionDrift: func(context.Context, int, int, int) ([]db.ProjectionDriftSummaryRow, error) { + var summaries []db.ProjectionDriftSummaryRow + for i := range defaultProjectionDriftSummaryLimit { + summaries = append(summaries, db.ProjectionDriftSummaryRow{ + BucketNo: i, + SiteStatus: 1, + ExpectedStatus: 2, + DriftCount: 1, + SampleBlogID: int64(100 + i), + }) + } + summaries = append(summaries, db.ProjectionDriftSummaryRow{ + BucketNo: 99, + SiteStatus: 0, + ExpectedStatus: 1, + DriftCount: 1, + SampleBlogID: 999, + }) + return summaries, nil + }, + ListLegacyProjectionDrift: func(context.Context, int, int, int) ([]db.ProjectionDriftRow, error) { + return nil, nil + }, + } + + var out bytes.Buffer + err := runProjectionDriftReport(context.Background(), &out, cfg, 0, 9, 1, deps) + if err == nil { + t.Fatal("runProjectionDriftReport succeeded") + } + for _, want := range []string{ + "INFO projection_drift_summary_groups_truncated=1", + "INFO projection_drift_summary_rows_hidden=1", + "projection_drift_cause=missing_confirmed_down_projection count=20", + 
"projection_drift_cause=stale_legacy_down_projection count=1", + } { + if !strings.Contains(out.String(), want) { + t.Fatalf("output missing %q:\n%s", want, out.String()) + } + } + if strings.Contains(out.String(), "999") { + t.Fatalf("hidden summary sample was printed:\n%s", out.String()) + } +} + +func TestFormatOptionalStringSanitizesControlCharacters(t *testing.T) { + raw := "Down\x1b[31m\nStill Down" + got := formatOptionalString(&raw) + if strings.ContainsAny(got, "\x1b\n\r\t") { + t.Fatalf("formatted string contains control characters: %q", got) + } + if !strings.Contains(got, "?") { + t.Fatalf("formatted string = %q, want replacement marker", got) + } +} + +func TestClassifyProjectionDriftCause(t *testing.T) { + eventState := "Down" + tests := []struct { + name string + status int + expected int + state *string + openEvents int + want string + }{ + {name: "legacy down but no open event", status: 2, expected: 1, want: "stale_legacy_down_projection"}, + {name: "running with open down", status: 1, expected: 2, state: &eventState, openEvents: 1, want: "missing_confirmed_down_projection"}, + {name: "seems down not promoted", status: 0, expected: 2, state: &eventState, openEvents: 1, want: "missing_confirmed_promotion"}, + {name: "duplicate open events", status: 1, expected: 2, state: &eventState, openEvents: 2, want: "multiple_open_http_events"}, + {name: "unknown status", status: 9, expected: 1, want: "unexpected_projection_value"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := classifyProjectionDriftCause(tt.status, tt.expected, tt.state, tt.openEvents) + if got.Code != tt.want { + t.Fatalf("cause = %q, want %q", got.Code, tt.want) + } + }) + } +} + func TestResolveProjectionDriftRange(t *testing.T) { minBucket, maxBucket := 2, 4 tests := []struct { diff --git a/docs/data-model.md b/docs/data-model.md index d6c819d..1f95e80 100644 --- a/docs/data-model.md +++ b/docs/data-model.md @@ -136,6 +136,16 @@ If drift is suspected, 
inspect mismatches with: ./jetmon2 rollout projection-drift --bucket-min=0 --bucket-max=99 --limit=100 ``` +The drift report summarizes mismatches by bucket, projected status, expected +status, likely cause, and sample blog before listing individual rows. It is +read-only: use the likely-cause and repair guidance to confirm the event rows +and transition history before making any reviewed database repair. + +Watch for repeated drift classes during rollout rehearsal and early production +operation. Do not add an automated or dry-run repair planner until those real +examples show which mismatch classes are safe to repair mechanically and which +ones require eventstore investigation first. + After legacy readers move to the v2 API or event tables, disable the projection. ## Status And Failure Types diff --git a/docs/operations-guide.md b/docs/operations-guide.md index ee2f0bb..ee1f611 100644 --- a/docs/operations-guide.md +++ b/docs/operations-guide.md @@ -229,6 +229,16 @@ projection drift. It also shows per-table delivery queue counts and per-host bucket-owner rows for diagnosis. It uses stale heartbeat thresholds when deciding whether a process or dynamic bucket owner is healthy. +When fleet projection drift is red, run `./jetmon2 rollout projection-drift +--limit=100` on an operator host. The command reports bucket/status summaries, +likely causes, and sample rows before listing individual mismatches, and it +does not repair the legacy projection automatically. + +Capture the cause labels from rehearsal and early production incidents. A +future dry-run repair planner should be based on those observed patterns, not +on assumed failure modes, because the unsafe case is repairing `site_status` +while the event rows or transitions still need investigation. + Fleet snapshots are cached briefly by the dashboard process so multiple open operator tabs do not run the full fleet query set on every refresh. 
diff --git a/docs/project.md b/docs/project.md index 784c8de..db54997 100644 --- a/docs/project.md +++ b/docs/project.md @@ -288,7 +288,7 @@ Queryable by `blog_id` and time range via a CLI tool (`jetmon2 audit --blog-id 1 - `jetmon2 rollout activity-check` — verifies recent check activity for a bucket range after cutover - `jetmon2 rollout rollback-check` — verifies a pinned v2 range is safe to hand back to v1 - `jetmon2 rollout dynamic-check` — validates full `jetmon_hosts` coverage after the fleet transitions from pinned to dynamic ownership -- `jetmon2 rollout projection-drift` — lists active sites whose legacy `site_status` projection disagrees with the authoritative event state +- `jetmon2 rollout projection-drift` — summarizes and lists active sites whose legacy `site_status` projection disagrees with the authoritative event state - `jetmon2 rollout state-report` — summarizes ownership mode, bucket coverage, recent activity, projection drift, delivery-owner state, and the suggested next action - `jetmon2 drain --worker N` — gracefully removes one worker pool slot, waiting for in-flight checks to complete before reducing concurrency - `jetmon2 reload` — sends SIGHUP to the running process (convenience wrapper) diff --git a/docs/roadmap.md b/docs/roadmap.md index 4693101..8c97810 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -15,16 +15,21 @@ migration and the operating data needed to make larger architecture decisions. These are scoped branches worth considering after the merged API CLI, rollout preflight, deliverer hardening, and API CLI fixture workflow branches: -- **`feature/projection-drift-tooling`** - expand drift diagnostics beyond - count/list output with range summaries, likely causes, rehearsal reports, and - dry-run repair guidance if repair becomes safe enough to automate. 
- **`feature/production-telemetry-reports`** - turn existing StatsD and event data into repeatable reports for first-failure timing, verifier agreement, false-alarm classes, WPCOM parity, and operator explanation gaps after v2 has enough traffic. -- **`feature/v2-rollout-docs-rehearsal`** - walk the migration docs through a - full rehearsal and keep README, operations docs, migration docs, config - samples, service units, and CLI output aligned. + +### Projection Drift Tooling TODO + +- [x] Compare legacy projection status against a per-blog rollup of open HTTP + events so multiple open endpoint events cannot overcount drift. +- [x] Add bucket/status summaries to `rollout projection-drift` so operators + can distinguish one-off rows from range-wide projection failures. +- [x] Add likely-cause labels and manual repair guidance to the drift report + without mutating production data automatically. +- [ ] Consider a dedicated dry-run repair planner after production rehearsals + show which drift classes are safe enough to automate. ### Rollout Simplification TODO @@ -846,10 +851,12 @@ where to look, and what each item unlocked. range without gaps/overlaps, heartbeats are fresh, and projection drift is zero. This supports the second step after every host has moved safely to v2. -- **Projection drift reporting.** `./jetmon2 rollout projection-drift` lists - the specific active sites whose legacy projection disagrees with the - authoritative open HTTP event. - Operators get actionable rows instead of a count-only rollout failure. +- **Projection drift reporting.** `./jetmon2 rollout projection-drift` prints + bucket/status summaries, likely causes, sample rows, and the specific active + sites whose legacy projection disagrees with the authoritative open HTTP + event. + Operators get actionable diagnostics and manual repair guidance instead of a + count-only rollout failure. 
- **Rollout guidance in validation and dashboard.** `validate-config` prints the correct rollout preflight and drift-report commands, while the operator dashboard shows bucket mode, projection mode, delivery ownership, rollout diff --git a/docs/rollout-quick-reference.md b/docs/rollout-quick-reference.md index 4465d5c..fe757de 100644 --- a/docs/rollout-quick-reference.md +++ b/docs/rollout-quick-reference.md @@ -214,6 +214,10 @@ restart the fleet in the approved window, and run: ./jetmon2 rollout projection-drift --limit=100 ``` +When `projection-drift` fails, start with the summary and cause lines before +the row table. The command is read-only and gives repair guidance; it does not +change `site_status` automatically. + ## Automation Rollout gate commands support JSON output: diff --git a/docs/v1-to-v2-migration.md b/docs/v1-to-v2-migration.md index f1f0b3a..92859c6 100644 --- a/docs/v1-to-v2-migration.md +++ b/docs/v1-to-v2-migration.md @@ -527,6 +527,11 @@ For every replaced range, verify: --limit=100 ``` + If this fails, read the summary section first. It groups mismatches by bucket + and likely cause, then lists sample rows. Do not restart v1 readers or apply + ad hoc `site_status` updates until the matching `jetmon_events` rows and + transition history confirm which projection value is authoritative. + - recent check activity exists for the pinned range: ```bash @@ -685,6 +690,9 @@ After every monitor host is on v2 and stable in pinned mode: overlap-free. If `DASHBOARD_PORT` is enabled, `/fleet` should show `mode=dynamic`, green bucket coverage, no stale processes, no projection drift, and no failed or abandoned delivery rows. + If the projection-drift check fails, use the bucket/cause summary to decide + whether this is a stale legacy projection, a missing event-to-projection + write, or an unexpected status value before making any manual repair. 9. 
Continue with normal v2 rolling updates: stop one host, deploy, start it, verify `./jetmon2 status`, then move to the next host. diff --git a/internal/db/queries.go b/internal/db/queries.go index 755b07e..82cef0a 100644 --- a/internal/db/queries.go +++ b/internal/db/queries.go @@ -124,18 +124,25 @@ func CountLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax int) ( var count int err := db.QueryRowContext(ctx, ` SELECT COUNT(*) - FROM jetpack_monitor_sites s - LEFT JOIN jetmon_events e - ON e.blog_id = s.blog_id - AND e.check_type = 'http' - AND e.ended_at IS NULL - WHERE s.monitor_active = 1 - AND s.bucket_no BETWEEN ? AND ? - AND s.site_status <> CASE - WHEN e.state = 'Down' THEN 2 - WHEN e.state = 'Seems Down' THEN 0 - ELSE 1 - END`, + FROM ( + SELECT s.jetpack_monitor_site_id, + s.blog_id, + s.site_status, + CASE + WHEN SUM(CASE WHEN e.state = 'Down' THEN 1 ELSE 0 END) > 0 THEN 2 + WHEN SUM(CASE WHEN e.state = 'Seems Down' THEN 1 ELSE 0 END) > 0 THEN 0 + ELSE 1 + END AS expected_status + FROM jetpack_monitor_sites s + LEFT JOIN jetmon_events e + ON e.blog_id = s.blog_id + AND e.check_type = 'http' + AND e.ended_at IS NULL + WHERE s.monitor_active = 1 + AND s.bucket_no BETWEEN ? AND ? + GROUP BY s.jetpack_monitor_site_id, s.blog_id, s.site_status + ) drift + WHERE drift.site_status <> drift.expected_status`, bucketMin, bucketMax, ).Scan(&count) if err != nil { @@ -153,6 +160,19 @@ type ProjectionDriftRow struct { ExpectedStatus int EventID *int64 EventState *string + OpenEventCount int +} + +// ProjectionDriftSummaryRow summarizes one bucket/status/cause group of legacy +// projection drift rows. 
type ProjectionDriftSummaryRow struct {
	// BucketNo is the bucket_no shared by every site in the group.
	BucketNo int

	// SiteStatus is the legacy site_status stored on the grouped site rows.
	SiteStatus int

	// ExpectedStatus is the status derived from open HTTP events: 2 when any
	// open event is 'Down', 0 when any is 'Seems Down', otherwise 1.
	ExpectedStatus int

	// EventState is the representative open event state for the group
	// ('Down', 'Seems Down', or the minimum of any other open state). It is
	// nil when the query returns NULL, i.e. the sites have no open HTTP event.
	EventState *string

	// MaxOpenEventCount is the largest number of open HTTP events found on any
	// single site in the group.
	MaxOpenEventCount int

	// DriftCount is the number of drifting sites in the group.
	DriftCount int

	// SampleBlogID is the smallest blog_id in the group, offered as a starting
	// point for manual inspection.
	SampleBlogID int64
}
+ GROUP BY s.jetpack_monitor_site_id, s.blog_id, s.bucket_no, s.site_status + ) drift + WHERE drift.site_status <> drift.expected_status + ORDER BY drift.bucket_no ASC, drift.blog_id ASC LIMIT ?`, bucketMin, bucketMax, limit, ) @@ -205,6 +241,7 @@ func ListLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, limit &row.ExpectedStatus, &eventID, &eventState, + &row.OpenEventCount, ); err != nil { return nil, fmt.Errorf("scan projection drift: %w", err) } @@ -221,6 +258,81 @@ func ListLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, limit return out, rows.Err() } +// SummarizeLegacyProjectionDrift groups drift rows by bucket and mismatch +// shape so operators can see whether the problem is isolated, systemic, or a +// repeated projection failure pattern. +func SummarizeLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, limit int) ([]ProjectionDriftSummaryRow, error) { + if limit <= 0 { + limit = 20 + } + rows, err := db.QueryContext(ctx, ` + SELECT drift.bucket_no, + drift.site_status, + drift.expected_status, + drift.event_state, + MAX(drift.open_event_count) AS max_open_event_count, + COUNT(*) AS drift_count, + MIN(drift.blog_id) AS sample_blog_id + FROM ( + SELECT s.jetpack_monitor_site_id, + s.blog_id, + s.bucket_no, + s.site_status, + CASE + WHEN SUM(CASE WHEN e.state = 'Down' THEN 1 ELSE 0 END) > 0 THEN 2 + WHEN SUM(CASE WHEN e.state = 'Seems Down' THEN 1 ELSE 0 END) > 0 THEN 0 + ELSE 1 + END AS expected_status, + CASE + WHEN SUM(CASE WHEN e.state = 'Down' THEN 1 ELSE 0 END) > 0 THEN 'Down' + WHEN SUM(CASE WHEN e.state = 'Seems Down' THEN 1 ELSE 0 END) > 0 THEN 'Seems Down' + ELSE MIN(e.state) + END AS event_state, + COUNT(e.id) AS open_event_count + FROM jetpack_monitor_sites s + LEFT JOIN jetmon_events e + ON e.blog_id = s.blog_id + AND e.check_type = 'http' + AND e.ended_at IS NULL + WHERE s.monitor_active = 1 + AND s.bucket_no BETWEEN ? AND ? 
+ GROUP BY s.jetpack_monitor_site_id, s.blog_id, s.bucket_no, s.site_status + ) drift + WHERE drift.site_status <> drift.expected_status + GROUP BY drift.bucket_no, drift.site_status, drift.expected_status, drift.event_state + ORDER BY drift_count DESC, drift.bucket_no ASC, drift.site_status ASC, drift.expected_status ASC + LIMIT ?`, + bucketMin, bucketMax, limit, + ) + if err != nil { + return nil, fmt.Errorf("summarize projection drift: %w", err) + } + defer rows.Close() + + var out []ProjectionDriftSummaryRow + for rows.Next() { + var row ProjectionDriftSummaryRow + var eventState sql.NullString + if err := rows.Scan( + &row.BucketNo, + &row.SiteStatus, + &row.ExpectedStatus, + &eventState, + &row.MaxOpenEventCount, + &row.DriftCount, + &row.SampleBlogID, + ); err != nil { + return nil, fmt.Errorf("scan projection drift summary: %w", err) + } + if eventState.Valid { + v := eventState.String + row.EventState = &v + } + out = append(out, row) + } + return out, rows.Err() +} + // MarkSiteChecked records when a site was last checked. func MarkSiteChecked(ctx context.Context, blogID int64, checkedAt time.Time) error { _, err := db.ExecContext(ctx, diff --git a/internal/db/queries_test.go b/internal/db/queries_test.go index 2fdb31e..6b52d67 100644 --- a/internal/db/queries_test.go +++ b/internal/db/queries_test.go @@ -357,13 +357,13 @@ func TestListLegacyProjectionDrift(t *testing.T) { mock, cleanup := withMockDB(t) defer cleanup() - mock.ExpectQuery("SELECT s.blog_id"). + mock.ExpectQuery("SELECT drift.blog_id"). WithArgs(0, 99, 50). WillReturnRows(sqlmock.NewRows([]string{ - "blog_id", "bucket_no", "site_status", "expected_status", "id", "state", + "blog_id", "bucket_no", "site_status", "expected_status", "id", "state", "open_event_count", }). - AddRow(int64(42), 7, 1, 2, int64(123), "Down"). - AddRow(int64(43), 8, 0, 1, nil, nil)) + AddRow(int64(42), 7, 1, 2, int64(123), "Down", 1). 
+ AddRow(int64(43), 8, 0, 1, nil, nil, 0)) rows, err := ListLegacyProjectionDrift(context.Background(), 0, 99, 0) if err != nil { @@ -381,6 +381,9 @@ func TestListLegacyProjectionDrift(t *testing.T) { if rows[0].EventState == nil || *rows[0].EventState != "Down" { t.Fatalf("row 0 EventState = %v, want Down", rows[0].EventState) } + if rows[0].OpenEventCount != 1 { + t.Fatalf("row 0 OpenEventCount = %d, want 1", rows[0].OpenEventCount) + } if rows[1].EventID != nil || rows[1].EventState != nil { t.Fatalf("row 1 event fields = %+v, want nil", rows[1]) } @@ -389,6 +392,39 @@ func TestListLegacyProjectionDrift(t *testing.T) { } } +func TestSummarizeLegacyProjectionDrift(t *testing.T) { + mock, cleanup := withMockDB(t) + defer cleanup() + + mock.ExpectQuery("SELECT drift.bucket_no"). + WithArgs(0, 99, 20). + WillReturnRows(sqlmock.NewRows([]string{ + "bucket_no", "site_status", "expected_status", "expected_state", "max_open_event_count", "drift_count", "sample_blog_id", + }). + AddRow(7, 1, 2, "Down", 1, 3, int64(42)). + AddRow(8, 0, 1, nil, 0, 2, int64(43))) + + rows, err := SummarizeLegacyProjectionDrift(context.Background(), 0, 99, 0) + if err != nil { + t.Fatalf("SummarizeLegacyProjectionDrift: %v", err) + } + if len(rows) != 2 { + t.Fatalf("rows len = %d, want 2", len(rows)) + } + if rows[0].BucketNo != 7 || rows[0].SiteStatus != 1 || rows[0].ExpectedStatus != 2 || rows[0].DriftCount != 3 || rows[0].SampleBlogID != 42 { + t.Fatalf("row 0 = %+v", rows[0]) + } + if rows[0].EventState == nil || *rows[0].EventState != "Down" { + t.Fatalf("row 0 EventState = %v, want Down", rows[0].EventState) + } + if rows[1].EventState != nil { + t.Fatalf("row 1 EventState = %v, want nil", rows[1].EventState) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + func TestGetAllHostsScansRows(t *testing.T) { mock, cleanup := withMockDB(t) defer cleanup()