Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/adr/0009-streaming-monitor-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ v2 sidecar config tables, but the new table is intentionally additive so later
iterations can move derived scheduling state out of the legacy path without
breaking rollback.

HTTP monitor identity is the legacy row id, not just `blog_id`. Production
datasets can contain multiple active monitor URLs for one blog, so streaming
planner targets, retry state, and future `jetmon_check_targets` sync must key
by `jetpack_monitor_sites.jetpack_monitor_site_id` / `source_site_id` when that
row id is available. `blog_id` remains the WPCOM/site identity used for
notifications and site-level API views.

## Compatibility

`jetpack_monitor_sites` remains the source of truth for v1-owned site identity,
Expand Down
10 changes: 10 additions & 0 deletions docs/data-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ from `jetpack_monitor_sites`. That keeps correctness and rollback behavior easy
to validate before moving config-sync reads fully onto the v2-native target
table in a later scaling branch.

Production data can contain more than one active monitor URL for the same
`blog_id`. Monitor execution therefore treats
`jetpack_monitor_sites.jetpack_monitor_site_id` as the endpoint identity for
HTTP checks while retaining `blog_id` as the WPCOM/site identity. HTTP events
write that row id to `jetmon_events.endpoint_id`, scheduler/retry in-memory
state keys by the row id when available, and the v1 compatibility projection is
updated by `jetpack_monitor_site_id` so two active URLs for one site do not
overwrite each other's rollout state. The v2-native target table is unique on
`source_site_id` for the same reason.

## Process Health

