Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 192 additions & 14 deletions cmd/jetmon2/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type activityCheckDeps struct {
}

type projectionDriftDeps struct {
CountLegacyProjectionDrift func(context.Context, int, int) (int, error)
ListLegacyProjectionDrift func(context.Context, int, int, int) ([]db.ProjectionDriftRow, error)
CountLegacyProjectionDrift func(context.Context, int, int) (int, error)
ListLegacyProjectionDrift func(context.Context, int, int, int) ([]db.ProjectionDriftRow, error)
SummarizeLegacyProjectionDrift func(context.Context, int, int, int) ([]db.ProjectionDriftSummaryRow, error)
}

type cutoverCheckDeps struct {
Expand Down Expand Up @@ -547,8 +548,9 @@ func cmdRolloutCutoverCheck(args []string) {
CountRecentlyCheckedActiveSitesForRange: db.CountRecentlyCheckedActiveSitesForBucketRange,
},
Projection: projectionDriftDeps{
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
SummarizeLegacyProjectionDrift: db.SummarizeLegacyProjectionDrift,
},
Status: dashboardStatus,
}
Expand Down Expand Up @@ -723,8 +725,9 @@ func cmdRolloutProjectionDrift(args []string) {
fmt.Fprintln(out, "PASS db connect")

deps := projectionDriftDeps{
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
SummarizeLegacyProjectionDrift: db.SummarizeLegacyProjectionDrift,
}
return runProjectionDriftReport(context.Background(), out, config.Get(), *bucketMin, *bucketMax, *limit, deps)
}); err != nil {
Expand Down Expand Up @@ -1101,8 +1104,9 @@ func defaultGuidedRolloutDeps() guidedRolloutDeps {
CountRecentlyCheckedActiveSitesForRange: db.CountRecentlyCheckedActiveSitesForBucketRange,
},
Projection: projectionDriftDeps{
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
CountLegacyProjectionDrift: db.CountLegacyProjectionDrift,
ListLegacyProjectionDrift: db.ListLegacyProjectionDrift,
SummarizeLegacyProjectionDrift: db.SummarizeLegacyProjectionDrift,
},
Status: dashboardStatus,
}
Expand Down Expand Up @@ -3080,6 +3084,8 @@ func formatHostRows(hosts []db.HostRow) string {
return strings.Join(parts, ", ")
}

// defaultProjectionDriftSummaryLimit caps how many summary groups
// runProjectionDriftReport prints; groups beyond the cap are not listed and
// are instead reported through the "truncated"/"hidden" INFO lines.
const defaultProjectionDriftSummaryLimit = 20

