From 972006cf70ab72d1427298a2e3131d7a61d96877 Mon Sep 17 00:00:00 2001 From: Chris Jean Date: Wed, 13 May 2026 23:31:29 -0500 Subject: [PATCH] Preserve endpoint identity for duplicate monitor rows Production jetpack_monitor_sites data can contain multiple active monitor URLs for the same blog_id. Treating blog_id as the scheduler identity collapses those rows, which risks skipping checks, merging retry state, and projecting status changes onto the wrong legacy row during rollout. Carry jetpack_monitor_site_id through local checker requests, Veriflier requests and responses, streaming planner state, retry state, HTTP event identity, and legacy status projection updates. blog_id remains the WPCOM/site identity, while endpoint_id records the legacy monitor row for HTTP incidents. Also update the v2-native target table uniqueness to source_site_id, document the identity split, and add regression tests for duplicate active monitor rows plus endpoint-aware status and Veriflier identity handling. Tests: go test ./... --- docs/adr/0009-streaming-monitor-engine.md | 7 ++ docs/data-model.md | 10 ++ docs/roadmap.md | 5 + internal/checker/checker.go | 3 + internal/db/migrations.go | 8 ++ internal/db/queries.go | 55 ++++++++- internal/db/queries_test.go | 27 +++++ internal/eventstore/eventstore.go | 28 ++++- internal/orchestrator/identity.go | 30 +++++ internal/orchestrator/orchestrator.go | 132 +++++++++++---------- internal/orchestrator/orchestrator_test.go | 36 +++++- internal/orchestrator/retry.go | 37 +++--- internal/orchestrator/streaming.go | 99 ++++++++-------- internal/orchestrator/streaming_test.go | 21 +++- internal/veriflier/server.go | 6 + internal/veriflier/types.go | 18 +-- internal/veriflier/veriflier_test.go | 7 +- veriflier2/cmd/main.go | 14 ++- 18 files changed, 393 insertions(+), 150 deletions(-) create mode 100644 internal/orchestrator/identity.go diff --git a/docs/adr/0009-streaming-monitor-engine.md b/docs/adr/0009-streaming-monitor-engine.md index 12709c19..a8e18fe2 100644 --- a/docs/adr/0009-streaming-monitor-engine.md +++ b/docs/adr/0009-streaming-monitor-engine.md @@ -48,6 +48,13 @@ v2 sidecar config tables, but the new table is intentionally additive so later iterations can move derived scheduling state out of the legacy path without breaking rollback. +HTTP monitor identity is the legacy row id, not just `blog_id`. Production +datasets can contain multiple active monitor URLs for one blog, so streaming +planner targets, retry state, and future `jetmon_check_targets` sync must key +by `jetpack_monitor_sites.jetpack_monitor_site_id` / `source_site_id` when that +row id is available. `blog_id` remains the WPCOM/site identity used for +notifications and site-level API views. + ## Compatibility `jetpack_monitor_sites` remains the source of truth for v1-owned site identity, diff --git a/docs/data-model.md b/docs/data-model.md index 31ff69ad..341b914d 100644 --- a/docs/data-model.md +++ b/docs/data-model.md @@ -131,6 +131,16 @@ from `jetpack_monitor_sites`. That keeps correctness and rollback behavior easy to validate before moving config-sync reads fully onto the v2-native target table in a later scaling branch. +Production data can contain more than one active monitor URL for the same +`blog_id`. Monitor execution therefore treats +`jetpack_monitor_sites.jetpack_monitor_site_id` as the endpoint identity for +HTTP checks while retaining `blog_id` as the WPCOM/site identity. 
HTTP events +write that row id to `jetmon_events.endpoint_id`, scheduler/retry in-memory +state keys by the row id when available, and the v1 compatibility projection is +updated by `jetpack_monitor_site_id` so two active URLs for one site do not +overwrite each other's rollout state. The v2-native target table is unique on +`source_site_id` for the same reason. + ## Process Health `jetmon_process_health` is the durable source for fleet-level operator views. diff --git a/docs/roadmap.md b/docs/roadmap.md index 88836d21..3bc34201 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -233,6 +233,11 @@ production telemetry branches: first prototype still reloads active identity/cadence from `jetpack_monitor_sites` plus v2 sidecar config so correctness can be validated before optimizing config-sync reads. +- [x] Preserve duplicate active monitor URLs for the same `blog_id` by carrying + `jetpack_monitor_site_id` through local checks, Veriflier RPCs, streaming + planner state, retry state, HTTP event identity, and legacy projection + writes. `blog_id` remains the site/WPCOM identity, while the legacy row id is + the endpoint identity for monitor execution. - [x] Evaluate whether any remaining single-column `blog_id` index is needed on `jetpack_monitor_sites` after sidecar-table rollout. PR #101 added `idx_monitor_blog_id` for legacy-table point writes, but current rollout diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 0efe7d52..1f25f565 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -619,6 +619,7 @@ func isLocalResolverHost(host string) bool { // Request holds the parameters for a single HTTP check. type Request struct { + MonitorSiteID int64 BlogID int64 URL string Method string @@ -637,6 +638,7 @@ type Request struct { // Result holds the outcome of a single HTTP check. type Result struct { + MonitorSiteID int64 BlogID int64 URL string Method string @@ -722,6 +724,7 @@ func Check(ctx context.Context, req Request) Result { profile = checkmode.EffectiveProfile(method, profile) res := Result{ + MonitorSiteID: req.MonitorSiteID, BlogID: req.BlogID, URL: req.URL, Method: method, diff --git a/internal/db/migrations.go b/internal/db/migrations.go index cc9af130..d6fea87c 100644 --- a/internal/db/migrations.go +++ b/internal/db/migrations.go @@ -548,6 +548,14 @@ var migrations = []migration{ ADD COLUMN timeout_seconds TINYINT UNSIGNED NULL AFTER custom_headers, ADD COLUMN redirect_policy ENUM('follow','alert','fail') NULL DEFAULT NULL AFTER timeout_seconds, ADD COLUMN alert_cooldown_minutes SMALLINT UNSIGNED NULL AFTER redirect_policy`}, + + // Migration 39 prepares the v2-native target table for production rows + // where one blog_id has multiple active monitor URLs. The legacy table's + // primary row id is the durable endpoint identity, so target sync must be + // unique on source_site_id rather than collapsing all endpoints for a blog. + {39, `ALTER TABLE jetmon_check_targets + DROP INDEX uk_blog_id, + ADD UNIQUE KEY uk_source_site_id (source_site_id)`}, } // Migrate applies all pending migrations idempotently. diff --git a/internal/db/queries.go b/internal/db/queries.go index ad64ce30..bef93389 100644 --- a/internal/db/queries.go +++ b/internal/db/queries.go @@ -60,7 +60,7 @@ func GetSitesForBucket(ctx context.Context, bucketMin, bucketMax, batchSize int, // scheduler. 
It intentionally ignores last_checked_at and next_check_at: those // are legacy scheduler projections, while streaming mode maintains due time in // memory and writes coarse rollback freshness separately. -func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, afterBlogID int64, limit int) ([]Site, error) { +func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, afterMonitorSiteID int64, limit int) ([]Site, error) { if limit <= 0 { limit = 5000 } @@ -76,10 +76,10 @@ func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int LEFT JOIN jetmon_site_runtime r ON r.blog_id = s.blog_id WHERE s.monitor_active = 1 AND s.bucket_no BETWEEN ? AND ? - AND s.blog_id > ? - ORDER BY s.blog_id ASC + AND s.jetpack_monitor_site_id > ? + ORDER BY s.jetpack_monitor_site_id ASC LIMIT ?`, - bucketMin, bucketMax, afterBlogID, limit, + bucketMin, bucketMax, afterMonitorSiteID, limit, ) if err != nil { return nil, fmt.Errorf("query active sites: %w", err) @@ -189,6 +189,20 @@ func CountDueSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, // UpdateSiteStatus updates site_status and last_status_change for a site. func UpdateSiteStatus(ctx context.Context, blogID int64, status int, changedAt time.Time) error { + return UpdateSiteStatusForMonitorSite(ctx, 0, blogID, status, changedAt) +} + +// UpdateSiteStatusForMonitorSite updates the legacy projection for one monitor +// row when monitorSiteID is known, falling back to the historical blog_id update +// for callers that are still site-level. +func UpdateSiteStatusForMonitorSite(ctx context.Context, monitorSiteID, blogID int64, status int, changedAt time.Time) error { + if monitorSiteID > 0 { + _, err := db.ExecContext(ctx, + `UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE jetpack_monitor_site_id = ?`, + status, changedAt.UTC(), monitorSiteID, + ) + return err + } _, err := db.ExecContext(ctx, `UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE blog_id = ?`, status, changedAt.UTC(), blogID, @@ -200,7 +214,24 @@ func UpdateSiteStatus(ctx context.Context, blogID int64, status int, changedAt t // mode uses this sparingly after verifier escalation so its in-memory target // state does not send a recovery notification after a false alarm. func GetSiteStatus(ctx context.Context, blogID int64) (int, error) { + return GetSiteStatusForMonitorSite(ctx, 0, blogID) +} + +// GetSiteStatusForMonitorSite reads the legacy status projection for one +// monitor row when monitorSiteID is known, falling back to the historical +// blog_id lookup for site-level callers. +func GetSiteStatusForMonitorSite(ctx context.Context, monitorSiteID, blogID int64) (int, error) { var status int + if monitorSiteID > 0 { + err := db.QueryRowContext(ctx, + `SELECT site_status FROM jetpack_monitor_sites WHERE jetpack_monitor_site_id = ?`, + monitorSiteID, + ).Scan(&status) + if err != nil { + return 0, fmt.Errorf("get site status: %w", err) + } + return status, nil + } err := db.QueryRowContext(ctx, `SELECT site_status FROM jetpack_monitor_sites WHERE blog_id = ?`, blogID, @@ -214,6 +245,19 @@ func GetSiteStatus(ctx context.Context, blogID int64) (int, error) { // UpdateSiteStatusTx is the transaction-aware variant of UpdateSiteStatus, used // when the projection write must commit atomically with an event mutation. 
func UpdateSiteStatusTx(ctx context.Context, tx *sql.Tx, blogID int64, status int, changedAt time.Time) error { + return UpdateSiteStatusTxForMonitorSite(ctx, tx, 0, blogID, status, changedAt) +} + +// UpdateSiteStatusTxForMonitorSite is the transaction-aware variant of +// UpdateSiteStatusForMonitorSite. +func UpdateSiteStatusTxForMonitorSite(ctx context.Context, tx *sql.Tx, monitorSiteID, blogID int64, status int, changedAt time.Time) error { + if monitorSiteID > 0 { + _, err := tx.ExecContext(ctx, + `UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE jetpack_monitor_site_id = ?`, + status, changedAt.UTC(), monitorSiteID, + ) + return err + } _, err := tx.ExecContext(ctx, `UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE blog_id = ?`, status, changedAt.UTC(), blogID, @@ -240,6 +284,7 @@ func CountLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax int) ( FROM jetpack_monitor_sites s LEFT JOIN jetmon_events e ON e.blog_id = s.blog_id + AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL) AND e.check_type = 'http' AND e.ended_at IS NULL WHERE s.monitor_active = 1 @@ -317,6 +362,7 @@ func ListLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, limit FROM jetpack_monitor_sites s LEFT JOIN jetmon_events e ON e.blog_id = s.blog_id + AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL) AND e.check_type = 'http' AND e.ended_at IS NULL WHERE s.monitor_active = 1 @@ -396,6 +442,7 @@ func SummarizeLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, l FROM jetpack_monitor_sites s LEFT JOIN jetmon_events e ON e.blog_id = s.blog_id + AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL) AND e.check_type = 'http' AND e.ended_at IS NULL WHERE s.monitor_active = 1 diff --git a/internal/db/queries_test.go b/internal/db/queries_test.go index d3d5995e..5b7cff2a 100644 --- a/internal/db/queries_test.go +++ b/internal/db/queries_test.go @@ -358,6 +358,33 @@ func TestMarkSitesCheckedBatchesUpdates(t *testing.T) { } } +func TestMonitorSiteStatusUsesEndpointIdentity(t *testing.T) { + mock, cleanup := withMockDB(t) + defer cleanup() + + changedAt := time.Date(2026, 5, 13, 18, 0, 0, 0, time.UTC) + mock.ExpectExec("UPDATE jetpack_monitor_sites SET site_status"). + WithArgs(2, changedAt, int64(1234)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT site_status FROM jetpack_monitor_sites"). + WithArgs(int64(1234)). + WillReturnRows(sqlmock.NewRows([]string{"site_status"}).AddRow(2)) + + if err := UpdateSiteStatusForMonitorSite(context.Background(), 1234, 42, 2, changedAt); err != nil { + t.Fatalf("UpdateSiteStatusForMonitorSite: %v", err) + } + status, err := GetSiteStatusForMonitorSite(context.Background(), 1234, 42) + if err != nil { + t.Fatalf("GetSiteStatusForMonitorSite: %v", err) + } + if status != 2 { + t.Fatalf("status = %d, want 2", status) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + func TestMarkSitesCheckedRetriesDeadlock(t *testing.T) { mock, cleanup := withMockDB(t) defer cleanup() diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 9d2c0c9d..51ce1d0e 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -442,15 +442,41 @@ type ActiveEvent struct { // the event id cached (e.g. a recovery in a round after the open was forgotten // across a process restart). 
func (t *Tx) FindActiveByBlog(ctx context.Context, blogID int64, checkType string) (ActiveEvent, error) { + return t.FindActive(ctx, Identity{BlogID: blogID, CheckType: checkType}) +} + +// FindActive returns the open event for an identity. When EndpointID is set it +// prefers an endpoint-specific event but can fall back to a legacy site-level +// event so in-flight migrations can recover rows opened before endpoint +// identity was introduced. +func (t *Tx) FindActive(ctx context.Context, identity Identity) (ActiveEvent, error) { if t.tx == nil { return ActiveEvent{}, nil } var ae ActiveEvent + if identity.EndpointID != nil { + err := t.tx.QueryRowContext(ctx, ` + SELECT id, severity, state FROM jetmon_events + WHERE blog_id = ? + AND check_type = ? + AND ended_at IS NULL + AND (endpoint_id = ? OR endpoint_id IS NULL) + ORDER BY endpoint_id IS NULL ASC, started_at ASC + LIMIT 1`, identity.BlogID, identity.CheckType, *identity.EndpointID, + ).Scan(&ae.ID, &ae.Severity, &ae.State) + if errors.Is(err, sql.ErrNoRows) { + return ActiveEvent{}, ErrEventNotFound + } + if err != nil { + return ActiveEvent{}, fmt.Errorf("find active event: %w", err) + } + return ae, nil + } err := t.tx.QueryRowContext(ctx, ` SELECT id, severity, state FROM jetmon_events WHERE blog_id = ? AND check_type = ? AND ended_at IS NULL ORDER BY started_at ASC - LIMIT 1`, blogID, checkType, + LIMIT 1`, identity.BlogID, identity.CheckType, ).Scan(&ae.ID, &ae.Severity, &ae.State) if errors.Is(err, sql.ErrNoRows) { return ActiveEvent{}, ErrEventNotFound diff --git a/internal/orchestrator/identity.go b/internal/orchestrator/identity.go new file mode 100644 index 00000000..e902a9b7 --- /dev/null +++ b/internal/orchestrator/identity.go @@ -0,0 +1,30 @@ +package orchestrator + +import ( + "github.com/Automattic/jetmon/internal/checker" + "github.com/Automattic/jetmon/internal/db" + "github.com/Automattic/jetmon/internal/eventstore" +) + +func monitorTargetID(site db.Site) int64 { + if site.ID > 0 { + return site.ID + } + return site.BlogID +} + +func checkResultTargetID(res checker.Result) int64 { + if res.MonitorSiteID > 0 { + return res.MonitorSiteID + } + return res.BlogID +} + +func httpEventIdentity(site db.Site) eventstore.Identity { + identity := eventstore.Identity{BlogID: site.BlogID, CheckType: checkTypeHTTP} + if site.ID > 0 { + endpointID := site.ID + identity.EndpointID = &endpointID + } + return identity +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index c794e9d4..31346e98 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -87,7 +87,7 @@ var ( dbUpdateSSLExpiry = db.UpdateSSLExpiry dbUpdateSSLExpiries = db.UpdateSSLExpiries dbUpdateSiteStatus = db.UpdateSiteStatus - dbGetSiteStatus = db.GetSiteStatus + dbGetSiteStatus = db.GetSiteStatusForMonitorSite dbRecordFalsePositive = db.RecordFalsePositive dbUpdateLastAlertSent = db.UpdateLastAlertSent dbCountDueSites = db.CountDueSitesForBucketRange @@ -485,7 +485,7 @@ func (o *Orchestrator) checkSitesPage(cfg *config.Config, sites []db.Site, pageN siteMap := make(map[int64]db.Site, len(sites)) results := make(map[int64]checker.Result, len(sites)) for _, s := range sites { - siteMap[s.BlogID] = s + siteMap[monitorTargetID(s)] = s } dispatchStart := time.Now() @@ -636,10 +636,11 @@ func (o *Orchestrator) waitForPageResult(siteMap map[int64]db.Site, results map[ func filterUnseenSites(sites []db.Site, seen map[int64]struct{}) []db.Site { filtered := make([]db.Site, 0, len(sites)) 
for _, site := range sites { - if _, ok := seen[site.BlogID]; ok { + targetID := monitorTargetID(site) + if _, ok := seen[targetID]; ok { continue } - seen[site.BlogID] = struct{}{} + seen[targetID] = struct{}{} filtered = append(filtered, site) } return filtered @@ -665,6 +666,7 @@ func checkRequestForSite(cfg *config.Config, site db.Site) checker.Request { method := effectiveCheckMethod(cfg, site) profile := effectiveDetectionProfile(cfg, site, method) req := checker.Request{ + MonitorSiteID: site.ID, BlogID: site.BlogID, URL: site.MonitorURL, Method: method, @@ -730,17 +732,18 @@ func collectionDeadlineForSites(cfg *config.Config, sites []db.Site) time.Durati } func recordPageResult(siteMap map[int64]db.Site, results map[int64]checker.Result, res checker.Result, summary *roundSummary) { - if _, ok := siteMap[res.BlogID]; !ok { + targetID := checkResultTargetID(res) + if _, ok := siteMap[targetID]; !ok { summary.staleResults++ - log.Printf("orchestrator: ignored stale check result blog_id=%d", res.BlogID) + log.Printf("orchestrator: ignored stale check result target_id=%d blog_id=%d", targetID, res.BlogID) return } - if _, ok := results[res.BlogID]; ok { + if _, ok := results[targetID]; ok { summary.duplicateResults++ - log.Printf("orchestrator: ignored duplicate check result blog_id=%d", res.BlogID) + log.Printf("orchestrator: ignored duplicate check result target_id=%d blog_id=%d", targetID, res.BlogID) return } - results[res.BlogID] = res + results[targetID] = res } func (o *Orchestrator) shouldSampleDueCounts(now time.Time) bool { @@ -1076,24 +1079,24 @@ func (o *Orchestrator) updateSSLExpiries(updates []db.SiteSSLExpiry, summary *re } func knownSiteResults(results map[int64]checker.Result, sites map[int64]db.Site) []siteCheckResult { - blogIDs := make([]int64, 0, len(results)) - for blogID := range results { - blogIDs = append(blogIDs, blogID) + targetIDs := make([]int64, 0, len(results)) + for targetID := range results { + targetIDs = append(targetIDs, targetID) } - sort.Slice(blogIDs, func(i, j int) bool { - return blogIDs[i] < blogIDs[j] + sort.Slice(targetIDs, func(i, j int) bool { + return targetIDs[i] < targetIDs[j] }) records := make([]siteCheckResult, 0, len(results)) - for _, blogID := range blogIDs { - site, ok := sites[blogID] + for _, targetID := range targetIDs { + site, ok := sites[targetID] if !ok { continue } records = append(records, siteCheckResult{ - blogID: blogID, + blogID: site.BlogID, site: site, - res: results[blogID], + res: results[targetID], }) } return records @@ -1400,7 +1403,8 @@ func shouldUpdateSSLExpiry(stored *time.Time, observed time.Time) bool { } func (o *Orchestrator) handleRecovery(site db.Site, res checker.Result) { - entry := o.retries.get(site.BlogID) + targetID := monitorTargetID(site) + entry := o.retries.get(targetID) if entry == nil && site.SiteStatus == statusRunning { return // was already up, nothing to do } @@ -1409,7 +1413,7 @@ func (o *Orchestrator) handleRecovery(site db.Site, res checker.Result) { if entry != nil { knownEventID = entry.eventID } - o.retries.clear(site.BlogID) + o.retries.clear(targetID) if site.SiteStatus != statusRunning || knownEventID > 0 { changeTime := nowFunc().UTC() @@ -1424,7 +1428,7 @@ func (o *Orchestrator) handleRecovery(site db.Site, res checker.Result) { // same transaction. The resolution reason depends on whether the event // was already verifier-confirmed (Down) or still in the local-retry // phase (Seems Down). 
- if err := o.closeRecoveredEvent(site.BlogID, knownEventID, changeTime, res); err != nil { + if err := o.closeRecoveredEvent(site, knownEventID, changeTime, res); err != nil { log.Printf("orchestrator: close recovered event blog_id=%d: %v", site.BlogID, err) } @@ -1445,7 +1449,7 @@ func (o *Orchestrator) handleRecovery(site db.Site, res checker.Result) { Detail: "recovery cooldown active", }) } - o.retries.markRecovered(site.BlogID, changeTime) + o.retries.markRecovered(targetID, changeTime) } } @@ -1479,7 +1483,7 @@ func (o *Orchestrator) handleFailure(site db.Site, res checker.Result) bool { } if site.SiteStatus == statusConfirmedDown { - o.retries.clear(site.BlogID) + o.retries.clear(monitorTargetID(site)) class := failureClass(res) emitCounter("detection.down.still_down.count", 1) emitCounter("detection.down.still_down."+class+".count", 1) @@ -1550,7 +1554,7 @@ func (o *Orchestrator) postRecoveryTransientSuppression(site db.Site, res checke if o == nil || o.retries == nil || !postRecoveryTransientFailure(res) { return false, "", 0 } - if o.retries.get(site.BlogID) != nil { + if o.retries.get(monitorTargetID(site)) != nil { return false, "", 0 } return postRecoveryTransientSuppression(site, res, o.retries) @@ -1560,17 +1564,18 @@ func postRecoveryTransientSuppression(site db.Site, res checker.Result, retries if retries == nil || !postRecoveryTransientFailure(res) { return false, "", 0 } - if retries.get(site.BlogID) != nil { + targetID := monitorTargetID(site) + if retries.get(targetID) != nil { return false, "", 0 } checkedAt := resultCheckedAt(res) falseAlarmWindow := postFalseAlarmTransientFailureWindow(site) - if retries.recentlyFalseAlarmed(site.BlogID, checkedAt, falseAlarmWindow) { - retries.markFalseAlarm(site.BlogID, checkedAt) + if retries.recentlyFalseAlarmed(targetID, checkedAt, falseAlarmWindow) { + retries.markFalseAlarm(targetID, checkedAt) return true, "false_alarm", falseAlarmWindow } recoveryWindow := postRecoveryTransientFailureWindow(site) - if retries.recentlyRecovered(site.BlogID, checkedAt, recoveryWindow) { + if retries.recentlyRecovered(targetID, checkedAt, recoveryWindow) { return true, "recovery", recoveryWindow } return false, "", 0 @@ -1623,6 +1628,7 @@ func (o *Orchestrator) escalateToVerifliers(site db.Site, entry *retryEntry) { method := effectiveCheckMethod(cfg, site) profile := effectiveDetectionProfile(cfg, site, method) req := veriflier.CheckRequest{ + MonitorSiteID: site.ID, BlogID: site.BlogID, URL: site.MonitorURL, Method: method, @@ -1766,14 +1772,15 @@ func (o *Orchestrator) escalateToVerifliers(site db.Site, entry *retryEntry) { "verifier_disagreed": healthyVerifliers - confirmations, "verifier_confirmed": confirmations, }) - if err := o.closeEvent(site.BlogID, entry.eventID, + if err := o.closeEvent(site, entry.eventID, eventstore.ReasonFalseAlarm, statusRunning, falseAlarmAt, meta); err != nil { log.Printf("orchestrator: close false-alarm event blog_id=%d event_id=%d: %v", site.BlogID, entry.eventID, err) } } - o.retries.clear(site.BlogID) - o.retries.markFalseAlarm(site.BlogID, falseAlarmAt) + targetID := monitorTargetID(site) + o.retries.clear(targetID) + o.retries.markFalseAlarm(targetID, falseAlarmAt) } } @@ -1804,7 +1811,7 @@ func (o *Orchestrator) confirmDown(site db.Site, entry *retryEntry, vResults []v // local DNS failures intentionally skip the Seems Down event, so verifier // confirmation opens the customer-visible incident directly as Down. 
if entry.eventID > 0 { - if err := o.promoteToDown(site.BlogID, entry.eventID, changeTime, meta); err != nil { + if err := o.promoteToDown(site, entry.eventID, changeTime, meta); err != nil { log.Printf("orchestrator: promote event blog_id=%d event_id=%d: %v", site.BlogID, entry.eventID, err) } } else { @@ -1812,7 +1819,7 @@ func (o *Orchestrator) confirmDown(site db.Site, entry *retryEntry, vResults []v if err != nil { log.Printf("orchestrator: open confirmed-down event blog_id=%d: %v", site.BlogID, err) if config.LegacyStatusProjectionEnabled() { - _ = dbUpdateSiteStatus(o.ctx, site.BlogID, newStatus, changeTime) + _ = db.UpdateSiteStatusForMonitorSite(o.ctx, site.ID, site.BlogID, newStatus, changeTime) } } else { entry.eventID = eventID @@ -1840,7 +1847,7 @@ func (o *Orchestrator) confirmDown(site db.Site, entry *retryEntry, vResults []v }) } - o.retries.clear(site.BlogID) + o.retries.clear(monitorTargetID(site)) } func confirmedDownMetadata(site db.Site, entry *retryEntry, vResults []veriflier.CheckResult, directOpen bool) json.RawMessage { @@ -1869,7 +1876,8 @@ func verifierConfirmationCount(vResults []veriflier.CheckResult) int { } func (o *Orchestrator) swallowMaintenanceFailure(site db.Site, res checker.Result) { - entry := o.retries.get(site.BlogID) + targetID := monitorTargetID(site) + entry := o.retries.get(targetID) knownEventID := int64(0) if entry != nil { knownEventID = entry.eventID @@ -1889,12 +1897,12 @@ func (o *Orchestrator) swallowMaintenanceFailure(site db.Site, res checker.Resul }) if entry != nil || site.SiteStatus != statusRunning { - if err := o.closeMaintenanceEvent(site.BlogID, knownEventID, nowFunc().UTC(), meta); err != nil { + if err := o.closeMaintenanceEvent(site, knownEventID, nowFunc().UTC(), meta); err != nil { log.Printf("orchestrator: close maintenance-swallowed event blog_id=%d event_id=%d: %v", site.BlogID, knownEventID, err) } } - o.retries.clear(site.BlogID) + o.retries.clear(targetID) o.auditLog(audit.Entry{ BlogID: site.BlogID, @@ -2447,7 +2455,7 @@ func (o *Orchestrator) openSeemsDownOnce(site db.Site, res checker.Result, first meta, _ := json.Marshal(checkResultMetadata(site, res, firstFailAt)) out, err := tx.Open(o.ctx, eventstore.OpenInput{ - Identity: eventstore.Identity{BlogID: site.BlogID, CheckType: checkTypeHTTP}, + Identity: httpEventIdentity(site), Severity: eventstore.SeveritySeemsDown, State: eventstore.StateSeemsDown, Source: o.hostname, @@ -2461,7 +2469,7 @@ func (o *Orchestrator) openSeemsDownOnce(site db.Site, res checker.Result, first // (Opened=false) is by definition a row that already exists, so site_status // was already projected when the event first opened. 
if out.Opened && config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), site.BlogID, statusDown, nowFunc().UTC()); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusDown, nowFunc().UTC()); err != nil { return 0, false, fmt.Errorf("project site_status: %w", err) } } @@ -2497,7 +2505,7 @@ func (o *Orchestrator) openConfirmedDownOnce(site db.Site, changeTime time.Time, defer func() { _ = tx.Rollback() }() out, err := tx.Open(o.ctx, eventstore.OpenInput{ - Identity: eventstore.Identity{BlogID: site.BlogID, CheckType: checkTypeHTTP}, + Identity: httpEventIdentity(site), Severity: eventstore.SeverityDown, State: eventstore.StateDown, Source: o.hostname, @@ -2518,7 +2526,7 @@ func (o *Orchestrator) openConfirmedDownOnce(site db.Site, changeTime time.Time, } if projectConfirmedDown && config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), site.BlogID, statusConfirmedDown, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusConfirmedDown, changeTime); err != nil { return 0, false, fmt.Errorf("project site_status: %w", err) } } @@ -2531,13 +2539,13 @@ func (o *Orchestrator) openConfirmedDownOnce(site db.Site, changeTime time.Time, // promoteToDown bumps an open Seems Down event to Down (severity 4) and // projects site_status=SITE_CONFIRMED_DOWN in the same transaction. -func (o *Orchestrator) promoteToDown(blogID, eventID int64, changeTime time.Time, meta json.RawMessage) error { - return o.withEventMutationRetry(blogID, "promote_to_down", func() error { - return o.promoteToDownOnce(blogID, eventID, changeTime, meta) +func (o *Orchestrator) promoteToDown(site db.Site, eventID int64, changeTime time.Time, meta json.RawMessage) error { + return o.withEventMutationRetry(site.BlogID, "promote_to_down", func() error { + return o.promoteToDownOnce(site, eventID, changeTime, meta) }) } -func (o *Orchestrator) promoteToDownOnce(blogID, eventID int64, changeTime time.Time, meta json.RawMessage) error { +func (o *Orchestrator) promoteToDownOnce(site db.Site, eventID int64, changeTime time.Time, meta json.RawMessage) error { tx, err := o.ev().Begin(o.ctx) if err != nil { return err @@ -2551,7 +2559,7 @@ func (o *Orchestrator) promoteToDownOnce(blogID, eventID int64, changeTime time. } if config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, statusConfirmedDown, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusConfirmedDown, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } @@ -2560,13 +2568,13 @@ func (o *Orchestrator) promoteToDownOnce(blogID, eventID int64, changeTime time. // closeEvent closes an open event with the given resolution reason and projects // site_status to the given v1 value in the same transaction. 
-func (o *Orchestrator) closeEvent(blogID, eventID int64, reason string, projectedStatus int, changeTime time.Time, meta json.RawMessage) error { - return o.withEventMutationRetry(blogID, "close_event", func() error { - return o.closeEventOnce(blogID, eventID, reason, projectedStatus, changeTime, meta) +func (o *Orchestrator) closeEvent(site db.Site, eventID int64, reason string, projectedStatus int, changeTime time.Time, meta json.RawMessage) error { + return o.withEventMutationRetry(site.BlogID, "close_event", func() error { + return o.closeEventOnce(site, eventID, reason, projectedStatus, changeTime, meta) }) } -func (o *Orchestrator) closeEventOnce(blogID, eventID int64, reason string, projectedStatus int, changeTime time.Time, meta json.RawMessage) error { +func (o *Orchestrator) closeEventOnce(site db.Site, eventID int64, reason string, projectedStatus int, changeTime time.Time, meta json.RawMessage) error { tx, err := o.ev().Begin(o.ctx) if err != nil { return err @@ -2578,7 +2586,7 @@ func (o *Orchestrator) closeEventOnce(blogID, eventID int64, reason string, proj } if config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, projectedStatus, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, projectedStatus, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } @@ -2591,13 +2599,13 @@ func (o *Orchestrator) closeEventOnce(blogID, eventID int64, reason string, proj // retry entry) it is used directly; otherwise the active event is looked up // inside the transaction. site_status is projected back to SITE_RUNNING in the // same tx. -func (o *Orchestrator) closeRecoveredEvent(blogID, knownEventID int64, changeTime time.Time, res checker.Result) error { - return o.withEventMutationRetry(blogID, "close_recovered_event", func() error { - return o.closeRecoveredEventOnce(blogID, knownEventID, changeTime, res) +func (o *Orchestrator) closeRecoveredEvent(site db.Site, knownEventID int64, changeTime time.Time, res checker.Result) error { + return o.withEventMutationRetry(site.BlogID, "close_recovered_event", func() error { + return o.closeRecoveredEventOnce(site, knownEventID, changeTime, res) }) } -func (o *Orchestrator) closeRecoveredEventOnce(blogID, knownEventID int64, changeTime time.Time, res checker.Result) error { +func (o *Orchestrator) closeRecoveredEventOnce(site db.Site, knownEventID int64, changeTime time.Time, res checker.Result) error { tx, err := o.ev().Begin(o.ctx) if err != nil { return err @@ -2617,13 +2625,13 @@ func (o *Orchestrator) closeRecoveredEventOnce(blogID, knownEventID int64, chang return fmt.Errorf("read event state: %w", err) } case tx.Tx() != nil: - ae, err := tx.FindActiveByBlog(o.ctx, blogID, checkTypeHTTP) + ae, err := tx.FindActive(o.ctx, httpEventIdentity(site)) if err != nil { if errors.Is(err, eventstore.ErrEventNotFound) { // site_status disagreed with the event store (no open event but // projection said non-running). Just project back to running. 
if config.LegacyStatusProjectionEnabled() { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, statusRunning, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusRunning, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } @@ -2648,14 +2656,14 @@ func (o *Orchestrator) closeRecoveredEventOnce(blogID, knownEventID int64, chang return fmt.Errorf("close event: %w", err) } if config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, statusRunning, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusRunning, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } return tx.Commit() } -func (o *Orchestrator) closeMaintenanceEvent(blogID, knownEventID int64, changeTime time.Time, meta json.RawMessage) error { +func (o *Orchestrator) closeMaintenanceEvent(site db.Site, knownEventID int64, changeTime time.Time, meta json.RawMessage) error { tx, err := o.ev().Begin(o.ctx) if err != nil { return err @@ -2667,11 +2675,11 @@ func (o *Orchestrator) closeMaintenanceEvent(blogID, knownEventID int64, changeT case knownEventID > 0 && tx.Tx() != nil: eventID = knownEventID case tx.Tx() != nil: - ae, err := tx.FindActiveByBlog(o.ctx, blogID, checkTypeHTTP) + ae, err := tx.FindActive(o.ctx, httpEventIdentity(site)) if err != nil { if errors.Is(err, eventstore.ErrEventNotFound) { if config.LegacyStatusProjectionEnabled() { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, statusRunning, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusRunning, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } @@ -2688,7 +2696,7 @@ func (o *Orchestrator) closeMaintenanceEvent(blogID, knownEventID int64, changeT return fmt.Errorf("close event: %w", err) } if config.LegacyStatusProjectionEnabled() && tx.Tx() != nil { - if err := db.UpdateSiteStatusTx(o.ctx, tx.Tx(), blogID, statusRunning, changeTime); err != nil { + if err := db.UpdateSiteStatusTxForMonitorSite(o.ctx, tx.Tx(), site.ID, site.BlogID, statusRunning, changeTime); err != nil { return fmt.Errorf("project site_status: %w", err) } } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 006694b3..e518282e 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -46,6 +46,40 @@ func TestIsAlertSuppressedUsesLastAlertSent(t *testing.T) { } } +func TestDuplicateBlogMonitorRowsUseMonitorSiteIdentity(t *testing.T) { + sites := []db.Site{ + {ID: 10, BlogID: 42, MonitorURL: "https://example.com/"}, + {ID: 11, BlogID: 42, MonitorURL: "https://example.com/path"}, + } + filtered := filterUnseenSites(sites, map[int64]struct{}{}) + if len(filtered) != 2 { + t.Fatalf("filterUnseenSites returned %d sites, want 2", len(filtered)) + } + + req := checkRequestForSite(&config.Config{}, sites[0]) + if req.MonitorSiteID != 10 || req.BlogID != 42 { + t.Fatalf("request identity = monitor_site_id:%d blog_id:%d, want 10/42", req.MonitorSiteID, req.BlogID) + } + + siteMap := map[int64]db.Site{ + monitorTargetID(sites[0]): sites[0], + monitorTargetID(sites[1]): sites[1], + } + results := map[int64]checker.Result{ + 10: {MonitorSiteID: 10, BlogID: 42, Success: true}, + 11: {MonitorSiteID: 11, BlogID: 42, Success: true}, + } + records := 
knownSiteResults(results, siteMap) + if len(records) != 2 { + t.Fatalf("knownSiteResults returned %d records, want 2", len(records)) + } + + identity := httpEventIdentity(sites[0]) + if identity.EndpointID == nil || *identity.EndpointID != 10 { + t.Fatalf("httpEventIdentity endpoint = %v, want 10", identity.EndpointID) + } +} + func TestTimeoutForSite(t *testing.T) { cfg := &config.Config{NetCommsTimeout: 10} @@ -732,7 +766,7 @@ func stubOrchestratorDeps() func() { dbMarkHostDraining = func(context.Context, string) error { return nil } dbGetSitesForBucket = func(context.Context, int, int, int, bool) ([]db.Site, error) { return nil, nil } dbUpdateSiteStatus = func(context.Context, int64, int, time.Time) error { return nil } - dbGetSiteStatus = func(context.Context, int64) (int, error) { return statusRunning, nil } + dbGetSiteStatus = func(context.Context, int64, int64) (int, error) { return statusRunning, nil } dbUpdateLastAlertSent = func(context.Context, int64, time.Time) error { return nil } dbRecordFalsePositive = func(int64, int, int, int64) error { return nil } dbMarkSiteChecked = func(context.Context, int64, time.Time, time.Time) error { return nil } diff --git a/internal/orchestrator/retry.go b/internal/orchestrator/retry.go index 72ca91af..73f3cd6d 100644 --- a/internal/orchestrator/retry.go +++ b/internal/orchestrator/retry.go @@ -9,6 +9,7 @@ import ( // retryEntry tracks local retry state for a site that has failed at least once. type retryEntry struct { + targetID int64 blogID int64 url string failCount int @@ -40,14 +41,16 @@ func (q *retryQueue) record(res checker.Result) *retryEntry { q.mu.Lock() defer q.mu.Unlock() - e, exists := q.entries[res.BlogID] + targetID := checkResultTargetID(res) + e, exists := q.entries[targetID] if !exists { e = &retryEntry{ + targetID: targetID, blogID: res.BlogID, url: res.URL, firstFailAt: res.Timestamp, } - q.entries[res.BlogID] = e + q.entries[targetID] = e } e.failCount++ e.lastResult = res @@ -56,45 +59,45 @@ func (q *retryQueue) record(res checker.Result) *retryEntry { } // clear removes a site from the retry queue (site recovered or confirmed down). 
-func (q *retryQueue) clear(blogID int64) { +func (q *retryQueue) clear(targetID int64) { q.mu.Lock() defer q.mu.Unlock() - delete(q.entries, blogID) + delete(q.entries, targetID) } -func (q *retryQueue) markRecovered(blogID int64, recoveredAt time.Time) { +func (q *retryQueue) markRecovered(targetID int64, recoveredAt time.Time) { q.mu.Lock() defer q.mu.Unlock() if recoveredAt.IsZero() { recoveredAt = time.Now().UTC() } - q.recentRecoveries[blogID] = recoveredAt.UTC() + q.recentRecoveries[targetID] = recoveredAt.UTC() } -func (q *retryQueue) recentlyRecovered(blogID int64, at time.Time, window time.Duration) bool { - return q.recentlyMarked(q.recentRecoveries, blogID, at, window) +func (q *retryQueue) recentlyRecovered(targetID int64, at time.Time, window time.Duration) bool { + return q.recentlyMarked(q.recentRecoveries, targetID, at, window) } -func (q *retryQueue) markFalseAlarm(blogID int64, falseAlarmAt time.Time) { +func (q *retryQueue) markFalseAlarm(targetID int64, falseAlarmAt time.Time) { q.mu.Lock() defer q.mu.Unlock() if falseAlarmAt.IsZero() { falseAlarmAt = time.Now().UTC() } - q.recentFalseAlarms[blogID] = falseAlarmAt.UTC() + q.recentFalseAlarms[targetID] = falseAlarmAt.UTC() } -func (q *retryQueue) recentlyFalseAlarmed(blogID int64, at time.Time, window time.Duration) bool { - return q.recentlyMarked(q.recentFalseAlarms, blogID, at, window) +func (q *retryQueue) recentlyFalseAlarmed(targetID int64, at time.Time, window time.Duration) bool { + return q.recentlyMarked(q.recentFalseAlarms, targetID, at, window) } -func (q *retryQueue) recentlyMarked(markers map[int64]time.Time, blogID int64, at time.Time, window time.Duration) bool { +func (q *retryQueue) recentlyMarked(markers map[int64]time.Time, targetID int64, at time.Time, window time.Duration) bool { if window <= 0 { return false } q.mu.Lock() defer q.mu.Unlock() - markedAt, ok := markers[blogID] + markedAt, ok := markers[targetID] if !ok { return false } @@ -107,15 +110,15 @@ func (q *retryQueue) recentlyMarked(markers map[int64]time.Time, blogID int64, a if at.Sub(markedAt.UTC()) <= window { return true } - delete(markers, blogID) + delete(markers, targetID) return false } // get returns the entry for a site, or nil if not in the queue. -func (q *retryQueue) get(blogID int64) *retryEntry { +func (q *retryQueue) get(targetID int64) *retryEntry { q.mu.Lock() defer q.mu.Unlock() - return q.entries[blogID] + return q.entries[targetID] } // allBlogIDs returns the blog IDs of all sites currently in retry. 
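
The retry-queue keying above, and the streaming planner keying below, both lean on one fallback rule shared via the new internal/orchestrator/identity.go helpers: key in-memory state by the legacy row id when it is known, and by blog_id otherwise. A minimal standalone sketch of that invariant — site and targetID here are illustrative stand-ins for db.Site and monitorTargetID, not the package's own types:

    package main

    import "fmt"

    // site mirrors just the two identity fields the scheduler cares about.
    type site struct {
        ID     int64 // jetpack_monitor_sites.jetpack_monitor_site_id (0 if unknown)
        BlogID int64 // WPCOM/site identity
    }

    // targetID prefers the endpoint identity and falls back to blog_id for
    // rows that predate the monitor-site id plumbing.
    func targetID(s site) int64 {
        if s.ID > 0 {
            return s.ID
        }
        return s.BlogID
    }

    func main() {
        // Two active monitor URLs for blog 42 keep distinct keyed state...
        a, b := site{ID: 10, BlogID: 42}, site{ID: 11, BlogID: 42}
        retries := map[int64]int{}
        retries[targetID(a)]++
        retries[targetID(b)]++
        fmt.Println(len(retries)) // 2; keying by blog_id alone would merge them

        // ...while a legacy row with no monitor-site id keeps its old key.
        fmt.Println(targetID(site{BlogID: 7})) // 7
    }

Keying by blog_id alone is exactly the collapse the commit message describes: the two endpoints would share retry counts, false-alarm markers, and due-wheel slots.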
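The same split shows up in the eventstore change earlier in this patch: FindActive orders open events by `endpoint_id IS NULL ASC, started_at ASC`, so an endpoint-scoped event wins over a pre-migration site-level row, and the oldest event wins within each group. A sketch of that selection rule in plain Go — openEvent and pickActive are hypothetical names for illustration, not the eventstore API:

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    // openEvent mirrors the columns the ORDER BY clause looks at.
    type openEvent struct {
        id         int64
        endpointID *int64 // nil models endpoint_id IS NULL (pre-migration rows)
        startedAt  time.Time
    }

    // pickActive applies ORDER BY endpoint_id IS NULL ASC, started_at ASC LIMIT 1.
    func pickActive(events []openEvent) openEvent {
        sort.Slice(events, func(i, j int) bool {
            ni, nj := events[i].endpointID == nil, events[j].endpointID == nil
            if ni != nj {
                return !ni // endpoint-scoped rows sort before NULL rows
            }
            return events[i].startedAt.Before(events[j].startedAt)
        })
        return events[0]
    }

    func main() {
        ep := int64(10)
        legacy := openEvent{id: 1, startedAt: time.Unix(100, 0)}                  // endpoint_id IS NULL
        scoped := openEvent{id: 2, endpointID: &ep, startedAt: time.Unix(200, 0)} // endpoint-scoped
        // The newer endpoint-scoped event is still preferred over the older legacy row.
        fmt.Println(pickActive([]openEvent{legacy, scoped}).id) // 2
    }
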
diff --git a/internal/orchestrator/streaming.go b/internal/orchestrator/streaming.go index 987626d5..225e6296 100644 --- a/internal/orchestrator/streaming.go +++ b/internal/orchestrator/streaming.go @@ -100,8 +100,8 @@ type streamingDueWheel struct { } type streamingDueBucket struct { - dueUnix int64 - blogIDs []int64 + dueUnix int64 + targetIDs []int64 } func newStreamingDueWheel(capacity int) streamingDueWheel { @@ -111,11 +111,11 @@ func newStreamingDueWheel(capacity int) streamingDueWheel { return streamingDueWheel{buckets: make(map[int64][]int64, capacity)} } -func (w *streamingDueWheel) schedule(dueUnix, blogID int64) { +func (w *streamingDueWheel) schedule(dueUnix, targetID int64) { if w.buckets == nil { w.buckets = make(map[int64][]int64) } - w.buckets[dueUnix] = append(w.buckets[dueUnix], blogID) + w.buckets[dueUnix] = append(w.buckets[dueUnix], targetID) if w.nextDueUnix == 0 || dueUnix < w.nextDueUnix { w.nextDueUnix = dueUnix } @@ -145,11 +145,11 @@ func (w *streamingDueWheel) popReady(nowUnix int64) []streamingDueBucket { }) ready := make([]streamingDueBucket, 0, len(readyTimes)) for _, dueUnix := range readyTimes { - blogIDs := w.buckets[dueUnix] + targetIDs := w.buckets[dueUnix] delete(w.buckets, dueUnix) ready = append(ready, streamingDueBucket{ - dueUnix: dueUnix, - blogIDs: blogIDs, + dueUnix: dueUnix, + targetIDs: targetIDs, }) } w.refreshNextDue() @@ -204,8 +204,9 @@ func streamingDueWheelInitialCapacity(sites []db.Site) int { func (p *streamingPlanner) merge(sites []db.Site, now time.Time) (added, updated, removed int) { seen := make(map[int64]struct{}, len(sites)) for _, site := range sites { - seen[site.BlogID] = struct{}{} - if target, ok := p.targets[site.BlogID]; ok { + targetID := monitorTargetID(site) + seen[targetID] = struct{}{} + if target, ok := p.targets[targetID]; ok { if !streamingSiteCheckConfigEqual(target.site, site) { target.checkRequestDirty = true } @@ -222,7 +223,7 @@ func (p *streamingPlanner) merge(sites []db.Site, now time.Time) (added, updated if site.LastCheckedAt != nil { target.lastProjectedAt = site.LastCheckedAt.UTC() } - p.targets[site.BlogID] = target + p.targets[targetID] = target p.scheduleAt(target, initialStreamingDueAt(site, now)) added++ } @@ -295,7 +296,7 @@ func (p *streamingPlanner) scheduleAfterResult(target *streamingTarget, res chec func (p *streamingPlanner) scheduleAtNextPhaseAfter(target *streamingTarget, after time.Time) { interval := streamingCheckCadence(target.site) - phase := streamingPhaseOffset(target.site.BlogID, interval) + phase := streamingPhaseOffset(monitorTargetID(target.site), interval) p.scheduleAt(target, nextStreamingPhaseAt(after.Add(time.Second), interval, phase)) } @@ -303,15 +304,15 @@ func (p *streamingPlanner) scheduleAt(target *streamingTarget, dueAt time.Time) dueAt = dueAt.UTC().Truncate(time.Second) target.dueAt = dueAt dueUnix := dueAt.Unix() - p.due.schedule(dueUnix, target.site.BlogID) + p.due.schedule(dueUnix, monitorTargetID(target.site)) } func (p *streamingPlanner) popDue(now time.Time) []*streamingTarget { nowUnix := now.UTC().Unix() var due []*streamingTarget for _, bucket := range p.due.popReady(nowUnix) { - for _, blogID := range bucket.blogIDs { - target, ok := p.targets[blogID] + for _, targetID := range bucket.targetIDs { + target, ok := p.targets[targetID] if !ok || !target.active || target.inFlight || target.queued || target.dueAt.Unix() != bucket.dueUnix { continue } @@ -438,6 +439,7 @@ type streamingSideEffectJob struct { } type streamingSideEffectReport struct { + targetID 
int64 blogID int64 status int resultFailure bool @@ -479,8 +481,8 @@ func (o *Orchestrator) newStreamingSideEffectProcessor(shards, queueCap int) *st func (p *streamingSideEffectProcessor) runShard(o *Orchestrator, jobs <-chan streamingSideEffectJob) { defer p.wg.Done() - statusByBlog := make(map[int64]int) - sslExpiryByBlog := make(map[int64]*time.Time) + statusByTarget := make(map[int64]int) + sslExpiryByTarget := make(map[int64]*time.Time) historyRows := make([]db.CheckHistoryRow, 0, streamingHistoryBatchSize) historyTicker := time.NewTicker(streamingHistoryFlushInterval) defer historyTicker.Stop() @@ -513,19 +515,20 @@ func (p *streamingSideEffectProcessor) runShard(o *Orchestrator, jobs <-chan str return } site := job.site - if status, ok := statusByBlog[site.BlogID]; ok { + targetID := monitorTargetID(site) + if status, ok := statusByTarget[targetID]; ok { site.SiteStatus = status } - if expiry, ok := sslExpiryByBlog[site.BlogID]; ok { + if expiry, ok := sslExpiryByTarget[targetID]; ok { site.SSLExpiryDate = expiry } summary, updated := o.processStreamingSideEffects(site, job.res) - statusByBlog[site.BlogID] = updated.SiteStatus + statusByTarget[targetID] = updated.SiteStatus if updated.SSLExpiryDate != nil { expiry := *updated.SSLExpiryDate - sslExpiryByBlog[site.BlogID] = &expiry + sslExpiryByTarget[targetID] = &expiry } else { - delete(sslExpiryByBlog, site.BlogID) + delete(sslExpiryByTarget, targetID) } if job.res.IsFailure() { historyRows = append(historyRows, checkHistoryRowForResult(site.BlogID, job.res)) @@ -535,6 +538,7 @@ func (p *streamingSideEffectProcessor) runShard(o *Orchestrator, jobs <-chan str } select { case p.reports <- streamingSideEffectReport{ + targetID: targetID, blogID: site.BlogID, status: updated.SiteStatus, resultFailure: job.res.IsFailure(), @@ -552,7 +556,7 @@ func (p *streamingSideEffectProcessor) enqueue(job streamingSideEffectJob) bool if len(p.shards) == 0 { return false } - ch := p.shards[streamingSideEffectShard(job.site.BlogID, len(p.shards))] + ch := p.shards[streamingSideEffectShard(monitorTargetID(job.site), len(p.shards))] select { case ch <- job: return true @@ -565,7 +569,7 @@ func (p *streamingSideEffectProcessor) tryEnqueue(job streamingSideEffectJob) bo if len(p.shards) == 0 { return false } - ch := p.shards[streamingSideEffectShard(job.site.BlogID, len(p.shards))] + ch := p.shards[streamingSideEffectShard(monitorTargetID(job.site), len(p.shards))] select { case ch <- job: return true @@ -764,7 +768,8 @@ func (o *Orchestrator) runStreamingEngine() { } handleResult := func(res checker.Result, now time.Time) { - target, ok := planner.targets[res.BlogID] + targetID := checkResultTargetID(res) + target, ok := planner.targets[targetID] if !ok || !target.inFlight { stats.staleResults++ return @@ -795,7 +800,7 @@ func (o *Orchestrator) runStreamingEngine() { return } } - pendingSideEffects[target.site.BlogID]++ + pendingSideEffects[targetID]++ } planner.scheduleAfterResult(target, res, checkedAt, streamingAllowImmediateRetry(target, res, o.retries, pressureActive)) o.queueStreamingProjection(cfg, target, checkedAt, now, pendingProjection) @@ -939,14 +944,14 @@ func (o *Orchestrator) runStreamingEngine() { handleProjectionFlushResult(result) case report := <-sideEffects.reportsChannel(): stats.addSideEffects(report.summary) - if report.blogID != 0 { - if pendingSideEffects[report.blogID] <= 1 { - delete(pendingSideEffects, report.blogID) + if report.targetID != 0 { + if pendingSideEffects[report.targetID] <= 1 { + delete(pendingSideEffects, 
report.targetID) } else { - pendingSideEffects[report.blogID]-- + pendingSideEffects[report.targetID]-- } - sideEffectStatus[report.blogID] = report.status - if target, ok := planner.targets[report.blogID]; ok { + sideEffectStatus[report.targetID] = report.status + if target, ok := planner.targets[report.targetID]; ok { target.site.SiteStatus = report.status rescheduleStreamingAfterSideEffect(planner, target, report) } @@ -1004,11 +1009,11 @@ func (o *Orchestrator) loadStreamingSites(cfg *config.Config) ([]db.Site, error) func (o *Orchestrator) loadStreamingSitesForRange(ctx context.Context, cfg *config.Config, bucketMin, bucketMax int) ([]db.Site, error) { pageSize := streamingLoadPageSize(cfg) var ( - afterBlogID int64 - sites []db.Site + afterMonitorSiteID int64 + sites []db.Site ) for { - page, err := dbListActiveSites(ctx, bucketMin, bucketMax, afterBlogID, pageSize) + page, err := dbListActiveSites(ctx, bucketMin, bucketMax, afterMonitorSiteID, pageSize) if err != nil { return nil, err } @@ -1016,7 +1021,7 @@ func (o *Orchestrator) loadStreamingSitesForRange(ctx context.Context, cfg *conf return sites, nil } sites = append(sites, page...) - afterBlogID = page[len(page)-1].BlogID + afterMonitorSiteID = page[len(page)-1].ID if len(page) < pageSize { return sites, nil } @@ -1079,9 +1084,9 @@ func (o *Orchestrator) processStreamingSideEffects(site db.Site, res checker.Res site.SiteStatus = statusRunning } else { failureActive := o.handleFailure(site, res) - if retry := o.retries.get(site.BlogID); retry != nil && (failureActive || retry.eventID > 0) { + if retry := o.retries.get(monitorTargetID(site)); retry != nil && (failureActive || retry.eventID > 0) { site.SiteStatus = statusDown - } else if status, err := dbGetSiteStatus(o.ctx, site.BlogID); err != nil { + } else if status, err := dbGetSiteStatus(o.ctx, site.ID, site.BlogID); err != nil { log.Printf("orchestrator: streaming refresh site status blog_id=%d: %v", site.BlogID, err) } else { site.SiteStatus = status @@ -1126,15 +1131,15 @@ func streamingSideEffectsNeeded(target *streamingTarget, res checker.Result, pen if streamingSideEffectsSuppressedByPressure(target, res, pending, statusCache, retries, pressure) { return false } - blogID := target.site.BlogID - if pending[blogID] > 0 { + targetID := monitorTargetID(target.site) + if pending[targetID] > 0 { return true } status := target.site.SiteStatus - if cached, ok := statusCache[blogID]; ok { + if cached, ok := statusCache[targetID]; ok { status = cached } - retrying := retries != nil && retries.get(blogID) != nil + retrying := retries != nil && retries.get(targetID) != nil if res.IsFailure() || res.TLSVersion != 0 || res.SSLExpiry != nil { return true } @@ -1148,15 +1153,15 @@ func streamingSideEffectsSuppressedByPressure(target *streamingTarget, res check if !pressure || target == nil { return false } - blogID := target.site.BlogID - if pending[blogID] > 0 { + targetID := monitorTargetID(target.site) + if pending[targetID] > 0 { return false } status := target.site.SiteStatus - if cached, ok := statusCache[blogID]; ok { + if cached, ok := statusCache[targetID]; ok { status = cached } - retrying := retries != nil && retries.get(blogID) != nil + retrying := retries != nil && retries.get(targetID) != nil return status == statusRunning && !retrying && streamingLocalPressureFailure(res) } @@ -1186,7 +1191,7 @@ func streamingAllowImmediateRetry(target *streamingTarget, res checker.Result, r if target.site.SiteStatus != statusRunning { return true } - return retries != nil && 
retries.get(target.site.BlogID) != nil + return retries != nil && retries.get(monitorTargetID(target.site)) != nil } func streamingSuppressPostRecoveryImmediateRetry(target *streamingTarget, res checker.Result, retries *retryQueue) bool { @@ -1799,7 +1804,7 @@ func streamingDispatchFastCatchup(maxLag time.Duration, resultDepth, workerTarge func initialStreamingDueAt(site db.Site, now time.Time) time.Time { interval := streamingCheckCadence(site) - phase := streamingPhaseOffset(site.BlogID, interval) + phase := streamingPhaseOffset(monitorTargetID(site), interval) return nextStreamingPhaseAt(now, interval, phase) } diff --git a/internal/orchestrator/streaming_test.go b/internal/orchestrator/streaming_test.go index a75eea92..aaf9c5a5 100644 --- a/internal/orchestrator/streaming_test.go +++ b/internal/orchestrator/streaming_test.go @@ -14,7 +14,7 @@ import ( func TestStreamingPhaseStaysInsideInterval(t *testing.T) { now := time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC) - site := db.Site{BlogID: 12345, CheckInterval: 5} + site := db.Site{ID: 98765, BlogID: 12345, CheckInterval: 5} due := initialStreamingDueAt(site, now) if due.Before(now) { @@ -24,11 +24,28 @@ func TestStreamingPhaseStaysInsideInterval(t *testing.T) { t.Fatalf("initialStreamingDueAt() delay = %s, want < 5m", due.Sub(now)) } cadence := streamingCheckCadence(site) - if got := due.Unix() % int64(cadence/time.Second); got != streamingPhaseOffset(site.BlogID, cadence) { + if got := due.Unix() % int64(cadence/time.Second); got != streamingPhaseOffset(site.ID, cadence) { t.Fatalf("due phase = %d, want stable phase", got) } } +func TestStreamingPlannerKeepsDuplicateBlogEndpoints(t *testing.T) { + now := time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC) + planner := newStreamingPlanner([]db.Site{ + {ID: 10, BlogID: 42, MonitorURL: "https://example.com/", CheckInterval: 5}, + {ID: 11, BlogID: 42, MonitorURL: "https://example.com/path", CheckInterval: 5}, + }, now) + if got := planner.activeCount(); got != 2 { + t.Fatalf("activeCount = %d, want 2", got) + } + if _, ok := planner.targets[10]; !ok { + t.Fatal("planner missing monitor_site_id 10") + } + if _, ok := planner.targets[11]; !ok { + t.Fatal("planner missing monitor_site_id 11") + } +} + func TestStreamingPlannerPopDueSkipsQueuedAndInflightTargets(t *testing.T) { now := time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC) planner := &streamingPlanner{ diff --git a/internal/veriflier/server.go b/internal/veriflier/server.go index c9145284..90ec8f70 100644 --- a/internal/veriflier/server.go +++ b/internal/veriflier/server.go @@ -140,6 +140,12 @@ func (s *Server) handleCheck(w http.ResponseWriter, r *http.Request) { // audit row it wrote when escalating. log.Printf("veriflier: check blog_id=%d request_id=%s url=%s", site.BlogID, site.RequestID, site.URL) res := s.checkFn(site) + if res.MonitorSiteID == 0 { + res.MonitorSiteID = site.MonitorSiteID + } + if res.BlogID == 0 { + res.BlogID = site.BlogID + } res.Host = s.hostname res.RequestID = site.RequestID results = append(results, res) diff --git a/internal/veriflier/types.go b/internal/veriflier/types.go index 9de72f58..42721540 100644 --- a/internal/veriflier/types.go +++ b/internal/veriflier/types.go @@ -11,6 +11,7 @@ package veriflier // that "the orchestrator escalated → this verifier observed → this audit row // in the monitor DB" can be reconstructed without timestamp matching. 
type CheckRequest struct { + MonitorSiteID int64 BlogID int64 URL string Method string @@ -30,12 +31,13 @@ type CheckRequest struct { // CheckResult is a single check outcome returned by the Veriflier. type CheckResult struct { - BlogID int64 - URL string - Host string - Success bool - HTTPCode int32 - ErrorCode int32 - RTTMs int64 - RequestID string // echoed from CheckRequest.RequestID + MonitorSiteID int64 + BlogID int64 + URL string + Host string + Success bool + HTTPCode int32 + ErrorCode int32 + RTTMs int64 + RequestID string // echoed from CheckRequest.RequestID } diff --git a/internal/veriflier/veriflier_test.go b/internal/veriflier/veriflier_test.go index 69e7e98a..477cb3ef 100644 --- a/internal/veriflier/veriflier_test.go +++ b/internal/veriflier/veriflier_test.go @@ -33,12 +33,12 @@ func checkReqBody(t *testing.T, sites []CheckRequest) *bytes.Buffer { func TestServerHandleCheckSuccess(t *testing.T) { _, ts := newTestServer(func(req CheckRequest) CheckResult { - return CheckResult{BlogID: req.BlogID, Success: true, HTTPCode: 200} + return CheckResult{Success: true, HTTPCode: 200} }) defer ts.Close() req, _ := http.NewRequest(http.MethodPost, ts.URL+"/check", checkReqBody(t, []CheckRequest{ - {BlogID: 42, URL: "https://example.com"}, + {MonitorSiteID: 1234, BlogID: 42, URL: "https://example.com"}, })) req.Header.Set("Authorization", "Bearer secret") req.Header.Set("Content-Type", "application/json") @@ -68,6 +68,9 @@ func TestServerHandleCheckSuccess(t *testing.T) { if result.Results[0].BlogID != 42 { t.Fatalf("BlogID = %d, want 42", result.Results[0].BlogID) } + if result.Results[0].MonitorSiteID != 1234 { + t.Fatalf("MonitorSiteID = %d, want 1234", result.Results[0].MonitorSiteID) + } } func TestServerHandleCheckUnauthorized(t *testing.T) { diff --git a/veriflier2/cmd/main.go b/veriflier2/cmd/main.go index 7645d818..67ce2fa5 100644 --- a/veriflier2/cmd/main.go +++ b/veriflier2/cmd/main.go @@ -96,6 +96,7 @@ func main() { // performCheck runs a single HTTP check and returns the result for the server. func performCheck(req veriflier.CheckRequest) veriflier.CheckResult { res := checker.Check(context.Background(), checker.Request{ + MonitorSiteID: req.MonitorSiteID, BlogID: req.BlogID, URL: req.URL, Method: req.Method, @@ -113,12 +114,13 @@ func performCheck(req veriflier.CheckRequest) veriflier.CheckResult { }) return veriflier.CheckResult{ - BlogID: res.BlogID, - URL: res.URL, - Success: res.Success, - HTTPCode: int32(res.HTTPCode), - ErrorCode: int32(res.ErrorCode), - RTTMs: res.RTT.Milliseconds(), + MonitorSiteID: res.MonitorSiteID, + BlogID: res.BlogID, + URL: res.URL, + Success: res.Success, + HTTPCode: int32(res.HTTPCode), + ErrorCode: int32(res.ErrorCode), + RTTMs: res.RTT.Milliseconds(), } }
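
One end-to-end note on the Veriflier round trip: server.go now backfills MonitorSiteID and BlogID onto results whose check function never set them, so endpoint identity survives even an older checkFn while RequestID continues to correlate individual escalations. A small sketch of that backfill pattern, with checkRequest and checkResult as trimmed hypothetical stand-ins for the veriflier types:

    package main

    import "fmt"

    // checkRequest/checkResult carry only the identity fields this patch threads through.
    type checkRequest struct {
        MonitorSiteID, BlogID int64
        URL                   string
    }

    type checkResult struct {
        MonitorSiteID, BlogID int64
        Success               bool
    }

    // backfillIdentity mirrors the server.go change: a result that arrives with
    // zeroed ids inherits them from the request it answered.
    func backfillIdentity(req checkRequest, res checkResult) checkResult {
        if res.MonitorSiteID == 0 {
            res.MonitorSiteID = req.MonitorSiteID
        }
        if res.BlogID == 0 {
            res.BlogID = req.BlogID
        }
        return res
    }

    func main() {
        req := checkRequest{MonitorSiteID: 1234, BlogID: 42, URL: "https://example.com"}
        res := backfillIdentity(req, checkResult{Success: true}) // checkFn left the ids zeroed
        fmt.Println(res.MonitorSiteID, res.BlogID)               // 1234 42
    }

Without the backfill, a blog with duplicate monitor rows could receive a confirmation that no longer says which endpoint it verified — the ambiguity the endpoint-aware assertions in veriflier_test.go guard against.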