-
Notifications
You must be signed in to change notification settings - Fork 843
Use swap table pattern and batch delete to improve DB access patterns for vuln cron #41729
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
422f555
81e3bc4
82b910d
56f48ab
2c2b254
1293b7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| * Reduced database contention during the vulnerability cron. | ||
| * Added a secondary index on `host_software(software_id)` to improve query performance. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package tables | ||
|
|
||
| import ( | ||
| "database/sql" | ||
| "fmt" | ||
| ) | ||
|
|
||
| // init registers this migration's Up and Down functions with MigrationClient. | ||
| func init() { | ||
| MigrationClient.AddMigration(Up_20260316120000, Down_20260316120000) | ||
| } | ||
|
|
||
| // Up_20260316120000 drops the kernel_host_counts_ibfk_1 foreign key from kernel_host_counts, returning a wrapped error if the ALTER TABLE fails. | ||
| func Up_20260316120000(tx *sql.Tx) error { | ||
| // The FK is unnecessary because kernel_host_counts is fully rebuilt on each vulnerability cron run via a swap table, | ||
| // and CREATE TABLE ... LIKE does not copy foreign keys. Keeping the FK would require restoring it after every swap, | ||
| // which can fail if a referenced software_title is deleted between the SELECT and the ALTER TABLE. | ||
| // Orphaned rows are harmless because API queries (ListKernelsByOS) JOIN back to software_titles, excluding any | ||
| // rows that reference deleted titles. | ||
| if _, err := tx.Exec(`ALTER TABLE kernel_host_counts DROP FOREIGN KEY kernel_host_counts_ibfk_1`); err != nil { | ||
|
ksykulev marked this conversation as resolved.
|
||
| return fmt.Errorf("dropping kernel_host_counts foreign key: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Down_20260316120000 is a no-op: it ignores the transaction and returns nil, so the dropped foreign key is not restored. | ||
| func Down_20260316120000(_ *sql.Tx) error { | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -469,50 +469,106 @@ GROUP BY id, cve, version | |
| // It should be called as part of vulnerabilities job, which should only run on 1 server at a time. | ||
| // If concurrent calls are expected, add proper locking. | ||
| func (ds *Datastore) InsertKernelSoftwareMapping(ctx context.Context) error { | ||
| _, err := ds.writer(ctx).ExecContext(ctx, `UPDATE kernel_host_counts SET hosts_count = 0`) | ||
| if err != nil { | ||
| return ctxerr.Wrap(ctx, err, "zero out existing kernel hosts counts") | ||
| } | ||
| const ( | ||
| swapTable = "kernel_host_counts_swap" | ||
| swapTableCreate = "CREATE TABLE IF NOT EXISTS " + swapTable + " LIKE kernel_host_counts" | ||
|
getvictor marked this conversation as resolved.
|
||
|
|
||
| statsStmt := ` | ||
| INSERT INTO kernel_host_counts (software_title_id, software_id, os_version_id, hosts_count, team_id) | ||
| SELECT | ||
| software_titles.id AS software_title_id, | ||
| software.id AS software_id, | ||
| operating_systems.os_version_id AS os_version_id, | ||
| COUNT(host_operating_system.host_id) AS hosts_count, | ||
| COALESCE(hosts.team_id, 0) AS team_id | ||
| FROM | ||
| software_titles | ||
| JOIN software ON software.title_id = software_titles.id | ||
| JOIN host_software ON host_software.software_id = software.id | ||
| JOIN host_operating_system ON host_operating_system.host_id = host_software.host_id | ||
| JOIN operating_systems ON operating_systems.id = host_operating_system.os_id | ||
| JOIN hosts ON hosts.id = host_software.host_id | ||
| WHERE | ||
| software_titles.is_kernel = TRUE | ||
| GROUP BY | ||
| software_title_id, | ||
| software_id, | ||
| os_version_id, | ||
| team_id | ||
| ON DUPLICATE KEY UPDATE | ||
| hosts_count=VALUES(hosts_count) | ||
| ` | ||
| selectStmt = ` | ||
| SELECT | ||
| software_titles.id AS software_title_id, | ||
| software.id AS software_id, | ||
| operating_systems.os_version_id AS os_version_id, | ||
| COUNT(host_operating_system.host_id) AS hosts_count, | ||
| COALESCE(hosts.team_id, 0) AS team_id | ||
| FROM | ||
| software_titles | ||
| JOIN software ON software.title_id = software_titles.id | ||
| JOIN host_software ON host_software.software_id = software.id | ||
| JOIN host_operating_system ON host_operating_system.host_id = host_software.host_id | ||
| JOIN operating_systems ON operating_systems.id = host_operating_system.os_id | ||
| JOIN hosts ON hosts.id = host_software.host_id | ||
| WHERE | ||
| software_titles.is_kernel = TRUE | ||
| GROUP BY | ||
| software_titles.id, | ||
| software.id, | ||
| operating_systems.os_version_id, | ||
| hosts.team_id` | ||
|
|
||
| _, err = ds.writer(ctx).ExecContext(ctx, statsStmt) | ||
| if err != nil { | ||
| return ctxerr.Wrap(ctx, err, "insert kernel software mapping") | ||
| insertStmt = `INSERT INTO ` + swapTable + ` (software_title_id, software_id, os_version_id, hosts_count, team_id) VALUES %s` | ||
|
|
||
| valuesPart = `(?, ?, ?, ?, ?),` | ||
| ) | ||
|
|
||
| // Create a fresh swap table. Drop any leftover from a previous failed run. | ||
| if _, err := ds.writer(ctx).ExecContext(ctx, "DROP TABLE IF EXISTS "+swapTable); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "drop existing kernel swap table") | ||
| } | ||
| if _, err := ds.writer(ctx).ExecContext(ctx, swapTableCreate); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "create kernel swap table") | ||
| } | ||
|
|
||
| _, err = ds.writer(ctx).ExecContext(ctx, `DELETE k FROM kernel_host_counts k LEFT JOIN software ON k.software_id = software.id WHERE software.id IS NULL`) | ||
| // Read kernel host counts from the reader to avoid contention with per-host writes on the writer. | ||
| rows, err := ds.reader(ctx).QueryContext(ctx, selectStmt) | ||
| if err != nil { | ||
| return ctxerr.Wrap(ctx, err, "clean up orphan kernels by software id") | ||
| return ctxerr.Wrap(ctx, err, "read kernel host counts") | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| // Batch insert into the swap table on the writer. | ||
| const batchSize = 100 | ||
| var batchCount int | ||
| args := make([]any, 0, batchSize*5) | ||
| for rows.Next() { | ||
| var ( | ||
| softwareTitleID uint | ||
| softwareID uint | ||
| osVersionID uint | ||
| hostsCount uint | ||
| teamID uint | ||
| ) | ||
| if err := rows.Scan(&softwareTitleID, &softwareID, &osVersionID, &hostsCount, &teamID); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "scan kernel host count row") | ||
| } | ||
| args = append(args, softwareTitleID, softwareID, osVersionID, hostsCount, teamID) | ||
| batchCount++ | ||
|
|
||
| _, err = ds.writer(ctx).ExecContext(ctx, `DELETE k FROM kernel_host_counts k LEFT JOIN operating_systems ON k.os_version_id = operating_systems.os_version_id WHERE operating_systems.id IS NULL`) | ||
| if err != nil { | ||
| return ctxerr.Wrap(ctx, err, "clean up orphan kernels by os version id") | ||
| if batchCount == batchSize { | ||
| values := strings.TrimSuffix(strings.Repeat(valuesPart, batchCount), ",") | ||
| if _, err := ds.writer(ctx).ExecContext(ctx, fmt.Sprintf(insertStmt, values), args...); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "insert kernel host counts batch into swap table") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this fails mid-batch, what happens? We aren't in a transaction. Do we care?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This query is inserting into a new
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Parent context is cancelled. But I don't think we actually care if the swap table is left partially populated. |
||
| } | ||
| args = args[:0] | ||
| batchCount = 0 | ||
| } | ||
| } | ||
| if batchCount > 0 { | ||
| values := strings.TrimSuffix(strings.Repeat(valuesPart, batchCount), ",") | ||
| if _, err := ds.writer(ctx).ExecContext(ctx, fmt.Sprintf(insertStmt, values), args...); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "insert last kernel host counts batch into swap table") | ||
| } | ||
| } | ||
| if err := rows.Err(); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "iterate kernel host count rows") | ||
| } | ||
|
|
||
| // Atomic table swap. | ||
| if err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error { | ||
| if _, err := tx.ExecContext(ctx, "DROP TABLE IF EXISTS kernel_host_counts_old"); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "drop leftover old kernel table") | ||
| } | ||
| if _, err := tx.ExecContext(ctx, ` | ||
| RENAME TABLE | ||
| kernel_host_counts TO kernel_host_counts_old, | ||
| `+swapTable+` TO kernel_host_counts`); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "atomic kernel table swap") | ||
| } | ||
| if _, err := tx.ExecContext(ctx, "DROP TABLE IF EXISTS kernel_host_counts_old"); err != nil { | ||
| return ctxerr.Wrap(ctx, err, "drop old kernel table after swap") | ||
| } | ||
| return nil | ||
| }); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Refresh the pre-aggregated OS version vulnerabilities table | ||
|
|
||
Large diffs are not rendered by default.
Uh oh!
There was an error while loading. Please reload this page.