func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Config, bucketMin, bucketMax, limit int, deps projectionDriftDeps) error {
if cfg == nil {
return errors.New("config is not loaded")
Expand All @@ -3106,6 +3112,29 @@ func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Co
fmt.Fprintln(out, "PASS legacy_projection_drift=0")
return nil
}
fmt.Fprintf(out, "WARN legacy_projection_drift_requires_manual_review=%d\n", count)
fmt.Fprintln(out, `WARN projection_drift_next_step="review the summary first, then inspect listed event rows before making any site_status repair"`)

if deps.SummarizeLegacyProjectionDrift == nil {
return errors.New("projection drift summarizer is not configured")
}
summaries, err := deps.SummarizeLegacyProjectionDrift(ctx, minBucket, maxBucket, count)
if err != nil {
return fmt.Errorf("summarize legacy projection drift in range %d-%d: %w", minBucket, maxBucket, err)
}
visibleSummaries := firstProjectionDriftSummaries(summaries, defaultProjectionDriftSummaryLimit)
fmt.Fprintln(out, "## projection drift summary")
printProjectionDriftSummaries(out, visibleSummaries)
visibleSummaryCount := sumProjectionDriftSummaryRows(visibleSummaries)
totalSummaryCount := sumProjectionDriftSummaryRows(summaries)
if len(summaries) > len(visibleSummaries) {
fmt.Fprintf(out, "INFO projection_drift_summary_groups_truncated=%d\n", len(summaries)-len(visibleSummaries))
fmt.Fprintf(out, "INFO projection_drift_summary_rows_hidden=%d\n", totalSummaryCount-visibleSummaryCount)
}
if totalSummaryCount != count {
fmt.Fprintf(out, `WARN projection_drift_count_changed=%d summarized=%d note="drift changed while the report was running; rerun projection-drift before repair"`+"\n", count, totalSummaryCount)
}
printProjectionDriftCauseGuidance(out, summaries)

if deps.ListLegacyProjectionDrift == nil {
return errors.New("projection drift lister is not configured")
Expand All @@ -3114,10 +3143,13 @@ func runProjectionDriftReport(ctx context.Context, out io.Writer, cfg *config.Co
if err != nil {
return fmt.Errorf("list legacy projection drift in range %d-%d: %w", minBucket, maxBucket, err)
}
fmt.Fprintln(out, "## projection drift rows")
printProjectionDriftRows(out, rows)
if count > len(rows) {
fmt.Fprintf(out, "INFO projection_drift_rows_truncated=%d\n", count-len(rows))
}
fmt.Fprintln(out, "INFO projection_drift_repair=manual_confirmation_required")
fmt.Fprintln(out, "INFO projection_drift_repair_guidance=confirm the authoritative event rows first; then repair the legacy site_status projection inside a reviewed DB change or by rerunning the code path that writes the event and projection together")
return fmt.Errorf("legacy projection drift=%d in range %d-%d", count, minBucket, maxBucket)
}

Expand Down Expand Up @@ -3177,20 +3209,157 @@ func resolveExplicitRolloutBucketRange(cfg *config.Config, bucketMin, bucketMax
}

func printProjectionDriftRows(out io.Writer, rows []db.ProjectionDriftRow) {
fmt.Fprintf(out, "%-12s %-8s %-11s %-9s %-10s %s\n",
"BLOG_ID", "BUCKET", "SITE_STATUS", "EXPECTED", "EVENT_ID", "EVENT_STATE")
fmt.Fprintf(out, "%-12s %-8s %-22s %-22s %-10s %-11s %-35s %s\n",
"BLOG_ID", "BUCKET", "SITE_STATUS", "EXPECTED", "EVENT_ID", "OPEN_EVENTS", "CAUSE", "EVENT_STATE")
for _, row := range rows {
fmt.Fprintf(out, "%-12d %-8d %-11d %-9d %-10s %s\n",
cause := classifyProjectionDriftCause(row.SiteStatus, row.ExpectedStatus, row.EventState, row.OpenEventCount)
fmt.Fprintf(out, "%-12d %-8d %-22s %-22s %-10s %-11d %-35s %s\n",
row.BlogID,
row.BucketNo,
row.SiteStatus,
row.ExpectedStatus,
formatLegacySiteStatus(row.SiteStatus),
formatLegacySiteStatus(row.ExpectedStatus),
formatOptionalInt(row.EventID),
row.OpenEventCount,
cause.Code,
formatOptionalString(row.EventState),
)
}
}

// printProjectionDriftSummaries writes the grouped drift summary to out as a
// fixed-width table: one header line followed by one line per summary group.
//
// When summaries is empty a single "INFO projection_drift_summary=none" line
// is written instead of a table. The CAUSE column holds the code returned by
// classifyProjectionDriftCause for the group, and the status columns are
// rendered through formatLegacySiteStatus.
func printProjectionDriftSummaries(out io.Writer, summaries []db.ProjectionDriftSummaryRow) {
	const (
		headerFormat = "%-8s %-7s %-22s %-22s %-11s %-12s %-35s %s\n"
		groupFormat  = "%-8d %-7d %-22s %-22s %-11d %-12d %-35s %s\n"
	)

	if len(summaries) == 0 {
		fmt.Fprintln(out, "INFO projection_drift_summary=none")
		return
	}

	fmt.Fprintf(out, headerFormat,
		"BUCKET", "COUNT", "SITE_STATUS", "EXPECTED", "OPEN_EVENTS", "SAMPLE_BLOG", "CAUSE", "EVENT_STATE")

	for i := range summaries {
		group := &summaries[i]

		// Classification uses the group's maximum open-event count.
		causeCode := classifyProjectionDriftCause(
			group.SiteStatus,
			group.ExpectedStatus,
			group.EventState,
			group.MaxOpenEventCount,
		).Code
		actualLabel := formatLegacySiteStatus(group.SiteStatus)
		expectedLabel := formatLegacySiteStatus(group.ExpectedStatus)
		stateLabel := formatOptionalString(group.EventState)

		fmt.Fprintf(out, groupFormat,
			group.BucketNo,
			group.DriftCount,
			actualLabel,
			expectedLabel,
			group.MaxOpenEventCount,
			group.SampleBlogID,
			causeCode,
			stateLabel,
		)
	}
}

// sumProjectionDriftSummaryRows returns the sum of DriftCount over all
// summary groups. A nil or empty slice yields 0.
func sumProjectionDriftSummaryRows(summaries []db.ProjectionDriftSummaryRow) int {
	var driftTotal int
	for i := range summaries {
		driftTotal += summaries[i].DriftCount
	}
	return driftTotal
}

// firstProjectionDriftSummaries returns at most the first limit summary
// groups. A non-positive limit, or a limit that is not smaller than the
// number of groups, returns summaries unchanged. The result shares its
// backing array with the input.
func firstProjectionDriftSummaries(summaries []db.ProjectionDriftSummaryRow, limit int) []db.ProjectionDriftSummaryRow {
	if limit > 0 && limit < len(summaries) {
		return summaries[:limit]
	}
	return summaries
}

// projectionDriftCause is the result of classifying one drift row or summary
// group: Code is the short identifier printed in the CAUSE column and in the
// "projection_drift_cause=" guidance line, and Action is the operator-facing
// guidance text printed alongside it.
type projectionDriftCause struct {
	Code, Action string
}

// classifyProjectionDriftCause maps a drifted (siteStatus, expectedStatus)
// pair to a cause code and operator guidance.
//
// Rules are applied in this order:
//  1. Either status outside the known legacy values (0, 1, 2) yields
//     "unexpected_projection_value".
//  2. More than one open event yields "multiple_open_http_events".
//  3. The specific expected/actual mismatches each map to their own code.
//  4. Anything left (in practice, equal statuses) yields
//     "unexpected_open_event_projection" when eventState is non-nil and
//     non-empty, and "unexpected_projection_value" otherwise.
func classifyProjectionDriftCause(siteStatus, expectedStatus int, eventState *string, openEventCount int) projectionDriftCause {
	// Shared by the first guard and the final fallback.
	outOfShape := projectionDriftCause{
		Code:   "unexpected_projection_value",
		Action: "the legacy site_status value is outside the expected v1 projection shape; inspect and repair the site row manually",
	}

	bothKnown := isKnownLegacySiteStatus(siteStatus) && isKnownLegacySiteStatus(expectedStatus)
	if !bothKnown {
		return outOfShape
	}

	if openEventCount > 1 {
		return projectionDriftCause{
			Code:   "multiple_open_http_events",
			Action: "inspect duplicate open HTTP events before repairing the legacy projection; the site-level projection uses the worst open HTTP state",
		}
	}

	// Both statuses are now known to be 0, 1 or 2, so the pairs below are
	// mutually exclusive and their order does not matter.
	switch expectedStatus {
	case 1:
		if siteStatus != 1 {
			return projectionDriftCause{
				Code:   "stale_legacy_down_projection",
				Action: "the legacy site row still reports downtime even though no open HTTP downtime event requires it; inspect recent close transitions before setting the projection back to running",
			}
		}
	case 2:
		switch siteStatus {
		case 1:
			return projectionDriftCause{
				Code:   "missing_confirmed_down_projection",
				Action: "an open Down event exists but the legacy site row still reports running; inspect the eventstore transaction path before continuing rollout",
			}
		case 0:
			return projectionDriftCause{
				Code:   "missing_confirmed_promotion",
				Action: "the event reached Down but the legacy projection stayed Seems Down; inspect verifier-confirmed promotion writes",
			}
		}
	case 0:
		switch siteStatus {
		case 1:
			return projectionDriftCause{
				Code:   "missing_seems_down_projection",
				Action: "an open Seems Down event exists but the legacy site row still reports running; inspect first-failure projection writes and local retry handling",
			}
		case 2:
			return projectionDriftCause{
				Code:   "stale_confirmed_down_projection",
				Action: "the legacy site row reports confirmed down while the open event is only Seems Down; inspect recovery or false-alarm transition history",
			}
		}
	}

	// No specific mismatch matched.
	if eventState != nil && *eventState != "" {
		return projectionDriftCause{
			Code:   "unexpected_open_event_projection",
			Action: "the open event and legacy projection disagree in a non-standard way; inspect the event row, transition history, and site row before repair",
		}
	}
	return outOfShape
}

// isKnownLegacySiteStatus reports whether status is one of the legacy
// site_status values 0, 1 or 2 (the values formatLegacySiteStatus gives a
// named label to).
func isKnownLegacySiteStatus(status int) bool {
	switch status {
	case 0, 1, 2:
		return true
	}
	return false
}

// printProjectionDriftCauseGuidance writes one WARN line per distinct drift
// cause found in summaries, carrying the cause code, the total DriftCount
// attributed to it, and the quoted action text.
//
// Lines are ordered by descending count, with ties broken by ascending cause
// code so that the output is deterministic. Nothing is written when
// summaries is empty.
func printProjectionDriftCauseGuidance(out io.Writer, summaries []db.ProjectionDriftSummaryRow) {
	type causeTally struct {
		cause projectionDriftCause
		rows  int
	}

	// Aggregate drift counts per cause code.
	tallies := map[string]*causeTally{}
	for i := range summaries {
		group := &summaries[i]
		cause := classifyProjectionDriftCause(group.SiteStatus, group.ExpectedStatus, group.EventState, group.MaxOpenEventCount)
		tally, seen := tallies[cause.Code]
		if !seen {
			tally = &causeTally{}
			tallies[cause.Code] = tally
		}
		tally.cause = cause
		tally.rows += group.DriftCount
	}
	if len(tallies) == 0 {
		return
	}

	// Map iteration order is random; sort into a stable presentation order.
	ordered := make([]*causeTally, 0, len(tallies))
	for _, tally := range tallies {
		ordered = append(ordered, tally)
	}
	sort.Slice(ordered, func(a, b int) bool {
		left, right := ordered[a], ordered[b]
		if left.rows != right.rows {
			return left.rows > right.rows
		}
		return left.cause.Code < right.cause.Code
	})

	for _, tally := range ordered {
		fmt.Fprintf(out, "WARN projection_drift_cause=%s count=%d action=%q\n", tally.cause.Code, tally.rows, tally.cause.Action)
	}
}

// formatLegacySiteStatus renders a legacy site_status value as
// "<number>/<NAME>" for table output. Values 0, 1 and 2 have fixed names;
// any other value is rendered as "<number>/UNKNOWN".
func formatLegacySiteStatus(status int) string {
	// Indexed by status value.
	knownLabels := [...]string{
		"0/SITE_DOWN",
		"1/SITE_RUNNING",
		"2/SITE_CONFIRMED_DOWN",
	}
	if status >= 0 && status < len(knownLabels) {
		return knownLabels[status]
	}
	return fmt.Sprintf("%d/UNKNOWN", status)
}

func formatOptionalInt(v *int64) string {
if v == nil {
return "-"
Expand All @@ -3202,5 +3371,14 @@ func formatOptionalString(v *string) string {
if v == nil || *v == "" {
return "-"
}
return *v
return sanitizeRolloutTableString(*v)
}

// sanitizeRolloutTableString returns v with every ASCII control character
// (code points below 0x20, and DEL 0x7f) replaced by '?'. All other runes are
// kept as-is.
//
// NOTE(review): presumably this keeps embedded tabs/newlines from breaking
// the single-line table rows — confirm against callers.
func sanitizeRolloutTableString(v string) string {
	replaceControl := func(r rune) rune {
		switch {
		case r == 0x7f, r < 0x20:
			return '?'
		default:
			return r
		}
	}
	return strings.Map(replaceControl, v)
}
Loading