Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions server/chart/arch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ func TestChartPackageDependencies(t *testing.T) {
ignoreDeps: []string{m + "/server/chart/api"},
},
{
name: "internal/mysql depends on chart, types, and platform",
pkg: m + "/server/chart/internal/mysql",
ignoreDeps: slices.Concat(chartPkgs, platformPkgs),
name: "internal/mysql depends on chart, types, and platform",
pkg: m + "/server/chart/internal/mysql",
ignoreDeps: slices.Concat(chartPkgs, platformPkgs, []string{
m + "/server/chart/internal/testutils",
}),
},
{
name: "internal/service depends on chart and platform packages",
Expand All @@ -84,6 +86,7 @@ func TestChartPackageDependencies(t *testing.T) {
ignoreDeps: slices.Concat([]string{
m + "/server/chart/internal/mysql",
m + "/server/chart/internal/service",
m + "/server/chart/internal/testutils",
}, chartPkgs, platformPkgs),
},
}
Expand Down
36 changes: 28 additions & 8 deletions server/chart/internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ var scdOpenSentinel = time.Date(9999, 12, 31, 0, 0, 0, 0, time.UTC)
// scdUpsertBatch caps how many entity rows are written per INSERT statement.
const scdUpsertBatch = 200

// scdCleanupBatch caps how many rows CleanupSCDData deletes per statement, so
// each batch's lock window is short and concurrent writers can interleave.
// var (not const) so tests can shrink it to exercise multi-batch behavior.
// NOTE(review): mutable package-level state — tests that override it must
// restore the previous value and must not run in parallel.
var scdCleanupBatch = 1000