`jetmon_process_health` is the durable source for fleet-level operator views.
Expand Down
5 changes: 5 additions & 0 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ production telemetry branches:
first prototype still reloads active identity/cadence from
`jetpack_monitor_sites` plus v2 sidecar config so correctness can be
validated before optimizing config-sync reads.
- [x] Preserve duplicate active monitor URLs for the same `blog_id` by carrying
`jetpack_monitor_site_id` through local checks, Veriflier RPCs, streaming
planner state, retry state, HTTP event identity, and legacy projection
writes. `blog_id` remains the site/WPCOM identity, while the legacy row id is
the endpoint identity for monitor execution.
- [x] Evaluate whether any remaining single-column `blog_id` index is needed
on `jetpack_monitor_sites` after sidecar-table rollout. PR #101 added
`idx_monitor_blog_id` for legacy-table point writes, but current rollout
Expand Down
3 changes: 3 additions & 0 deletions internal/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ func isLocalResolverHost(host string) bool {

// Request holds the parameters for a single HTTP check.
type Request struct {
MonitorSiteID int64
BlogID int64
URL string
Method string
Expand All @@ -637,6 +638,7 @@ type Request struct {

// Result holds the outcome of a single HTTP check.
type Result struct {
MonitorSiteID int64
BlogID int64
URL string
Method string
Expand Down Expand Up @@ -722,6 +724,7 @@ func Check(ctx context.Context, req Request) Result {
profile = checkmode.EffectiveProfile(method, profile)

res := Result{
MonitorSiteID: req.MonitorSiteID,
BlogID: req.BlogID,
URL: req.URL,
Method: method,
Expand Down
8 changes: 8 additions & 0 deletions internal/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,14 @@ var migrations = []migration{
ADD COLUMN timeout_seconds TINYINT UNSIGNED NULL AFTER custom_headers,
ADD COLUMN redirect_policy ENUM('follow','alert','fail') NULL DEFAULT NULL AFTER timeout_seconds,
ADD COLUMN alert_cooldown_minutes SMALLINT UNSIGNED NULL AFTER redirect_policy`},

// Migration 39 prepares the v2-native target table for production rows
// where one blog_id has multiple active monitor URLs. The legacy table's
// primary row id is the durable endpoint identity, so target sync must be
// unique on source_site_id rather than collapsing all endpoints for a blog.
{39, `ALTER TABLE jetmon_check_targets
DROP INDEX uk_blog_id,
ADD UNIQUE KEY uk_source_site_id (source_site_id)`},
}

// Migrate applies all pending migrations idempotently.
Expand Down
55 changes: 51 additions & 4 deletions internal/db/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func GetSitesForBucket(ctx context.Context, bucketMin, bucketMax, batchSize int,
// scheduler. It intentionally ignores last_checked_at and next_check_at: those
// are legacy scheduler projections, while streaming mode maintains due time in
// memory and writes coarse rollback freshness separately.
func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, afterBlogID int64, limit int) ([]Site, error) {
func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, afterMonitorSiteID int64, limit int) ([]Site, error) {
if limit <= 0 {
limit = 5000
}
Expand All @@ -76,10 +76,10 @@ func ListActiveSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int
LEFT JOIN jetmon_site_runtime r ON r.blog_id = s.blog_id
WHERE s.monitor_active = 1
AND s.bucket_no BETWEEN ? AND ?
AND s.blog_id > ?
ORDER BY s.blog_id ASC
AND s.jetpack_monitor_site_id > ?
ORDER BY s.jetpack_monitor_site_id ASC
LIMIT ?`,
bucketMin, bucketMax, afterBlogID, limit,
bucketMin, bucketMax, afterMonitorSiteID, limit,
)
if err != nil {
return nil, fmt.Errorf("query active sites: %w", err)
Expand Down Expand Up @@ -189,6 +189,20 @@ func CountDueSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int,

// UpdateSiteStatus writes site_status and last_status_change for a site
// addressed by blog_id. It delegates to UpdateSiteStatusForMonitorSite with a
// zero monitor site id, which selects that function's `WHERE blog_id = ?`
// statement; that statement matches every jetpack_monitor_sites row carrying
// the given blog_id.
func UpdateSiteStatus(ctx context.Context, blogID int64, status int, changedAt time.Time) error {
	// Zero means "no monitor row id known", so the blog_id path is taken.
	const unknownMonitorSiteID int64 = 0
	return UpdateSiteStatusForMonitorSite(ctx, unknownMonitorSiteID, blogID, status, changedAt)
}

// UpdateSiteStatusForMonitorSite updates the legacy projection for one monitor
// row when monitorSiteID is known, falling back to the historical blog_id update
// for callers that are still site-level.
func UpdateSiteStatusForMonitorSite(ctx context.Context, monitorSiteID, blogID int64, status int, changedAt time.Time) error {
if monitorSiteID > 0 {
_, err := db.ExecContext(ctx,
`UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE jetpack_monitor_site_id = ?`,
status, changedAt.UTC(), monitorSiteID,
)
return err
}
_, err := db.ExecContext(ctx,
`UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE blog_id = ?`,
status, changedAt.UTC(), blogID,
Expand All @@ -200,7 +214,24 @@ func UpdateSiteStatus(ctx context.Context, blogID int64, status int, changedAt t
// mode uses this sparingly after verifier escalation so its in-memory target
// state does not send a recovery notification after a false alarm.
func GetSiteStatus(ctx context.Context, blogID int64) (int, error) {
	// A zero monitor site id makes GetSiteStatusForMonitorSite skip the
	// row-id lookup and read site_status by blog_id instead.
	const unknownMonitorSiteID int64 = 0
	return GetSiteStatusForMonitorSite(ctx, unknownMonitorSiteID, blogID)
}

// GetSiteStatusForMonitorSite reads the legacy status projection for one
// monitor row when monitorSiteID is known, falling back to the historical
// blog_id lookup for site-level callers.
func GetSiteStatusForMonitorSite(ctx context.Context, monitorSiteID, blogID int64) (int, error) {
var status int
if monitorSiteID > 0 {
err := db.QueryRowContext(ctx,
`SELECT site_status FROM jetpack_monitor_sites WHERE jetpack_monitor_site_id = ?`,
monitorSiteID,
).Scan(&status)
if err != nil {
return 0, fmt.Errorf("get site status: %w", err)
}
return status, nil
}
err := db.QueryRowContext(ctx,
`SELECT site_status FROM jetpack_monitor_sites WHERE blog_id = ?`,
blogID,
Expand All @@ -214,6 +245,19 @@ func GetSiteStatus(ctx context.Context, blogID int64) (int, error) {
// UpdateSiteStatusTx is the transaction-aware counterpart of UpdateSiteStatus,
// for callers whose projection write must commit atomically with an event
// mutation. It forwards to UpdateSiteStatusTxForMonitorSite with a zero monitor
// site id, so the update is keyed by blog_id.
func UpdateSiteStatusTx(ctx context.Context, tx *sql.Tx, blogID int64, status int, changedAt time.Time) error {
	// Zero means "no monitor row id known", so the blog_id path is taken.
	const unknownMonitorSiteID int64 = 0
	return UpdateSiteStatusTxForMonitorSite(ctx, tx, unknownMonitorSiteID, blogID, status, changedAt)
}

// UpdateSiteStatusTxForMonitorSite is the transaction-aware variant of
// UpdateSiteStatusForMonitorSite.
func UpdateSiteStatusTxForMonitorSite(ctx context.Context, tx *sql.Tx, monitorSiteID, blogID int64, status int, changedAt time.Time) error {
if monitorSiteID > 0 {
_, err := tx.ExecContext(ctx,
`UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE jetpack_monitor_site_id = ?`,
status, changedAt.UTC(), monitorSiteID,
)
return err
}
_, err := tx.ExecContext(ctx,
`UPDATE jetpack_monitor_sites SET site_status = ?, last_status_change = ? WHERE blog_id = ?`,
status, changedAt.UTC(), blogID,
Expand All @@ -240,6 +284,7 @@ func CountLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax int) (
FROM jetpack_monitor_sites s
LEFT JOIN jetmon_events e
ON e.blog_id = s.blog_id
AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL)
AND e.check_type = 'http'
AND e.ended_at IS NULL
WHERE s.monitor_active = 1
Expand Down Expand Up @@ -317,6 +362,7 @@ func ListLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, limit
FROM jetpack_monitor_sites s
LEFT JOIN jetmon_events e
ON e.blog_id = s.blog_id
AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL)
AND e.check_type = 'http'
AND e.ended_at IS NULL
WHERE s.monitor_active = 1
Expand Down Expand Up @@ -396,6 +442,7 @@ func SummarizeLegacyProjectionDrift(ctx context.Context, bucketMin, bucketMax, l
FROM jetpack_monitor_sites s
LEFT JOIN jetmon_events e
ON e.blog_id = s.blog_id
AND (e.endpoint_id = s.jetpack_monitor_site_id OR e.endpoint_id IS NULL)
AND e.check_type = 'http'
AND e.ended_at IS NULL
WHERE s.monitor_active = 1
Expand Down
27 changes: 27 additions & 0 deletions internal/db/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,33 @@ func TestMarkSitesCheckedBatchesUpdates(t *testing.T) {
}
}

// TestMonitorSiteStatusUsesEndpointIdentity checks that, when a positive
// monitor site id is supplied, both the status update and the status read bind
// that id (1234) as the lookup argument instead of the blog id (42).
func TestMonitorSiteStatusUsesEndpointIdentity(t *testing.T) {
	mock, cleanup := withMockDB(t)
	defer cleanup()

	const (
		monitorSiteID int64 = 1234
		blogID        int64 = 42
		wantStatus          = 2
	)
	ctx := context.Background()
	changedAt := time.Date(2026, 5, 13, 18, 0, 0, 0, time.UTC)

	// Expectations are registered in call order: the write first, then the read.
	mock.ExpectExec("UPDATE jetpack_monitor_sites SET site_status").
		WithArgs(wantStatus, changedAt, monitorSiteID).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectQuery("SELECT site_status FROM jetpack_monitor_sites").
		WithArgs(monitorSiteID).
		WillReturnRows(sqlmock.NewRows([]string{"site_status"}).AddRow(wantStatus))

	err := UpdateSiteStatusForMonitorSite(ctx, monitorSiteID, blogID, wantStatus, changedAt)
	if err != nil {
		t.Fatalf("UpdateSiteStatusForMonitorSite: %v", err)
	}

	status, err := GetSiteStatusForMonitorSite(ctx, monitorSiteID, blogID)
	if err != nil {
		t.Fatalf("GetSiteStatusForMonitorSite: %v", err)
	}
	if status != wantStatus {
		t.Fatalf("status = %d, want 2", status)
	}

	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Fatalf("unmet sql expectations: %v", unmet)
	}
}

func TestMarkSitesCheckedRetriesDeadlock(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
Expand Down
28 changes: 27 additions & 1 deletion internal/eventstore/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,41 @@ type ActiveEvent struct {
// the event id cached (e.g. a recovery in a round after the open was forgotten
// across a process restart).
func (t *Tx) FindActiveByBlog(ctx context.Context, blogID int64, checkType string) (ActiveEvent, error) {
	// EndpointID is left unset, so FindActive matches on blog_id and
	// check_type only, without any endpoint filter.
	siteLevel := Identity{
		BlogID:    blogID,
		CheckType: checkType,
	}
	return t.FindActive(ctx, siteLevel)
}

// FindActive returns the open event for an identity. When EndpointID is set it
// prefers an endpoint-specific event but can fall back to a legacy site-level
// event so in-flight migrations can recover rows opened before endpoint
// identity was introduced.
func (t *Tx) FindActive(ctx context.Context, identity Identity) (ActiveEvent, error) {
if t.tx == nil {
return ActiveEvent{}, nil
}
var ae ActiveEvent
if identity.EndpointID != nil {
err := t.tx.QueryRowContext(ctx, `
SELECT id, severity, state FROM jetmon_events
WHERE blog_id = ?
AND check_type = ?
AND ended_at IS NULL
AND (endpoint_id = ? OR endpoint_id IS NULL)
ORDER BY endpoint_id IS NULL ASC, started_at ASC
LIMIT 1`, identity.BlogID, identity.CheckType, *identity.EndpointID,
).Scan(&ae.ID, &ae.Severity, &ae.State)
if errors.Is(err, sql.ErrNoRows) {
return ActiveEvent{}, ErrEventNotFound
}
if err != nil {
return ActiveEvent{}, fmt.Errorf("find active event: %w", err)
}
return ae, nil
}
err := t.tx.QueryRowContext(ctx, `
SELECT id, severity, state FROM jetmon_events
WHERE blog_id = ? AND check_type = ? AND ended_at IS NULL
ORDER BY started_at ASC
LIMIT 1`, blogID, checkType,
LIMIT 1`, identity.BlogID, identity.CheckType,
).Scan(&ae.ID, &ae.Severity, &ae.State)
if errors.Is(err, sql.ErrNoRows) {
return ActiveEvent{}, ErrEventNotFound
Expand Down
30 changes: 30 additions & 0 deletions internal/orchestrator/identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package orchestrator

import (
"github.com/Automattic/jetmon/internal/checker"
"github.com/Automattic/jetmon/internal/db"
"github.com/Automattic/jetmon/internal/eventstore"
)

// monitorTargetID returns the id used to key a site's monitor target: site.ID
// when it is positive, otherwise site.BlogID.
//
// NOTE(review): site.ID is presumably the jetpack_monitor_site_id row id;
// confirm against db.Site. The two id spaces share one int64 key here, so a
// blog-id fallback could in principle equal another row's id.
func monitorTargetID(site db.Site) int64 {
	targetID := site.BlogID
	if site.ID > 0 {
		targetID = site.ID
	}
	return targetID
}

// checkResultTargetID returns the target id for a check result:
// res.MonitorSiteID when it is positive, otherwise res.BlogID. This mirrors the
// selection rule applied by monitorTargetID to a db.Site.
func checkResultTargetID(res checker.Result) int64 {
	targetID := res.BlogID
	if res.MonitorSiteID > 0 {
		targetID = res.MonitorSiteID
	}
	return targetID
}

// httpEventIdentity builds the eventstore identity for a site's HTTP check.
// BlogID and CheckType are always set; EndpointID is set to site.ID only when
// that id is positive, and is left nil otherwise.
func httpEventIdentity(site db.Site) eventstore.Identity {
	if site.ID <= 0 {
		// No usable row id: emit a site-level identity without an endpoint.
		return eventstore.Identity{BlogID: site.BlogID, CheckType: checkTypeHTTP}
	}

	// Copy into a fresh local so the identity holds its own pointer target.
	endpointID := site.ID
	return eventstore.Identity{
		BlogID:     site.BlogID,
		CheckType:  checkTypeHTTP,
		EndpointID: &endpointID,
	}
}
Loading