From dbf0e82731ee6bddc4e230cb42522216116a6750 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 12 May 2024 17:57:19 +0100 Subject: [PATCH 1/6] use proper database table Signed-off-by: Chris Martin --- internal/armada/server.go | 12 +- internal/armada/submit/deduplication_test.go | 51 +++---- internal/armada/submit/deduplicaton.go | 85 ++++++++--- internal/common/pgkeyvalue/db_testutil.go | 64 -------- internal/common/pgkeyvalue/pgkeyvalue.go | 139 ------------------ internal/common/pgkeyvalue/pgkeyvalue_test.go | 87 ----------- .../migrations/008_job_deduplication.sql | 5 + 7 files changed, 91 insertions(+), 352 deletions(-) delete mode 100644 internal/common/pgkeyvalue/db_testutil.go delete mode 100644 internal/common/pgkeyvalue/pgkeyvalue.go delete mode 100644 internal/common/pgkeyvalue/pgkeyvalue_test.go create mode 100644 internal/lookoutv2/schema/migrations/008_job_deduplication.sql diff --git a/internal/armada/server.go b/internal/armada/server.go index 06d9e6c9207..0b0640ce3b8 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -28,7 +28,6 @@ import ( "github.com/armadaproject/armada/internal/common/database" grpcCommon "github.com/armadaproject/armada/internal/common/grpc" "github.com/armadaproject/armada/internal/common/health" - "github.com/armadaproject/armada/internal/common/pgkeyvalue" "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/scheduler" schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" @@ -181,22 +180,13 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt } defer publisher.Close() - // KV store where we Automatically clean up keys after two weeks. - store, err := pgkeyvalue.New(ctx, dbPool, config.Pulsar.DedupTable) - if err != nil { - return err - } - services = append(services, func() error { - return store.PeriodicCleanup(ctx, time.Hour, 14*24*time.Hour) - }) - pulsarSubmitServer := submit.NewServer( publisher, queueRepository, queueCache, jobRepository, config.Submission, - submit.NewDeduplicator(store), + submit.NewDeduplicator(dbPool), submitChecker, authorizer, config.RequireQueueAndJobSet) diff --git a/internal/armada/submit/deduplication_test.go b/internal/armada/submit/deduplication_test.go index 5e275b4e0c9..c9a18066d29 100644 --- a/internal/armada/submit/deduplication_test.go +++ b/internal/armada/submit/deduplication_test.go @@ -1,15 +1,15 @@ package submit import ( + "github.com/armadaproject/armada/internal/common/database/lookout" + "github.com/jackc/pgx/v5/pgxpool" "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/pkg/api" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type deduplicationIdsWithQueue struct { @@ -17,26 +17,6 @@ type deduplicationIdsWithQueue struct { kvs map[string]string } -type InMemoryKeyValueStore struct { - kvs map[string][]byte -} - -func (m *InMemoryKeyValueStore) Store(_ *armadacontext.Context, kvs map[string][]byte) error { - maps.Copy(m.kvs, kvs) - return nil -} - -func (m *InMemoryKeyValueStore) Load(_ *armadacontext.Context, keys []string) (map[string][]byte, error) { - result := make(map[string][]byte, len(keys)) - for _, k := range keys { - v, ok := m.kvs[k] - if ok { - result[k] = v - } - } - return result, nil -} - func TestDeduplicator(t *testing.T) { tests := map[string]struct { 
initialKeys []deduplicationIdsWithQueue @@ -110,19 +90,24 @@ func TestDeduplicator(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) - deduplicator := NewDeduplicator(&InMemoryKeyValueStore{kvs: map[string][]byte{}}) + err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + deduplicator := NewDeduplicator(db) + + // Store + for _, keys := range tc.initialKeys { + err := deduplicator.StoreOriginalJobIds(ctx, keys.queue, keys.kvs) + require.NoError(t, err) + } - // Store - for _, keys := range tc.initialKeys { - err := deduplicator.StoreOriginalJobIds(ctx, keys.queue, keys.kvs) + // Fetch + keys, err := deduplicator.GetOriginalJobIds(ctx, tc.queueToFetch, tc.jobsToFetch) require.NoError(t, err) - } - // Fetch - keys, err := deduplicator.GetOriginalJobIds(ctx, tc.queueToFetch, tc.jobsToFetch) - require.NoError(t, err) + assert.Equal(t, tc.expectedKeys, keys) - assert.Equal(t, tc.expectedKeys, keys) + return nil + }) + assert.NoError(t, err) cancel() }) } diff --git a/internal/armada/submit/deduplicaton.go b/internal/armada/submit/deduplicaton.go index d1cc6bcc805..e9f1b99cf98 100644 --- a/internal/armada/submit/deduplicaton.go +++ b/internal/armada/submit/deduplicaton.go @@ -1,13 +1,12 @@ package submit import ( - "crypto/sha1" "fmt" + "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/exp/maps" "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/pgkeyvalue" "github.com/armadaproject/armada/pkg/api" ) @@ -19,21 +18,19 @@ type Deduplicator interface { // PostgresDeduplicator is an implementation of a Deduplicator that uses a pgkeyvalue.KeyValueStore as its state store type PostgresDeduplicator struct { - kvStore pgkeyvalue.KeyValueStore + db *pgxpool.Pool } -func NewDeduplicator(kvStore pgkeyvalue.KeyValueStore) *PostgresDeduplicator { - return &PostgresDeduplicator{kvStore: kvStore} +func NewDeduplicator(db *pgxpool.Pool) *PostgresDeduplicator { + return &PostgresDeduplicator{db: db} } func (s *PostgresDeduplicator) GetOriginalJobIds(ctx *armadacontext.Context, queue string, jobRequests []*api.JobSubmitRequestItem) (map[string]string, error) { // Armada checks for duplicate job submissions if a ClientId (i.e. a deduplication id) is provided. - // Deduplication is based on storing the combined hash of the ClientId and queue. For storage efficiency, - // we store hashes instead of user-provided strings. 
- kvs := make(map[string][]byte, len(jobRequests)) + kvs := make(map[string]string, len(jobRequests)) for _, req := range jobRequests { if req.ClientId != "" { - kvs[s.jobKey(queue, req.ClientId)] = []byte(req.ClientId) + kvs[s.jobKey(queue, req.ClientId)] = req.ClientId } } @@ -41,14 +38,14 @@ func (s *PostgresDeduplicator) GetOriginalJobIds(ctx *armadacontext.Context, que // If we have any client Ids, retrieve their job ids if len(kvs) > 0 { keys := maps.Keys(kvs) - existingKvs, err := s.kvStore.Load(ctx, keys) + existingKvs, err := s.loadMappings(ctx, keys) if err != nil { return nil, err } for k, v := range kvs { originalJobId, ok := existingKvs[k] if ok { - duplicates[string(v)] = string(originalJobId) + duplicates[v] = originalJobId } } } @@ -56,18 +53,70 @@ func (s *PostgresDeduplicator) GetOriginalJobIds(ctx *armadacontext.Context, que } func (s *PostgresDeduplicator) StoreOriginalJobIds(ctx *armadacontext.Context, queue string, mappings map[string]string) error { - if s.kvStore == nil || len(mappings) == 0 { + if len(mappings) == 0 { return nil } - kvs := make(map[string][]byte, len(mappings)) + kvs := make(map[string]string, len(mappings)) for k, v := range mappings { - kvs[s.jobKey(queue, k)] = []byte(v) + kvs[s.jobKey(queue, k)] = v } - return s.kvStore.Store(ctx, kvs) + return s.storeMappings(ctx, kvs) } func (s *PostgresDeduplicator) jobKey(queue, clientId string) string { - combined := fmt.Sprintf("%s:%s", queue, clientId) - h := sha1.Sum([]byte(combined)) - return fmt.Sprintf("%x", h) + return fmt.Sprintf("%s:%s", queue, clientId) +} + +func (s *PostgresDeduplicator) storeMappings(ctx *armadacontext.Context, mappings map[string]string) error { + deduplicationIDs := make([]string, 0, len(mappings)) + jobIDs := make([]string, 0, len(mappings)) + + for deduplicationID, jobID := range mappings { + deduplicationIDs = append(deduplicationIDs, deduplicationID) + jobIDs = append(jobIDs, jobID) + } + + sql := ` + INSERT INTO job_deduplication (deduplication_id, job_id) + SELECT unnest($1::text[]), unnest($2::text[]) + ON CONFLICT (deduplication_id) DO NOTHING + ` + _, err := s.db.Exec(ctx, sql, deduplicationIDs, jobIDs) + if err != nil { + return err + } + + return nil +} + +func (s *PostgresDeduplicator) loadMappings(ctx *armadacontext.Context, keys []string) (map[string]string, error) { + // Prepare the output map + result := make(map[string]string) + + sql := ` + SELECT deduplication_id, job_id + FROM job_deduplication + WHERE deduplication_id = ANY($1) + ` + + rows, err := s.db.Query(ctx, sql, keys) + if err != nil { + return nil, err + } + defer rows.Close() + + // Iterate through the result rows + for rows.Next() { + var deduplicationID, jobID string + if err := rows.Scan(&deduplicationID, &jobID); err != nil { + return nil, err + } + result[deduplicationID] = jobID + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return result, nil } diff --git a/internal/common/pgkeyvalue/db_testutil.go b/internal/common/pgkeyvalue/db_testutil.go deleted file mode 100644 index 37775121adb..00000000000 --- a/internal/common/pgkeyvalue/db_testutil.go +++ /dev/null @@ -1,64 +0,0 @@ -package pgkeyvalue - -import ( - "database/sql" - "fmt" - - "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/jackc/pgx/v5/stdlib" - "github.com/pkg/errors" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/util" -) - -func withDatabasePgx(action func(db *pgxpool.Pool) error) error { - ctx := armadacontext.Background() - - // 
Connect and create a dedicated database for the test - // For now use database/sql for this - dbName := "test_" + util.NewULID() - connectionString := "host=localhost port=5432 user=postgres password=psw sslmode=disable" - db, err := sql.Open("pgx", connectionString) - if err != nil { - return errors.WithStack(err) - } - defer db.Close() - - _, err = db.Exec("CREATE DATABASE " + dbName) - if err != nil { - return errors.WithStack(err) - } - - // Connect again- this time to the database we just created and using pgx pool. This will be used for tests - testDbPool, err := pgxpool.New(ctx, connectionString+" dbname="+dbName) - if err != nil { - return errors.WithStack(err) - } - - defer func() { - testDbPool.Close() - - // disconnect all db user before cleanup - _, err = db.Exec( - `SELECT pg_terminate_backend(pg_stat_activity.pid) - FROM pg_stat_activity WHERE pg_stat_activity.datname = '` + dbName + `';`) - if err != nil { - fmt.Println("Failed to disconnect users") - } - - _, err = db.Exec("DROP DATABASE " + dbName) - if err != nil { - fmt.Println("Failed to drop database") - } - }() - - // A third connection! We can get rid of this once we use move udateDatabse over to pgx - legacyDb, err := sql.Open("pgx", connectionString+" dbname="+dbName) - if err != nil { - return errors.WithStack(err) - } - defer legacyDb.Close() - - return action(testDbPool) -} diff --git a/internal/common/pgkeyvalue/pgkeyvalue.go b/internal/common/pgkeyvalue/pgkeyvalue.go deleted file mode 100644 index 91b820ecd7e..00000000000 --- a/internal/common/pgkeyvalue/pgkeyvalue.go +++ /dev/null @@ -1,139 +0,0 @@ -package pgkeyvalue - -import ( - "fmt" - "time" - - "github.com/jackc/pgx/v5/pgxpool" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/clock" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/armadaerrors" - "github.com/armadaproject/armada/internal/common/database" - "github.com/armadaproject/armada/internal/common/logging" -) - -type KeyValue struct { - Key string `db:"key"` - Value []byte `db:"value"` - Inserted time.Time `db:"inserted"` -} - -type KeyValueStore interface { - Store(ctx *armadacontext.Context, kvs map[string][]byte) error - Load(ctx *armadacontext.Context, keys []string) (map[string][]byte, error) -} - -// PGKeyValueStore is a time-limited key-value store backed by postgres with a local LRU cache. -// The store is write-only, i.e., writing to an existing key will return an error (of type *armadaerrors.ErrAlreadyExists). -// Keys can only be deleted by running the cleanup function. -// Deleting keys does not cause caches to update, i.e., nodes may have an inconsistent view if keys are deleted. -type PGKeyValueStore struct { - // Postgres connection. - db *pgxpool.Pool - // Name of the postgres table used for storage. 
- tableName string - // Used to set inserted time - clock clock.Clock -} - -func New(ctx *armadacontext.Context, db *pgxpool.Pool, tableName string) (*PGKeyValueStore, error) { - if db == nil { - return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "db", - Value: db, - Message: "db must be non-nil", - }) - } - if tableName == "" { - return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "TableName", - Value: tableName, - Message: "TableName must be non-empty", - }) - } - err := createTableIfNotExists(ctx, db, tableName) - if err != nil { - return nil, errors.WithStack(err) - } - return &PGKeyValueStore{ - db: db, - tableName: tableName, - clock: clock.RealClock{}, - }, nil -} - -func (c *PGKeyValueStore) Load(ctx *armadacontext.Context, keys []string) (map[string][]byte, error) { - rows, err := c.db.Query(ctx, fmt.Sprintf("SELECT KEY, VALUE FROM %s WHERE KEY = any($1)", c.tableName), keys) - if err != nil { - return nil, errors.WithStack(err) - } - kv := make(map[string][]byte, len(keys)) - for rows.Next() { - key := "" - var value []byte = nil - err := rows.Scan(&key, &value) - if err != nil { - return nil, errors.WithStack(err) - } - kv[key] = value - } - return kv, nil -} - -func (c *PGKeyValueStore) Store(ctx *armadacontext.Context, kvs map[string][]byte) error { - data := make([]KeyValue, 0, len(kvs)) - for k, v := range kvs { - data = append(data, KeyValue{ - Key: k, - Value: v, - Inserted: c.clock.Now(), - }) - } - return database.UpsertWithTransaction(ctx, c.db, c.tableName, data) -} - -func createTableIfNotExists(ctx *armadacontext.Context, db *pgxpool.Pool, tableName string) error { - _, err := db.Exec(ctx, fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - key TEXT PRIMARY KEY, - value BYTEA, - inserted TIMESTAMP not null - );`, tableName)) - return err -} - -// Cleanup removes all key-value pairs older than lifespan. -func (c *PGKeyValueStore) cleanup(ctx *armadacontext.Context, lifespan time.Duration) error { - sql := fmt.Sprintf("DELETE FROM %s WHERE (inserted <= $1);", c.tableName) - _, err := c.db.Exec(ctx, sql, c.clock.Now().Add(-lifespan)) - if err != nil { - return errors.WithStack(err) - } - return nil -} - -// PeriodicCleanup starts a goroutine that automatically runs the cleanup job -// every interval until the provided context is cancelled. 
-func (c *PGKeyValueStore) PeriodicCleanup(ctx *armadacontext.Context, interval time.Duration, lifespan time.Duration) error { - log := logrus.StandardLogger().WithField("service", "PGKeyValueStoreCleanup") - log.Info("service started") - ticker := c.clock.NewTicker(interval) - for { - select { - case <-ctx.Done(): - ticker.Stop() - return nil - case <-ticker.C(): - start := time.Now() - err := c.cleanup(ctx, lifespan) - if err != nil { - logging.WithStacktrace(log, err).WithField("delay", time.Since(start)).Warn("cleanup failed") - } else { - log.WithField("delay", c.clock.Since(start)).Info("cleanup succeeded") - } - } - } -} diff --git a/internal/common/pgkeyvalue/pgkeyvalue_test.go b/internal/common/pgkeyvalue/pgkeyvalue_test.go deleted file mode 100644 index 3a913f0b6f6..00000000000 --- a/internal/common/pgkeyvalue/pgkeyvalue_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package pgkeyvalue - -import ( - "testing" - "time" - - "github.com/jackc/pgx/v5/pgxpool" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "k8s.io/apimachinery/pkg/util/clock" - - "github.com/armadaproject/armada/internal/common/armadacontext" -) - -func TestLoadStore(t *testing.T) { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 10*time.Second) - defer cancel() - err := withDatabasePgx(func(db *pgxpool.Pool) error { - kvStore, err := New(ctx, db, "cachetable") - require.NoError(t, err) - - data1 := map[string][]byte{ - "a": {0x1}, "b": {0x2}, "c": {0x3}, - } - err = kvStore.Store(ctx, data1) - require.NoError(t, err) - - loaded, err := kvStore.Load(ctx, maps.Keys(data1)) - require.NoError(t, err) - assert.Equal(t, data1, loaded) - - data2 := map[string][]byte{"c": {0x4}, "d": {0x5}} - err = kvStore.Store(ctx, data2) - require.NoError(t, err) - - loaded, err = kvStore.Load(ctx, []string{"a", "b", "c", "d"}) - require.NoError(t, err) - assert.Equal(t, map[string][]byte{ - "a": {0x1}, "b": {0x2}, "c": {0x4}, "d": {0x5}, - }, loaded) - - return nil - }) - require.NoError(t, err) -} - -func TestCleanup(t *testing.T) { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 10*time.Second) - defer cancel() - err := withDatabasePgx(func(db *pgxpool.Pool) error { - baseTime := time.Now() - testClock := clock.NewFakeClock(baseTime) - kvStore, err := New(ctx, db, "cachetable") - kvStore.clock = testClock - require.NoError(t, err) - - // Data that will be cleaned up - data1 := map[string][]byte{"a": {0x1}, "b": {0x2}} - err = kvStore.Store(ctx, data1) - require.NoError(t, err) - - // advance the clock - testClock.SetTime(testClock.Now().Add(5 * time.Second)) - - // Data that won't be cleaned up - data2 := map[string][]byte{"c": {0x3}} - err = kvStore.Store(ctx, data2) - require.NoError(t, err) - - loaded, err := kvStore.Load(ctx, []string{"a", "b", "c"}) - require.NoError(t, err) - assert.Equal(t, map[string][]byte{ - "a": {0x1}, "b": {0x2}, "c": {0x3}, - }, loaded) - - // Run the cleanup. 
- err = kvStore.cleanup(ctx, 5*time.Second) - require.NoError(t, err) - - loaded, err = kvStore.Load(ctx, []string{"a", "b", "c"}) - require.NoError(t, err) - assert.Equal(t, map[string][]byte{"c": {0x3}}, loaded) - return nil - }) - require.NoError(t, err) -} diff --git a/internal/lookoutv2/schema/migrations/008_job_deduplication.sql b/internal/lookoutv2/schema/migrations/008_job_deduplication.sql new file mode 100644 index 00000000000..ced6e9688df --- /dev/null +++ b/internal/lookoutv2/schema/migrations/008_job_deduplication.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS job_deduplication +( + deduplication_id text NOT NULL PRIMARY KEY, + job_id text NOT NULL +) From 9f561739290e0bf2448ad66147eccccdd5d3ecb5 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 12 May 2024 18:33:58 +0100 Subject: [PATCH 2/6] implement cleanup Signed-off-by: Chris Martin --- cmd/lookoutv2/main.go | 8 ++++- config/lookoutv2/config.yaml | 1 + internal/armada/submit/deduplication_test.go | 9 +++--- internal/armada/submit/deduplicaton.go | 2 +- internal/lookoutv2/configuration/types.go | 7 +++-- internal/lookoutv2/pruner/pruner.go | 30 +++++++++++++++++-- internal/lookoutv2/pruner/pruner_test.go | 2 +- .../migrations/008_job_deduplication.sql | 7 +++-- 8 files changed, 52 insertions(+), 14 deletions(-) diff --git a/cmd/lookoutv2/main.go b/cmd/lookoutv2/main.go index 107335bc8ae..c7a160b27c3 100644 --- a/cmd/lookoutv2/main.go +++ b/cmd/lookoutv2/main.go @@ -98,7 +98,13 @@ func prune(ctx *armadacontext.Context, config configuration.LookoutV2Config) { ctxTimeout, cancel := armadacontext.WithTimeout(ctx, config.PrunerConfig.Timeout) defer cancel() - err = pruner.PruneDb(ctxTimeout, db, config.PrunerConfig.ExpireAfter, config.PrunerConfig.BatchSize, clock.RealClock{}) + err = pruner.PruneDb( + ctxTimeout, + db, + config.PrunerConfig.ExpireAfter, + config.PrunerConfig.DeduplicationExpireAfter, + config.PrunerConfig.BatchSize, + clock.RealClock{}) if err != nil { panic(err) } diff --git a/config/lookoutv2/config.yaml b/config/lookoutv2/config.yaml index 8d433be587b..755caba7ad6 100644 --- a/config/lookoutv2/config.yaml +++ b/config/lookoutv2/config.yaml @@ -17,6 +17,7 @@ postgres: sslmode: disable prunerConfig: expireAfter: 1008h # 42 days, 6 weeks + deduplicationExpireAfter: 168h # 7 days timeout: 1h batchSize: 1000 uiConfig: diff --git a/internal/armada/submit/deduplication_test.go b/internal/armada/submit/deduplication_test.go index c9a18066d29..8a1bd21b8fe 100644 --- a/internal/armada/submit/deduplication_test.go +++ b/internal/armada/submit/deduplication_test.go @@ -1,15 +1,16 @@ package submit import ( - "github.com/armadaproject/armada/internal/common/database/lookout" - "github.com/jackc/pgx/v5/pgxpool" "testing" "time" - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/pkg/api" + "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/database/lookout" + "github.com/armadaproject/armada/pkg/api" ) type deduplicationIdsWithQueue struct { diff --git a/internal/armada/submit/deduplicaton.go b/internal/armada/submit/deduplicaton.go index e9f1b99cf98..37833611c5f 100644 --- a/internal/armada/submit/deduplicaton.go +++ b/internal/armada/submit/deduplicaton.go @@ -2,8 +2,8 @@ package submit import ( "fmt" - "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/exp/maps" "github.com/armadaproject/armada/internal/common/armadacontext" diff --git a/internal/lookoutv2/configuration/types.go b/internal/lookoutv2/configuration/types.go index f40a5e2d2e5..0fa8a4de544 100644 --- a/internal/lookoutv2/configuration/types.go +++ b/internal/lookoutv2/configuration/types.go @@ -28,9 +28,10 @@ type TlsConfig struct { } type PrunerConfig struct { - ExpireAfter time.Duration - Timeout time.Duration - BatchSize int + ExpireAfter time.Duration + DeduplicationExpireAfter time.Duration + Timeout time.Duration + BatchSize int } type CommandSpec struct { diff --git a/internal/lookoutv2/pruner/pruner.go b/internal/lookoutv2/pruner/pruner.go index 946917fe30a..b4b35468754 100644 --- a/internal/lookoutv2/pruner/pruner.go +++ b/internal/lookoutv2/pruner/pruner.go @@ -3,6 +3,7 @@ package pruner import ( "time" + "github.com/hashicorp/go-multierror" "github.com/jackc/pgx/v5" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -11,9 +12,34 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" ) -func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, keepAfterCompletion time.Duration, batchLimit int, clock clock.Clock) error { +func PruneDb( + ctx *armadacontext.Context, + db *pgx.Conn, + jobLifetime time.Duration, + deduplicationLifetime time.Duration, + batchLimit int, + clock clock.Clock, +) error { + var result *multierror.Error + if err := deleteJobs(ctx, db, jobLifetime, batchLimit, clock); err != nil { + result = multierror.Append(result, err) + } + if err := deleteDeduplications(ctx, db, deduplicationLifetime, clock); err != nil { + result = multierror.Append(result, err) + } + + return result.ErrorOrNil() +} + +func deleteDeduplications(ctx *armadacontext.Context, db *pgx.Conn, deduplicationLifetime time.Duration, clock clock.Clock) error { + cutOffTime := clock.Now().Add(-deduplicationLifetime) + _, err := db.Exec(ctx, "DELETE FROM job_deduplication WHERE inserted <= $1", cutOffTime) + return errors.Wrap(err, "error deleting deduplications from postgres") +} + +func deleteJobs(ctx *armadacontext.Context, db *pgx.Conn, jobLifetime time.Duration, batchLimit int, clock clock.Clock) error { now := clock.Now() - cutOffTime := now.Add(-keepAfterCompletion) + cutOffTime := now.Add(-jobLifetime) totalJobsToDelete, err := createJobIdsToDeleteTempTable(ctx, db, cutOffTime) if err != nil { return errors.WithStack(err) diff --git a/internal/lookoutv2/pruner/pruner_test.go b/internal/lookoutv2/pruner/pruner_test.go index a277665b344..0ed2866b17e 100644 --- a/internal/lookoutv2/pruner/pruner_test.go +++ b/internal/lookoutv2/pruner/pruner_test.go @@ -134,7 +134,7 @@ func TestPruneDb(t *testing.T) { dbConn, err := db.Acquire(ctx) assert.NoError(t, err) - err = PruneDb(ctx, dbConn.Conn(), tc.expireAfter, 10, clock.NewFakeClock(baseTime)) + err = PruneDb(ctx, dbConn.Conn(), tc.expireAfter, 0, 10, clock.NewFakeClock(baseTime)) assert.NoError(t, err) queriedJobIdsPerTable := []map[string]bool{ diff --git a/internal/lookoutv2/schema/migrations/008_job_deduplication.sql b/internal/lookoutv2/schema/migrations/008_job_deduplication.sql index ced6e9688df..eb99e84e7e2 100644 --- a/internal/lookoutv2/schema/migrations/008_job_deduplication.sql +++ b/internal/lookoutv2/schema/migrations/008_job_deduplication.sql @@ -1,5 +1,8 @@ CREATE TABLE IF NOT EXISTS job_deduplication ( deduplication_id text NOT NULL PRIMARY KEY, - job_id text NOT NULL -) + job_id text NOT NULL, + inserted TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX 
idx_job_deduplication_inserted ON job_deduplication (inserted); From d5c3286b3e5bf453e1882687f86485b0b03cdb36 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 12 May 2024 21:55:23 +0100 Subject: [PATCH 3/6] disallow long client ids Signed-off-by: Chris Martin --- .../armada/submit/validation/validation.go | 10 ++++++ .../submit/validation/validation_test.go | 35 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/internal/armada/submit/validation/validation.go b/internal/armada/submit/validation/validation.go index 4f838c88930..8dca2ce6108 100644 --- a/internal/armada/submit/validation/validation.go +++ b/internal/armada/submit/validation/validation.go @@ -34,6 +34,7 @@ var ( validateTerminationGracePeriod, validateIngresses, validatePorts, + validateClientId, } ) @@ -179,6 +180,15 @@ func validateAffinity(j *api.JobSubmitRequestItem, _ configuration.SubmissionCon return nil } +// Ensures that if a request specifies a ClientId, that clientID is not too long +func validateClientId(j *api.JobSubmitRequestItem, _ configuration.SubmissionConfig) error { + const maxClientIdChars = 100 + if len(j.GetClientId()) > maxClientIdChars { + return fmt.Errorf("client id of length %d is greater than max allowed lenght of %d", len(j.ClientId), maxClientIdChars) + } + return nil +} + // Ensures that if a request specifies a PriorityClass, that priority class is supported by Armada. func validatePriorityClasses(j *api.JobSubmitRequestItem, config configuration.SubmissionConfig) error { spec := j.GetMainPodSpec() diff --git a/internal/armada/submit/validation/validation_test.go b/internal/armada/submit/validation/validation_test.go index 4a6dff1698e..7d16e80d1b6 100644 --- a/internal/armada/submit/validation/validation_test.go +++ b/internal/armada/submit/validation/validation_test.go @@ -2,6 +2,7 @@ package validation import ( "strconv" + "strings" "testing" "time" @@ -727,6 +728,40 @@ func TestValidatePriorityClasses(t *testing.T) { } } +func TestValidateClientId(t *testing.T) { + tests := map[string]struct { + req *api.JobSubmitRequestItem + expectSuccess bool + }{ + "no client id": { + req: &api.JobSubmitRequestItem{}, + expectSuccess: true, + }, + "client id of 100 chars is fine": { + req: &api.JobSubmitRequestItem{ + ClientId: strings.Repeat("a", 100), + }, + expectSuccess: true, + }, + "client id over 100 chars is forbidden": { + req: &api.JobSubmitRequestItem{ + ClientId: strings.Repeat("a", 101), + }, + expectSuccess: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := validateClientId(tc.req, configuration.SubmissionConfig{}) + if tc.expectSuccess { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} + func TestValidateQueue(t *testing.T) { tests := map[string]struct { req *api.JobSubmitRequest From b3b391a9dd1986e090961469818576c88443f8c6 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 12 May 2024 22:01:15 +0100 Subject: [PATCH 4/6] improved logging Signed-off-by: Chris Martin --- internal/lookoutv2/pruner/pruner.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/lookoutv2/pruner/pruner.go b/internal/lookoutv2/pruner/pruner.go index b4b35468754..e389e2f41b0 100644 --- a/internal/lookoutv2/pruner/pruner.go +++ b/internal/lookoutv2/pruner/pruner.go @@ -21,9 +21,11 @@ func PruneDb( clock clock.Clock, ) error { var result *multierror.Error + if err := deleteJobs(ctx, db, jobLifetime, batchLimit, clock); err != nil { result = multierror.Append(result, err) } + if err := 
deleteDeduplications(ctx, db, deduplicationLifetime, clock); err != nil { result = multierror.Append(result, err) } @@ -33,8 +35,13 @@ func PruneDb( func deleteDeduplications(ctx *armadacontext.Context, db *pgx.Conn, deduplicationLifetime time.Duration, clock clock.Clock) error { cutOffTime := clock.Now().Add(-deduplicationLifetime) - _, err := db.Exec(ctx, "DELETE FROM job_deduplication WHERE inserted <= $1", cutOffTime) - return errors.Wrap(err, "error deleting deduplications from postgres") + log.Infof("Deleting all rows from job_deduplication older than %s", cutOffTime) + cmdTag, err := db.Exec(ctx, "DELETE FROM job_deduplication WHERE inserted <= $1", cutOffTime) + if err != nil { + return errors.Wrap(err, "error deleting deduplications from postgres") + } + log.Infof("Deleted %d rows", cmdTag.RowsAffected()) + return nil } func deleteJobs(ctx *armadacontext.Context, db *pgx.Conn, jobLifetime time.Duration, batchLimit int, clock clock.Clock) error { From 7a984f5cc7ebe6e185e93879ba4969d37f870e6e Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 12 May 2024 22:14:05 +0100 Subject: [PATCH 5/6] lint Signed-off-by: Chris Martin --- internal/armada/submit/validation/validation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/armada/submit/validation/validation.go b/internal/armada/submit/validation/validation.go index 8dca2ce6108..748f84626bd 100644 --- a/internal/armada/submit/validation/validation.go +++ b/internal/armada/submit/validation/validation.go @@ -40,7 +40,7 @@ var ( // ValidateSubmitRequest ensures that the incoming api.JobSubmitRequest is well-formed. It achieves this // by applying a series of validators that each check a single aspect of the request. Validators may -// chose to validate the whole obSubmitRequest or just a single JobSubmitRequestItem. +// choose to validate the whole obSubmitRequest or just a single JobSubmitRequestItem. // This function will return the error from the first validator that fails, or nil if all validators pass. func ValidateSubmitRequest(req *api.JobSubmitRequest, config configuration.SubmissionConfig) error { for _, validationFunc := range requestValidators { @@ -184,7 +184,7 @@ func validateAffinity(j *api.JobSubmitRequestItem, _ configuration.SubmissionCon func validateClientId(j *api.JobSubmitRequestItem, _ configuration.SubmissionConfig) error { const maxClientIdChars = 100 if len(j.GetClientId()) > maxClientIdChars { - return fmt.Errorf("client id of length %d is greater than max allowed lenght of %d", len(j.ClientId), maxClientIdChars) + return fmt.Errorf("client id of length %d is greater than max allowed length of %d", len(j.ClientId), maxClientIdChars) } return nil } From c209cb9c0dfe560775c099df6d6a9eeb38fa023d Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Wed, 15 May 2024 13:34:39 +0100 Subject: [PATCH 6/6] fix merge conflict Signed-off-by: Chris Martin --- .../{008_job_deduplication.sql => 009_job_deduplication.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename internal/lookoutv2/schema/migrations/{008_job_deduplication.sql => 009_job_deduplication.sql} (100%) diff --git a/internal/lookoutv2/schema/migrations/008_job_deduplication.sql b/internal/lookoutv2/schema/migrations/009_job_deduplication.sql similarity index 100% rename from internal/lookoutv2/schema/migrations/008_job_deduplication.sql rename to internal/lookoutv2/schema/migrations/009_job_deduplication.sql
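
For reference, a minimal sketch of how the table-backed deduplicator introduced above is used: a queue-scoped mapping from deduplication key to the original job id is stored on first submission and looked up on later submissions. NewDeduplicator, GetOriginalJobIds and StoreOriginalJobIds are the methods added in this series; the connection string, queue name, client id and job id below are illustrative assumptions, and the snippet only builds inside the armada module because the imported packages are internal.

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/armadaproject/armada/internal/armada/submit"
	"github.com/armadaproject/armada/internal/common/armadacontext"
	"github.com/armadaproject/armada/pkg/api"
)

func main() {
	ctx := armadacontext.Background()

	// Illustrative connection string; in the server the pool is built from the postgres config.
	db, err := pgxpool.New(ctx, "postgres://postgres:psw@localhost:5432/lookout")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	deduplicator := submit.NewDeduplicator(db)

	// Only jobs that set ClientId participate in deduplication.
	jobs := []*api.JobSubmitRequestItem{{ClientId: "my-client-id"}}

	// Look up clientId -> original job id for any ClientIds already stored for this queue.
	duplicates, err := deduplicator.GetOriginalJobIds(ctx, "example-queue", jobs)
	if err != nil {
		panic(err)
	}
	fmt.Println("already submitted:", duplicates)

	// Record the mapping for jobs that were actually submitted; keys are stored as queue:clientId
	// in the job_deduplication table created by the migration above.
	if err := deduplicator.StoreOriginalJobIds(ctx, "example-queue", map[string]string{"my-client-id": "example-job-id"}); err != nil {
		panic(err)
	}
}

Because jobKey namespaces keys as queue:clientId and the insert uses ON CONFLICT (deduplication_id) DO NOTHING, the same ClientId can be reused across different queues and the first job id stored for a key wins; rows older than deduplicationExpireAfter are later removed by the Lookout pruner.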