// scdRow is a single row of host_scd_data as fetched by GetSCDData.
type scdRow struct {
EntityID string `db:"entity_id"`
Expand Down Expand Up @@ -367,17 +372,32 @@ func aggregateBucket(rows []scdRow, bucketStart, bucketEnd time.Time, strategy a

// CleanupSCDData deletes closed SCD rows whose valid_to is older than the
// retention cutoff. Open rows (valid_to = sentinel) are always preserved.
// Deletes in batches so each statement holds locks briefly and the concurrent
// collection cron can interleave writes.
func (ds *Datastore) CleanupSCDData(ctx context.Context, days int) error {
// Compute the cutoff in Go (UTC) so the retention boundary doesn't depend
// on the MySQL session time zone — all valid_to writes are UTC.
cutoff := time.Now().UTC().AddDate(0, 0, -days)
_, err := ds.writer(ctx).ExecContext(ctx,
`DELETE FROM host_scd_data
WHERE valid_to < ?
AND valid_to <> ?`,
cutoff, scdOpenSentinel)
if err != nil {
return ctxerr.Wrap(ctx, err, "cleanup SCD data")
for {
Comment thread
sgress454 marked this conversation as resolved.
if err := ctx.Err(); err != nil {
return ctxerr.Wrap(ctx, err, "cleanup SCD data")
}
res, err := ds.writer(ctx).ExecContext(ctx,
`DELETE FROM host_scd_data
WHERE valid_to < ?
AND valid_to <> ?
Comment thread
sgress454 marked this conversation as resolved.
ORDER BY valid_to
LIMIT ?`,
cutoff, scdOpenSentinel, scdCleanupBatch)
Comment thread
sgress454 marked this conversation as resolved.
if err != nil {
return ctxerr.Wrap(ctx, err, "cleanup SCD data")
}
n, err := res.RowsAffected()
if err != nil {
return ctxerr.Wrap(ctx, err, "cleanup SCD data rows affected")
}
if n < int64(scdCleanupBatch) {
return nil
}
}
return nil
}
82 changes: 82 additions & 0 deletions server/chart/internal/mysql/data_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package mysql

import (
"context"
"fmt"
"testing"
"time"

"github.com/fleetdm/fleet/v4/server/chart"
"github.com/fleetdm/fleet/v4/server/chart/api"
"github.com/fleetdm/fleet/v4/server/chart/internal/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAggregateBucketAccumulate(t *testing.T) {
Expand Down Expand Up @@ -107,3 +111,81 @@ func TestAggregateBucketSnapshotRowClosedExactlyAtBucketEnd(t *testing.T) {
got := aggregateBucket(rows, bucketStart, bucketEnd, api.SampleStrategySnapshot)
assert.Equal(t, 2, chart.BlobPopcount(got), "row whose valid_to equals bucketEnd covers bucketEnd-ε")
}

// TestCleanupSCDData runs the CleanupSCDData subtests against one shared test
// database, truncating the chart tables after each case so they stay isolated.
func TestCleanupSCDData(t *testing.T) {
	tdb := testutils.SetupTestDB(t, "chart_mysql")
	ds := NewDatastore(tdb.Conns(), tdb.Logger)

	subtests := []struct {
		name string
		run  func(t *testing.T, tdb *testutils.TestDB, ds *Datastore)
	}{
		{name: "PreservesOpenAndRecent", run: testCleanupPreservesOpenAndRecent},
		{name: "MultipleBatches", run: testCleanupMultipleBatches},
		{name: "HonorsCtxCancellation", run: testCleanupHonorsCtxCancellation},
	}
	for _, st := range subtests {
		t.Run(st.name, func(t *testing.T) {
			// Leave the tables empty for the next case regardless of outcome.
			defer tdb.TruncateTables(t)
			st.run(t, tdb, ds)
		})
	}
}

// testCleanupPreservesOpenAndRecent verifies that a cleanup pass removes only
// closed rows past the retention cutoff: recently closed rows and open
// (sentinel valid_to) rows must survive.
func testCleanupPreservesOpenAndRecent(t *testing.T, tdb *testutils.TestDB, ds *Datastore) {
	ctx := t.Context()
	now := time.Now().UTC()

	seed := []struct {
		entity    string
		validFrom time.Time
		validTo   time.Time
	}{
		// Closed 40 days ago — outside the 30-day retention, should be deleted.
		{"old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40)},
		// Closed 5 days ago — within the retention window, should be preserved.
		{"recent", now.AddDate(0, 0, -10), now.AddDate(0, 0, -5)},
		// Open row (sentinel valid_to) — must always be preserved.
		{"open", now.AddDate(0, 0, -45), scdOpenSentinel},
	}
	for _, s := range seed {
		tdb.InsertSCDRow(t, "cve", s.entity, s.validFrom, s.validTo)
	}

	require.NoError(t, ds.CleanupSCDData(ctx, 30))

	assert.Equal(t, 2, tdb.CountSCDRows(t), "only the old closed row should be deleted")

	var remaining []string
	require.NoError(t, tdb.DB.SelectContext(ctx, &remaining, `SELECT entity_id FROM host_scd_data ORDER BY entity_id`))
	assert.Equal(t, []string{"open", "recent"}, remaining)
}

// testCleanupMultipleBatches proves the cleanup loop iterates across several
// DELETE statements when more expired rows exist than one batch can hold.
func testCleanupMultipleBatches(t *testing.T, tdb *testutils.TestDB, ds *Datastore) {
	ctx := t.Context()
	now := time.Now().UTC()

	// Use a tiny batch size so multi-batch behavior is exercised without
	// inserting thousands of rows; restore it when the subtest finishes.
	orig := scdCleanupBatch
	scdCleanupBatch = 3
	t.Cleanup(func() { scdCleanupBatch = orig })

	// 10 expired closed rows at batch size 3 → four DELETE statements
	// (3 + 3 + 3 + 1); the final partial batch terminates the loop.
	const total = 10
	fromBase := now.AddDate(0, 0, -45)
	toBase := now.AddDate(0, 0, -40)
	for i := 0; i < total; i++ {
		offset := time.Duration(i) * time.Minute
		tdb.InsertSCDRow(t, "cve", fmt.Sprintf("e%d", i), fromBase.Add(offset), toBase.Add(offset))
	}

	require.NoError(t, ds.CleanupSCDData(ctx, 30))

	assert.Equal(t, 0, tdb.CountSCDRows(t), "all expired rows should be drained across batches")
}

// testCleanupHonorsCtxCancellation verifies that a context canceled before
// the first batch causes CleanupSCDData to return context.Canceled without
// deleting anything.
func testCleanupHonorsCtxCancellation(t *testing.T, tdb *testutils.TestDB, ds *Datastore) {
	now := time.Now().UTC()

	// Seed one expired row that a successful run would delete — its survival
	// proves cancellation short-circuited the first batch.
	tdb.InsertSCDRow(t, "cve", "old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40))

	canceled, cancel := context.WithCancel(t.Context())
	cancel()

	require.ErrorIs(t, ds.CleanupSCDData(canceled, 30), context.Canceled)
	assert.Equal(t, 1, tdb.CountSCDRows(t), "no rows should be deleted when ctx was canceled before the first batch")
}
76 changes: 76 additions & 0 deletions server/chart/internal/testutils/testutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Package testutils provides shared test utilities for the chart bounded context.
package testutils

import (
"log/slog"
"testing"
"time"

common_mysql "github.com/fleetdm/fleet/v4/server/platform/mysql"
mysql_testing_utils "github.com/fleetdm/fleet/v4/server/platform/mysql/testing_utils"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
)

// TestDB holds the database connection for tests.
type TestDB struct {
	// DB is the live connection to the per-test MySQL schema.
	DB *sqlx.DB
	// Logger is a discard logger by default (see SetupTestDB), suitable for
	// passing to datastores and table-truncation helpers.
	Logger *slog.Logger
}

// SetupTestDB creates a test database with the Fleet schema loaded. Tests are
// skipped automatically when MYSQL_TEST is not set. The connection is closed
// via t.Cleanup when the test finishes.
func SetupTestDB(t *testing.T, testNamePrefix string) *TestDB {
	t.Helper()

	testOpts := &mysql_testing_utils.DatastoreTestOptions{
		UniqueTestName: testNamePrefix + "_" + t.Name(),
	}
	testName, processed := mysql_testing_utils.ProcessOptions(t, testOpts)

	mysql_testing_utils.LoadDefaultSchema(t, testName, processed)

	db, err := common_mysql.NewDB(mysql_testing_utils.MysqlTestConfig(testName), &common_mysql.DBOptions{}, "")
	require.NoError(t, err)
	t.Cleanup(func() { db.Close() })

	return &TestDB{
		DB:     db,
		Logger: slog.New(slog.DiscardHandler),
	}
}

// Conns returns DBConnections for creating a datastore. Tests use the same
// underlying connection for both the primary and the replica.
func (tdb *TestDB) Conns() *common_mysql.DBConnections {
	conns := &common_mysql.DBConnections{}
	conns.Primary = tdb.DB
	conns.Replica = tdb.DB
	return conns
}

// TruncateTables clears the tables used by the chart bounded context.
// Call between subtests that share a TestDB so each starts from empty tables.
func (tdb *TestDB) TruncateTables(t *testing.T) {
	t.Helper()
	mysql_testing_utils.TruncateTables(t, tdb.DB, tdb.Logger, nil, "host_scd_data")
}

// InsertSCDRow inserts a single host_scd_data row for tests. host_bitmap is
// stored as an empty blob since cleanup tests don't care about its contents.
func (tdb *TestDB) InsertSCDRow(t *testing.T, dataset, entityID string, validFrom, validTo time.Time) {
	t.Helper()

	const stmt = `
INSERT INTO host_scd_data (dataset, entity_id, host_bitmap, valid_from, valid_to)
VALUES (?, ?, ?, ?, ?)
`
	_, err := tdb.DB.ExecContext(t.Context(), stmt, dataset, entityID, []byte{}, validFrom, validTo)
	require.NoError(t, err)
}

// CountSCDRows returns the total number of rows in host_scd_data.
func (tdb *TestDB) CountSCDRows(t *testing.T) int {
	t.Helper()

	var count int
	require.NoError(t, tdb.DB.GetContext(t.Context(), &count, `SELECT COUNT(*) FROM host_scd_data`))
	return count
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func Up_20260423161823(tx *sql.Tx) error {
updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY uniq_entity_bucket (dataset, entity_id, valid_from),
KEY idx_dataset_range (dataset, valid_from, valid_to)
KEY idx_dataset_range (dataset, valid_from, valid_to),
KEY idx_valid_to_dataset (valid_to, dataset, entity_id)
Comment thread
sgress454 marked this conversation as resolved.
Comment thread
sgress454 marked this conversation as resolved.
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
`)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion server/datastore/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,8 @@ CREATE TABLE `host_scd_data` (
`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_entity_bucket` (`dataset`,`entity_id`,`valid_from`),
KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`)
KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`),
KEY `idx_valid_to_dataset` (`valid_to`,`dataset`,`entity_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
/*!40101 SET @saved_cs_client = @@character_set_client */;
Expand Down
Loading