diff --git a/server/chart/arch_test.go b/server/chart/arch_test.go index 17d5d8f9803..9a2a640b073 100644 --- a/server/chart/arch_test.go +++ b/server/chart/arch_test.go @@ -61,9 +61,11 @@ func TestChartPackageDependencies(t *testing.T) { ignoreDeps: []string{m + "/server/chart/api"}, }, { - name: "internal/mysql depends on chart, types, and platform", - pkg: m + "/server/chart/internal/mysql", - ignoreDeps: slices.Concat(chartPkgs, platformPkgs), + name: "internal/mysql depends on chart, types, and platform", + pkg: m + "/server/chart/internal/mysql", + ignoreDeps: slices.Concat(chartPkgs, platformPkgs, []string{ + m + "/server/chart/internal/testutils", + }), }, { name: "internal/service depends on chart and platform packages", @@ -84,6 +86,7 @@ func TestChartPackageDependencies(t *testing.T) { ignoreDeps: slices.Concat([]string{ m + "/server/chart/internal/mysql", m + "/server/chart/internal/service", + m + "/server/chart/internal/testutils", }, chartPkgs, platformPkgs), }, } diff --git a/server/chart/internal/mysql/data.go b/server/chart/internal/mysql/data.go index 82f21852824..ede0f16b6d3 100644 --- a/server/chart/internal/mysql/data.go +++ b/server/chart/internal/mysql/data.go @@ -20,6 +20,11 @@ var scdOpenSentinel = time.Date(9999, 12, 31, 0, 0, 0, 0, time.UTC) // scdUpsertBatch caps how many entity rows are written per INSERT statement. const scdUpsertBatch = 200 +// scdCleanupBatch caps how many rows CleanupSCDData deletes per statement, so +// each batch's lock window is short and concurrent writers can interleave. +// var (not const) so tests can shrink it to exercise multi-batch behavior. +var scdCleanupBatch = 1000 + // scdRow is a single row of host_scd_data as fetched by GetSCDData. type scdRow struct { EntityID string `db:"entity_id"` @@ -367,17 +372,32 @@ func aggregateBucket(rows []scdRow, bucketStart, bucketEnd time.Time, strategy a // CleanupSCDData deletes closed SCD rows whose valid_to is older than the // retention cutoff. Open rows (valid_to = sentinel) are always preserved. +// Deletes in batches so each statement holds locks briefly and the concurrent +// collection cron can interleave writes. func (ds *Datastore) CleanupSCDData(ctx context.Context, days int) error { // Compute the cutoff in Go (UTC) so the retention boundary doesn't depend // on the MySQL session time zone — all valid_to writes are UTC. cutoff := time.Now().UTC().AddDate(0, 0, -days) - _, err := ds.writer(ctx).ExecContext(ctx, - `DELETE FROM host_scd_data - WHERE valid_to < ? - AND valid_to <> ?`, - cutoff, scdOpenSentinel) - if err != nil { - return ctxerr.Wrap(ctx, err, "cleanup SCD data") + for { + if err := ctx.Err(); err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data") + } + res, err := ds.writer(ctx).ExecContext(ctx, + `DELETE FROM host_scd_data + WHERE valid_to < ? + AND valid_to <> ? + ORDER BY valid_to + LIMIT ?`, + cutoff, scdOpenSentinel, scdCleanupBatch) + if err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data") + } + n, err := res.RowsAffected() + if err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data rows affected") + } + if n < int64(scdCleanupBatch) { + return nil + } } - return nil } diff --git a/server/chart/internal/mysql/data_test.go b/server/chart/internal/mysql/data_test.go index 6328b92202d..4d16809d745 100644 --- a/server/chart/internal/mysql/data_test.go +++ b/server/chart/internal/mysql/data_test.go @@ -1,12 +1,16 @@ package mysql import ( + "context" + "fmt" "testing" "time" "github.com/fleetdm/fleet/v4/server/chart" "github.com/fleetdm/fleet/v4/server/chart/api" + "github.com/fleetdm/fleet/v4/server/chart/internal/testutils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAggregateBucketAccumulate(t *testing.T) { @@ -107,3 +111,81 @@ func TestAggregateBucketSnapshotRowClosedExactlyAtBucketEnd(t *testing.T) { got := aggregateBucket(rows, bucketStart, bucketEnd, api.SampleStrategySnapshot) assert.Equal(t, 2, chart.BlobPopcount(got), "row whose valid_to equals bucketEnd covers bucketEnd-ε") } + +func TestCleanupSCDData(t *testing.T) { + tdb := testutils.SetupTestDB(t, "chart_mysql") + ds := NewDatastore(tdb.Conns(), tdb.Logger) + + cases := []struct { + name string + fn func(t *testing.T, tdb *testutils.TestDB, ds *Datastore) + }{ + {"PreservesOpenAndRecent", testCleanupPreservesOpenAndRecent}, + {"MultipleBatches", testCleanupMultipleBatches}, + {"HonorsCtxCancellation", testCleanupHonorsCtxCancellation}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + defer tdb.TruncateTables(t) + c.fn(t, tdb, ds) + }) + } +} + +func testCleanupPreservesOpenAndRecent(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + ctx := t.Context() + now := time.Now().UTC() + + // Old closed row — should be deleted (valid_to is 40 days ago, retention 30). + tdb.InsertSCDRow(t, "cve", "old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40)) + // Recent closed row — within retention window, should be preserved. + tdb.InsertSCDRow(t, "cve", "recent", now.AddDate(0, 0, -10), now.AddDate(0, 0, -5)) + // Open row (sentinel valid_to) — must always be preserved. + tdb.InsertSCDRow(t, "cve", "open", now.AddDate(0, 0, -45), scdOpenSentinel) + + require.NoError(t, ds.CleanupSCDData(ctx, 30)) + + assert.Equal(t, 2, tdb.CountSCDRows(t), "only the old closed row should be deleted") + + var entities []string + require.NoError(t, tdb.DB.SelectContext(ctx, &entities, `SELECT entity_id FROM host_scd_data ORDER BY entity_id`)) + assert.Equal(t, []string{"open", "recent"}, entities) +} + +func testCleanupMultipleBatches(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + ctx := t.Context() + now := time.Now().UTC() + + // Shrink batch size so we can prove the loop iterates without inserting + // thousands of rows. + prev := scdCleanupBatch + scdCleanupBatch = 3 + t.Cleanup(func() { scdCleanupBatch = prev }) + + // Insert 10 expired closed rows — that's 4 iterations at batch size 3 + // (3 + 3 + 3 + 1, where the final partial batch terminates the loop). + for i := range 10 { + validFrom := now.AddDate(0, 0, -45).Add(time.Duration(i) * time.Minute) + validTo := now.AddDate(0, 0, -40).Add(time.Duration(i) * time.Minute) + tdb.InsertSCDRow(t, "cve", fmt.Sprintf("e%d", i), validFrom, validTo) + } + + require.NoError(t, ds.CleanupSCDData(ctx, 30)) + + assert.Equal(t, 0, tdb.CountSCDRows(t), "all expired rows should be drained across batches") +} + +func testCleanupHonorsCtxCancellation(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + now := time.Now().UTC() + + // Insert a single expired row so a non-canceled call would have something + // to delete — confirms that nothing was removed because of cancellation. + tdb.InsertSCDRow(t, "cve", "old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40)) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err := ds.CleanupSCDData(ctx, 30) + require.ErrorIs(t, err, context.Canceled) + assert.Equal(t, 1, tdb.CountSCDRows(t), "no rows should be deleted when ctx was canceled before the first batch") +} diff --git a/server/chart/internal/testutils/testutils.go b/server/chart/internal/testutils/testutils.go new file mode 100644 index 00000000000..51378edd197 --- /dev/null +++ b/server/chart/internal/testutils/testutils.go @@ -0,0 +1,76 @@ +// Package testutils provides shared test utilities for the chart bounded context. +package testutils + +import ( + "log/slog" + "testing" + "time" + + common_mysql "github.com/fleetdm/fleet/v4/server/platform/mysql" + mysql_testing_utils "github.com/fleetdm/fleet/v4/server/platform/mysql/testing_utils" + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/require" +) + +// TestDB holds the database connection for tests. +type TestDB struct { + DB *sqlx.DB + Logger *slog.Logger +} + +// SetupTestDB creates a test database with the Fleet schema loaded. Tests are +// skipped automatically when MYSQL_TEST is not set. +func SetupTestDB(t *testing.T, testNamePrefix string) *TestDB { + t.Helper() + + testName, opts := mysql_testing_utils.ProcessOptions(t, &mysql_testing_utils.DatastoreTestOptions{ + UniqueTestName: testNamePrefix + "_" + t.Name(), + }) + + mysql_testing_utils.LoadDefaultSchema(t, testName, opts) + config := mysql_testing_utils.MysqlTestConfig(testName) + db, err := common_mysql.NewDB(config, &common_mysql.DBOptions{}, "") + require.NoError(t, err) + + t.Cleanup(func() { db.Close() }) + + return &TestDB{ + DB: db, + Logger: slog.New(slog.DiscardHandler), + } +} + +// Conns returns DBConnections for creating a datastore. +func (tdb *TestDB) Conns() *common_mysql.DBConnections { + return &common_mysql.DBConnections{Primary: tdb.DB, Replica: tdb.DB} +} + +// TruncateTables clears the tables used by the chart bounded context. +func (tdb *TestDB) TruncateTables(t *testing.T) { + t.Helper() + mysql_testing_utils.TruncateTables(t, tdb.DB, tdb.Logger, nil, "host_scd_data") +} + +// InsertSCDRow inserts a single host_scd_data row for tests. host_bitmap is +// stored as an empty blob since cleanup tests don't care about its contents. +func (tdb *TestDB) InsertSCDRow(t *testing.T, dataset, entityID string, validFrom, validTo time.Time) { + t.Helper() + ctx := t.Context() + + _, err := tdb.DB.ExecContext(ctx, ` + INSERT INTO host_scd_data (dataset, entity_id, host_bitmap, valid_from, valid_to) + VALUES (?, ?, ?, ?, ?) + `, dataset, entityID, []byte{}, validFrom, validTo) + require.NoError(t, err) +} + +// CountSCDRows returns the total number of rows in host_scd_data. +func (tdb *TestDB) CountSCDRows(t *testing.T) int { + t.Helper() + ctx := t.Context() + + var n int + err := tdb.DB.GetContext(ctx, &n, `SELECT COUNT(*) FROM host_scd_data`) + require.NoError(t, err) + return n +} diff --git a/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go b/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go index 7cae3b8b24a..ef010757d18 100644 --- a/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go +++ b/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go @@ -32,7 +32,8 @@ func Up_20260423161823(tx *sql.Tx) error { updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY uniq_entity_bucket (dataset, entity_id, valid_from), - KEY idx_dataset_range (dataset, valid_from, valid_to) + KEY idx_dataset_range (dataset, valid_from, valid_to), + KEY idx_valid_to_dataset (valid_to, dataset, entity_id) ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci `) if err != nil { diff --git a/server/datastore/mysql/schema.sql b/server/datastore/mysql/schema.sql index 1b38930b6dc..784d06d7b0f 100644 --- a/server/datastore/mysql/schema.sql +++ b/server/datastore/mysql/schema.sql @@ -1113,7 +1113,8 @@ CREATE TABLE `host_scd_data` ( `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uniq_entity_bucket` (`dataset`,`entity_id`,`valid_from`), - KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`) + KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`), + KEY `idx_valid_to_dataset` (`valid_to`,`dataset`,`entity_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET @saved_cs_client = @@character_set_client */;