From 0a8991f9e7ba67c6ea8f1924a9f7d1100a6091ed Mon Sep 17 00:00:00 2001 From: Akash Kumar Saw <57334555+akash-kumar-saw@users.noreply.github.com> Date: Fri, 7 Apr 2023 00:49:48 +0530 Subject: [PATCH 1/4] Updated "cmd/armadactl/README.md" (#2344) * added cmd/armadactl/README.md * Added README.md * Updated cmd/armadactl/README.md * Updated cmd/armadactl/README.md * Updated cmd/armadactl/README.md * Updated cmd/armadactl/README.md * Updated cmd/armadactl/README.md * Updated cmd/armadactl/README.md * Updated "cmd/armadactl/README.md" --------- Co-authored-by: Kevin Hannon --- cmd/armadactl/README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cmd/armadactl/README.md b/cmd/armadactl/README.md index 2d919ca36f8..42174649619 100644 --- a/cmd/armadactl/README.md +++ b/cmd/armadactl/README.md @@ -57,13 +57,18 @@ armadactl version [flags] ```bash armadactl watch [deployment_name] [flags] ``` -- getQueueSchedulingReport : This subcommand retrieves a report of the current scheduling status of all queues in the Armada cluster. +- queue-report : This subcommand retrieves a report of the current scheduling status of all queues in the Armada cluster. ```bash -armadactl getQueueSchedulingReport +armadactl queue-report ``` -- getJobSchedulingReport : This subcommand retrieves a report of the current scheduling status of all jobs in the Armada cluster. +- job-report : This subcommand retrieves a report of the current scheduling status of all jobs in the Armada cluster. ```bash -armadactl getJobSchedulingReport +armadactl job-report +``` + +- scheduling-report : This subcommand retrieves a report of the current scheduling status in the Armada cluster. +```bash +armadactl scheduling-report ``` For a full list of subcommands and options, you can run **armadactl --help**. 
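As a quick illustration (not part of the patch above): a minimal shell sketch, assuming armadactl is installed and already configured to reach an Armada server; the output file names are purely illustrative.

```bash
# Illustrative only: collect the three scheduling reports into timestamped files.
# Assumes armadactl is on PATH and configured for the target Armada server.
set -euo pipefail

ts=$(date +%Y%m%dT%H%M%S)
armadactl queue-report      > "queue-report-${ts}.txt"
armadactl job-report        > "job-report-${ts}.txt"
armadactl scheduling-report > "scheduling-report-${ts}.txt"
echo "wrote queue, job and scheduling reports with suffix ${ts}"
```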
From b2b30558b08c728c10127d1b446e588d2adcf6da Mon Sep 17 00:00:00 2001 From: Kevin Hannon Date: Thu, 6 Apr 2023 16:35:52 -0400 Subject: [PATCH 2/4] update python dependencies (#2341) --- client/python/pyproject.toml | 4 ++-- third_party/airflow/pyproject.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index 49510cc05b1..0019f1dcd9a 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -9,9 +9,9 @@ license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] [project.optional-dependencies] -format = ["black==23.1.0", "flake8==6.0.0", "pylint==2.16.3"] +format = ["black==23.3.0", "flake8==6.0.0", "pylint==2.17.2"] docs = ["sphinx", "sphinx-jekyll-builder", "sphinx-toolbox==3.2.0b1"] -test = ["pytest==7.2.1", "coverage>=6.5.0"] +test = ["pytest==7.2.2", "coverage>=6.5.0"] [build-system] requires = ["setuptools"] diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 2a7fbb45087..254388b0715 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -19,8 +19,8 @@ license = { text = "Apache Software License" } readme = "README.md" [project.optional-dependencies] -format = ["black==23.1.0", "flake8==6.0.0", "pylint==2.16.1"] -test = ["pytest==7.2.1", "coverage>=6.5.0"] +format = ["black==23.3.0", "flake8==6.0.0", "pylint==2.17.2"] +test = ["pytest==7.2.2", "coverage>=6.5.0"] docs = ["sphinx", "sphinx-jekyll-builder"] [build-system] From ece9f3c9a155095f2477cc526783dca3cfa05576 Mon Sep 17 00:00:00 2001 From: Rich Scott Date: Thu, 6 Apr 2023 15:09:42 -0600 Subject: [PATCH 3/4] Job Service: add Postgres as alternate backend db (#2298) * Add support for using postgres for JobService Add support for using Postgres (instead of SQLite) for JobService. See config/jobservice/config.yaml for configuration details. * Use mutex lock on db write operations only when using SQLite. * Use test args to determine which Job Service db type to use The Job Service db repository code can be verified for 'postgres' or 'sqlite' by passing the `jsDatabase` arg to the test run: go test -v -v -count=1 ./internal/jobservice/repository/... \ -args -jsDatabase=postgres * In 'tests' Makefile target, run separate tests for Job Svc Run separate tests for the Job Service repository, once using Postgres, and once using SQLite, so we test both db types in each build run. Rename each test func in the internal/jobservice/ repository package to have prefix of 'TestJobSvc', so that we can use the '-skip' option to 'go test' to ignore them, and then we run the tests on those separately. * Use different heuristic to exclude JS repo logic from coverage test. Using the `-skip ` doesn't work with Go 1.19, due to how flags.Parse() is called, so just use the `go list ./.. | grep -v pat` technique. Revert the prefix-renaming of the tests in internal/ jobservice/repository/. * Add support for using postgres for JobService Add support for using Postgres (instead of SQLite) for JobService. See config/jobservice/config.yaml for configuration details. * Use mutex lock on db write operations only when using SQLite. * Use test args to determine which Job Service db type to use The Job Service db repository code can be verified for 'postgres' or 'sqlite' by passing the `jsDatabase` arg to the test run: go test -v -v -count=1 ./internal/jobservice/repository/... 
\ -args -jsDatabase=postgres * In 'tests' Makefile target, run separate tests for Job Svc Run separate tests for the Job Service repository, once using Postgres, and once using SQLite, so we test both db types in each build run. Rename each test func in the internal/jobservice/ repository package to have prefix of 'TestJobSvc', so that we can use the '-skip' option to 'go test' to ignore them, and then we run the tests on those separately. * Use different heuristic to exclude JS repo logic from coverage test. Using the `-skip ` doesn't work with Go 1.19, due to how flags.Parse() is called, so just use the `go list ./.. | grep -v pat` technique. Revert the prefix-renaming of the tests in internal/ jobservice/repository/. * Start a Postgres container for tests-e2e-airflow Make target. * Revert "Start a Postgres container for tests-e2e-airflow Make target." This reverts commit b9b406c960de2fd12a5075d683591a93725ad827. * Start Postgres container when running tests-e2e-airflow target. * Remove separate postgres container from tests-e2-airflow target * changes to make e2e tests work --------- Co-authored-by: kannon92 --- config/jobservice/config.yaml | 16 ++ e2e/setup/jobservice.yaml | 13 +- internal/jobservice/application.go | 47 +++- internal/jobservice/configuration/types.go | 18 +- .../jobservice/repository/sql_job_service.go | 245 +++++++++++++----- .../repository/sql_job_service_test.go | 51 +++- makefile | 9 +- 7 files changed, 318 insertions(+), 81 deletions(-) diff --git a/config/jobservice/config.yaml b/config/jobservice/config.yaml index e4bbec88016..e9e39351ded 100644 --- a/config/jobservice/config.yaml +++ b/config/jobservice/config.yaml @@ -2,7 +2,23 @@ grpcPort: 60003 httpPort: 8090 purgeJobSetTime: 1000 subscribeJobSetTime: 100 +# databaseType can be either 'postgres' or 'sqlite' +databaseType: "postgres" +# databasePath specifies the location of the back-end +# storage file when using database type 'sqlite' databasePath: "/var/jobservice.db" +# Connection details when using database type 'postgres' +postgresConfig: + maxOpenConns: 50 + maxIdleConns: 10 + connMaxLifetime: 30m + connection: + host: postgres + port: 5432 + user: postgres + password: psw + dbname: postgres + sslmode: disable grpc: keepaliveParams: maxConnectionIdle: 5m diff --git a/e2e/setup/jobservice.yaml b/e2e/setup/jobservice.yaml index 42040864369..4c05dcd660f 100644 --- a/e2e/setup/jobservice.yaml +++ b/e2e/setup/jobservice.yaml @@ -1,6 +1,17 @@ -databasePath: "/tmp/jobservice.db" subscribeJobSetTime: 60 purgeJobSetTime: 10000 +databaseType: "postgres" +postgresConfig: + maxOpenConns: 50 + maxIdleConns: 10 + connMaxLifetime: 30m + connection: + host: postgres + port: 5432 + user: postgres + password: psw + dbname: postgres + sslmode: disable apiConnection: armadaUrl: "server:50051" forceNoTls: true diff --git a/internal/jobservice/application.go b/internal/jobservice/application.go index 4672b93f3db..1290f1c17b1 100644 --- a/internal/jobservice/application.go +++ b/internal/jobservice/application.go @@ -9,10 +9,12 @@ import ( "path/filepath" "time" + _ "github.com/jackc/pgx/v4/stdlib" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "github.com/armadaproject/armada/internal/common/auth/authorization" + "github.com/armadaproject/armada/internal/common/database" grpcCommon "github.com/armadaproject/armada/internal/common/grpc" "github.com/armadaproject/armada/internal/common/logging" "github.com/armadaproject/armada/internal/jobservice/configuration" @@ -44,22 +46,41 @@ func (a *App) StartUp(ctx 
context.Context, config *configuration.JobServiceConfi []authorization.AuthService{&authorization.AnonymousAuthService{}}, ) - dbDir := filepath.Dir(config.DatabasePath) - if _, err := os.Stat(dbDir); os.IsNotExist(err) { - if errMkDir := os.Mkdir(dbDir, 0o755); errMkDir != nil { - log.Fatalf("error: could not make directory at %s for sqlite db: %v", dbDir, errMkDir) + var db *sql.DB + + if config.DatabaseType == "postgres" { + var err error + log.Info("using postgres") + db, err = sql.Open("pgx", database.CreateConnectionString(config.PostgresConfig.Connection)) + if err != nil { + return err + } + db.SetMaxOpenConns(config.PostgresConfig.MaxOpenConns) + db.SetMaxIdleConns(config.PostgresConfig.MaxIdleConns) + db.SetConnMaxLifetime(config.PostgresConfig.ConnMaxLifetime) + + } else if config.DatabaseType == "sqlite" { + log.Info("using sqlite") + var err error + + dbDir := filepath.Dir(config.DatabasePath) + if _, err := os.Stat(dbDir); os.IsNotExist(err) { + if errMkDir := os.Mkdir(dbDir, 0o755); errMkDir != nil { + log.Fatalf("error: could not make directory at %s for sqlite db: %v", dbDir, errMkDir) + } } - } - db, err := sql.Open("sqlite", config.DatabasePath) - if err != nil { - log.Fatalf("error opening sqlite DB from %s %v", config.DatabasePath, err) - } - defer func() { - if err := db.Close(); err != nil { - log.Warnf("error closing database: %v", err) + db, err = sql.Open("sqlite", config.DatabasePath) + if err != nil { + log.Fatalf("error opening sqlite DB from %s %v", config.DatabasePath, err) } - }() + defer func() { + if err := db.Close(); err != nil { + log.Warnf("error closing database: %v", err) + } + }() + } + sqlJobRepo := repository.NewSQLJobService(config, db) sqlJobRepo.Setup() jobService := server.NewJobService(config, sqlJobRepo) diff --git a/internal/jobservice/configuration/types.go b/internal/jobservice/configuration/types.go index 4b41a63ce2e..84e24bc18fc 100644 --- a/internal/jobservice/configuration/types.go +++ b/internal/jobservice/configuration/types.go @@ -1,10 +1,19 @@ package configuration import ( + "time" + grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration" "github.com/armadaproject/armada/pkg/client" ) +type PostgresConfig struct { + MaxOpenConns int + MaxIdleConns int + ConnMaxLifetime time.Duration + Connection map[string]string +} + type JobServiceConfiguration struct { HttpPort uint16 GrpcPort uint16 @@ -18,6 +27,13 @@ type JobServiceConfiguration struct { SubscribeJobSetTime int64 // Purging JobSets PurgeJobSetTime int64 - // Absolute or relative path for sqllite database and must include the db name + // Type of database used - must be either 'postgres' or 'sqlite' + DatabaseType string + // Absolute or relative path for sqlite database and must include the db name + // This field is only read when DatabaseType is 'sqlite' DatabasePath string + + // Configuration details for using a Postgres database; this field is + // ignored if the DatabaseType above is not 'postgres' + PostgresConfig PostgresConfig } diff --git a/internal/jobservice/repository/sql_job_service.go b/internal/jobservice/repository/sql_job_service.go index 97d6866f114..f83ee606dbe 100644 --- a/internal/jobservice/repository/sql_job_service.go +++ b/internal/jobservice/repository/sql_job_service.go @@ -38,7 +38,10 @@ func NewSQLJobService(config *configuration.JobServiceConfiguration, db *sql.DB) // Call on a newly created SQLJobService object to setup the DB for use. 
func (s *SQLJobService) Setup() { - s.useWAL() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.useWAL() + } + s.CreateTable() } @@ -60,28 +63,36 @@ type SubscribedTuple struct { // Create a Table from a hard-coded schema. func (s *SQLJobService) CreateTable() { - s.lock.Lock() - defer s.lock.Unlock() + var integerType string + if s.jobServiceConfig.DatabaseType == "sqlite" { + integerType = "INT" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + integerType = "INTEGER" + } + + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } _, err := s.db.Exec("DROP TABLE IF EXISTS jobservice") if err != nil { panic(err) } - _, err = s.db.Exec(` -CREATE TABLE jobservice ( -Queue TEXT, -JobSetId TEXT, -JobId TEXT, -JobResponseState TEXT, -JobResponseError TEXT, -Timestamp INT, -PRIMARY KEY(JobId) -)`) + _, err = s.db.Exec(fmt.Sprintf(` + CREATE TABLE jobservice ( + Queue TEXT, + JobSetId TEXT, + JobId TEXT, + JobResponseState TEXT, + JobResponseError TEXT, + Timestamp %s, + PRIMARY KEY(JobId))`, integerType)) + if err != nil { panic(err) } - _, errIndex := s.db.Exec(`CREATE INDEX idx_job_set_queue -ON jobservice (Queue, JobSetId)`) + _, errIndex := s.db.Exec(`CREATE INDEX idx_job_set_queue ON jobservice (Queue, JobSetId)`) if errIndex != nil { panic(errIndex) } @@ -90,14 +101,14 @@ ON jobservice (Queue, JobSetId)`) panic(err) } - _, err = s.db.Exec(` - CREATE TABLE jobsets ( - Queue TEXT, - JobSetId TEXT, - Timestamp INT, - ConnectionError TEXT, - FromMessageId TEXT, - UNIQUE(Queue,JobSetId))`) + _, err = s.db.Exec(fmt.Sprintf(` + CREATE TABLE jobsets ( + Queue TEXT, + JobSetId TEXT, + Timestamp %s, + ConnectionError TEXT, + FromMessageId TEXT, + UNIQUE(Queue,JobSetId))`, integerType)) if err != nil { panic(err) } @@ -105,9 +116,19 @@ ON jobservice (Queue, JobSetId)`) // Get the JobStatus given the jodId func (s *SQLJobService) GetJobStatus(jobId string) (*js.JobServiceResponse, error) { - s.lock.Lock() - defer s.lock.Unlock() - row := s.db.QueryRow("SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId=?", jobId) + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = ?" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "SELECT Queue, JobSetId, JobResponseState, JobResponseError FROM jobservice WHERE JobId = $1" + } + + row := s.db.QueryRow(sqlStmt, jobId) var queue, jobSetId, jobState, jobError string err := row.Scan(&queue, &jobSetId, &jobState, &jobError) @@ -165,17 +186,31 @@ func jobStateStrToJSRState(jobState string) (js.JobServiceResponse_State, error) // Update database with JobTable. func (s *SQLJobService) UpdateJobServiceDb(jobTable *JobStatus) error { - // SQLite only allows one write at a time. Therefore we must serialize - // writes in order to avoid SQL_BUSY errors. - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + // SQLite only allows one write at a time. Therefore we must serialize + // writes in order to avoid SQL_BUSY errors. 
+ s.lock.Lock() + defer s.lock.Unlock() + } - stmt, err := s.db.Prepare("INSERT OR REPLACE INTO jobservice VALUES (?, ?, ?, ?, ?, ?)") + var sqlStmt string + + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "INSERT OR REPLACE INTO jobservice VALUES (?, ?, ?, ?, ?, ?)" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = `INSERT INTO jobservice (Queue, JobSetId, JobId, JobResponseState, JobResponseError, Timestamp) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (JobId) DO UPDATE SET + (Queue, JobSetId, JobResponseState, JobResponseError, Timestamp) = + (excluded.Queue, excluded.JobSetId, excluded.JobResponseState, excluded.JobResponseError, excluded.Timestamp)` + } + + stmt, err := s.db.Prepare(sqlStmt) if err != nil { return err } defer stmt.Close() - _, errExec := stmt.Exec(jobTable.queue, jobTable.jobSetId, jobTable.jobId, jobTable.jobResponse.State.String(), jobTable.jobResponse.Error, jobTable.timeStamp) + _, errExec := stmt.Exec(jobTable.queue, jobTable.jobSetId, jobTable.jobId, + jobTable.jobResponse.State.String(), jobTable.jobResponse.Error, jobTable.timeStamp) return errExec } @@ -187,9 +222,22 @@ func (s *SQLJobService) UpdateJobSetDb(queue string, jobSet string, fromMessageI if !subscribe { return fmt.Errorf("queue %s jobSet %s is already unsubscribed", queue, jobSet) } - s.lock.Lock() - defer s.lock.Unlock() - jobSetState, err := s.db.Prepare("INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)") + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + } + + jobSetState, err := s.db.Prepare(sqlStmt) if err != nil { return err } @@ -201,10 +249,13 @@ func (s *SQLJobService) UpdateJobSetDb(queue string, jobSet string, fromMessageI return nil } -// Simple Health Check to Verify if SqlLite is working. +// Simple Health Check to Verify if SQLite is working. func (s *SQLJobService) HealthCheck() (bool, error) { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + row := s.db.QueryRow("SELECT 1") var col int err := row.Scan(&col) @@ -217,9 +268,19 @@ func (s *SQLJobService) HealthCheck() (bool, error) { // Check if JobSet is in our map. func (s *SQLJobService) IsJobSetSubscribed(queue string, jobSet string) (bool, string, error) { - s.lock.Lock() - defer s.lock.Unlock() - row := s.db.QueryRow("SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue=? AND JobSetId=?", queue, jobSet) + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = ? AND JobSetId = ?" 
+ } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "SELECT Queue, JobSetId, FromMessageId FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + } + row := s.db.QueryRow(sqlStmt, queue, jobSet) var queueScan, jobSetIdScan, fromMessageId string err := row.Scan(&queueScan, &jobSetIdScan, &fromMessageId) @@ -239,16 +300,29 @@ func (s *SQLJobService) AddMessageIdAndClearSubscriptionError(queue string, jobS // Set subscription error if present func (s *SQLJobService) SetSubscriptionError(queue string, jobSet string, connErr string, fromMessageId string) error { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + } - jobSetState, err := s.db.Prepare("INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)") + jobSetState, err := s.db.Prepare(sqlStmt) if err != nil { return err } defer jobSetState.Close() subscribeTable := NewSubscribeTable(queue, jobSet) - _, jobSetErr := jobSetState.Exec(subscribeTable.queue, jobSet, subscribeTable.lastRequestTimeStamp, connErr, fromMessageId) + _, jobSetErr := jobSetState.Exec(subscribeTable.queue, jobSet, subscribeTable.lastRequestTimeStamp, + connErr, fromMessageId) if jobSetErr != nil { return jobSetErr } @@ -257,7 +331,13 @@ func (s *SQLJobService) SetSubscriptionError(queue string, jobSet string, connEr // Get subscription error if present func (s *SQLJobService) GetSubscriptionError(queue string, jobSet string) (string, error) { - row := s.db.QueryRow("SELECT ConnectionError FROM jobsets WHERE Queue=? AND JobSetId=?", queue, jobSet) + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "SELECT ConnectionError FROM jobsets WHERE Queue = ? AND JobSetId = ?" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "SELECT ConnectionError FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + } + row := s.db.QueryRow(sqlStmt, queue, jobSet) var connError string err := row.Scan(&connError) @@ -274,16 +354,29 @@ func (s *SQLJobService) GetSubscriptionError(queue string, jobSet string) (strin // SubscribeTable contains Queue, JobSet and time when it was created. 
func (s *SQLJobService) SubscribeJobSet(queue string, jobSet string, fromMessageId string) error { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } - jobSetState, err := s.db.Prepare("INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)") + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "INSERT OR REPLACE INTO jobsets VALUES(?, ?, ?, ?, ?)" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = `INSERT INTO jobsets (Queue, JobSetId, Timestamp, ConnectionError, FromMessageId) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (Queue, JobSetId) DO UPDATE SET + (Timestamp, ConnectionError, FromMessageId) = + (excluded.Timestamp, excluded.ConnectionError, excluded.FromMessageId)` + } + + jobSetState, err := s.db.Prepare(sqlStmt) if err != nil { return err } defer jobSetState.Close() subscribeTable := NewSubscribeTable(queue, jobSet) - _, jobSetErr := jobSetState.Exec(subscribeTable.queue, subscribeTable.jobSet, subscribeTable.lastRequestTimeStamp, "", fromMessageId) + _, jobSetErr := jobSetState.Exec(subscribeTable.queue, subscribeTable.jobSet, + subscribeTable.lastRequestTimeStamp, "", fromMessageId) return jobSetErr } @@ -308,10 +401,20 @@ func (s *SQLJobService) CheckToUnSubscribe(queue string, jobSet string, configTi if !jobSetFound { return false, nil } - s.lock.Lock() - defer s.lock.Unlock() - row := s.db.QueryRow("SELECT Timestamp FROM jobsets WHERE Queue=? AND JobSetId=?", queue, jobSet) + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "SELECT Timestamp FROM jobsets WHERE Queue = ? AND JobSetId = ?" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "SELECT Timestamp FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + } + + row := s.db.QueryRow(sqlStmt, queue, jobSet) var timeStamp int timeErr := row.Scan(&timeStamp) @@ -330,10 +433,19 @@ func (s *SQLJobService) CheckToUnSubscribe(queue string, jobSet string, configTi } func (s *SQLJobService) UnsubscribeJobSet(queue, jobSet string) (int64, error) { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "DELETE FROM jobsets WHERE Queue = ? AND JobSetId = ?" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "DELETE FROM jobsets WHERE Queue = $1 AND JobSetId = $2" + } - result, err := s.db.Exec("DELETE FROM jobsets WHERE Queue=? AND JobSetId=?", queue, jobSet) + result, err := s.db.Exec(sqlStmt, queue, jobSet) if err != nil { return 0, err } @@ -342,10 +454,19 @@ func (s *SQLJobService) UnsubscribeJobSet(queue, jobSet string) (int64, error) { // Delete Jobs in the database func (s *SQLJobService) DeleteJobsInJobSet(queue string, jobSet string) (int64, error) { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } + + var sqlStmt string + if s.jobServiceConfig.DatabaseType == "sqlite" { + sqlStmt = "DELETE FROM jobservice WHERE Queue = ? AND JobSetId = ?" + } else if s.jobServiceConfig.DatabaseType == "postgres" { + sqlStmt = "DELETE FROM jobservice WHERE Queue = $1 AND JobSetId = $2" + } - result, err := s.db.Exec("DELETE FROM jobservice WHERE Queue=? 
AND JobSetId=?", queue, jobSet) + result, err := s.db.Exec(sqlStmt, queue, jobSet) if err != nil { return 0, err } @@ -353,8 +474,10 @@ func (s *SQLJobService) DeleteJobsInJobSet(queue string, jobSet string) (int64, } func (s *SQLJobService) GetSubscribedJobSets() ([]SubscribedTuple, error) { - s.lock.Lock() - defer s.lock.Unlock() + if s.jobServiceConfig.DatabaseType == "sqlite" { + s.lock.Lock() + defer s.lock.Unlock() + } rows, err := s.db.Query("SELECT Queue, JobSetId, FromMessageId FROM jobsets") if err != nil { diff --git a/internal/jobservice/repository/sql_job_service_test.go b/internal/jobservice/repository/sql_job_service_test.go index 9fee9a73e2d..0a2e2522961 100644 --- a/internal/jobservice/repository/sql_job_service_test.go +++ b/internal/jobservice/repository/sql_job_service_test.go @@ -6,10 +6,12 @@ import ( "os" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/armadaproject/armada/internal/common/database" "github.com/armadaproject/armada/internal/jobservice/configuration" "github.com/armadaproject/armada/pkg/api/jobservice" ) @@ -392,14 +394,55 @@ func TestConcurrentJobStatusUpdating(t *testing.T) { } func WithSqlServiceRepo(action func(r *SQLJobService)) { + var db *sql.DB + var err error config := &configuration.JobServiceConfiguration{} - db, err := sql.Open("sqlite", "test.db") - if err != nil { - panic(err) + + // If JSDBTYPE is not specified in the environment, default to 'sqlite' + jsDatabase := "sqlite" + + if os.Getenv("JSDBTYPE") == "postgres" { + jsDatabase = "postgres" + } + + if jsDatabase == "sqlite" { + config.DatabaseType = "sqlite" + + db, err = sql.Open("sqlite", "test.db") + if err != nil { + panic(err) + } + } else if jsDatabase == "postgres" { + config.DatabaseType = "postgres" + config.PostgresConfig = configuration.PostgresConfig{ + MaxOpenConns: 20, + MaxIdleConns: 5, + ConnMaxLifetime: 30 * time.Second, + Connection: map[string]string{ + "host": "localhost", + "port": "5432", + "user": "postgres", + "password": "psw", + "dbname": "postgres", + "sslmode": "disable", + }, + } + + db, err = sql.Open("pgx", database.CreateConnectionString(config.PostgresConfig.Connection)) + if err != nil { + panic(err) + } + db.SetMaxOpenConns(config.PostgresConfig.MaxOpenConns) + db.SetMaxIdleConns(config.PostgresConfig.MaxIdleConns) + db.SetConnMaxLifetime(config.PostgresConfig.ConnMaxLifetime) } + repo := NewSQLJobService(config, db) repo.Setup() action(repo) db.Close() - os.Remove("test.db") + + if config.DatabaseType == "sqlite" { + os.Remove("test.db") + } } diff --git a/makefile b/makefile index 9ea4b788abd..3b318337fc5 100644 --- a/makefile +++ b/makefile @@ -362,6 +362,8 @@ tests-no-setup: gotestsum $(GOTESTSUM) -- -v ./pkg... 2>&1 | tee test_reports/pkg.txt $(GOTESTSUM) -- -v ./cmd... 2>&1 | tee test_reports/cmd.txt + +# Note that we do separate Job Service repository test runs for both sqlite and postgres database types .ONESHELL: tests: gotestsum mkdir -p test_reports @@ -369,7 +371,12 @@ tests: gotestsum docker run -d --name=postgres $(DOCKER_NET) -p 5432:5432 -e POSTGRES_PASSWORD=psw postgres:14.2 sleep 3 function tearDown { docker rm -f redis postgres; }; trap tearDown EXIT - $(GOTESTSUM) -- -coverprofile internal_coverage.xml -v ./internal... 2>&1 | tee test_reports/internal.txt + $(GOTESTSUM) -- $(shell go list ./internal/... 
| grep -v 'jobservice/repository') \ + -coverprofile internal_coverage.xml -v 2>&1 | tee test_reports/internal.txt + env JSDBTYPE=sqlite $(GOTESTSUM) -- -v \ + ./internal/jobservice/repository/... 2>&1 | tee -a test_reports/internal.txt + env JSDBTYPE=postgres $(GOTESTSUM) -- -v \ + ./internal/jobservice/repository/... 2>&1 | tee -a test_reports/internal.txt $(GOTESTSUM) -- -coverprofile pkg_coverage.xml -v ./pkg... 2>&1 | tee test_reports/pkg.txt $(GOTESTSUM) -- -coverprofile cmd_coverage.xml -v ./cmd... 2>&1 | tee test_reports/cmd.txt From ec82b5cfdeb79499f002601b1011bcd85a46c468 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 11 Apr 2023 15:38:02 +0200 Subject: [PATCH 4/4] Fix node reserved resources (#2365) * Fix node reserved resources * Lint --- config/executor/config.yaml | 5 +- internal/armada/server/lease.go | 1 - internal/executor/application.go | 9 +- internal/executor/configuration/types.go | 17 +- .../executor/service/cluster_allocation.go | 3 - .../utilisation/cluster_utilisation.go | 169 +++++------- .../schedulerobjects/resourcelist.go | 64 +++-- .../schedulerobjects/resourcelist_test.go | 252 ++++++++++++++++++ 8 files changed, 382 insertions(+), 138 deletions(-) diff --git a/config/executor/config.yaml b/config/executor/config.yaml index e34db679540..536c5fd472d 100644 --- a/config/executor/config.yaml +++ b/config/executor/config.yaml @@ -46,7 +46,10 @@ kubernetes: maxTerminatedPods: 1000 # Should be lower than kube-controller-managed terminated-pod-gc-threshold (default 12500) stuckTerminatingPodExpiry: 1m podKillTimeout: 5m - nodeReservedResourcesPriority: 2000001000 # same priority as system-node-critical + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode: + cpu: 100m + memory: 50Mi + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority: 2000001000 # same priority as system-node-critical podDefaults: ingress: hostnameSuffix: "svc" diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 8734e1c9542..4dc9b8e5408 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -376,7 +376,6 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL for _, job := range jobs { nodeIdByJobId[job.Id] = node.Id } - nodes = append(nodes, node) } indexedResources := q.schedulingConfig.IndexedResources diff --git a/internal/executor/application.go b/internal/executor/application.go index c177d5414b3..851eb92886d 100644 --- a/internal/executor/application.go +++ b/internal/executor/application.go @@ -155,8 +155,8 @@ func setupExecutorApiComponents( nil, config.Kubernetes.TrackedNodeLabels, config.Kubernetes.NodeIdLabel, - config.Kubernetes.NodeReservedResources, - config.Kubernetes.NodeReservedResourcesPriority, + config.Kubernetes.MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNode, + config.Kubernetes.MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority, ) eventReporter, stopReporter := reporter.NewJobEventReporter( @@ -261,8 +261,8 @@ func setupServerApiComponents( usageClient, config.Kubernetes.TrackedNodeLabels, config.Kubernetes.NodeIdLabel, - config.Kubernetes.NodeReservedResources, - config.Kubernetes.NodeReservedResourcesPriority, + config.Kubernetes.MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNode, + config.Kubernetes.MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority, ) jobLeaseService := service.NewJobLeaseService( @@ -286,7 +286,6 @@ func setupServerApiComponents( clusterUtilisationService, submitter, etcdHealthMonitor, - 
config.Kubernetes.NodeReservedResources, ) jobManager := service.NewJobManager( diff --git a/internal/executor/configuration/types.go b/internal/executor/configuration/types.go index c18fa8c5c9e..8ba9ebb7cc4 100644 --- a/internal/executor/configuration/types.go +++ b/internal/executor/configuration/types.go @@ -55,14 +55,15 @@ type KubernetesConfiguration struct { PodDefaults *PodDefaults PendingPodChecks *podchecks.Checks FatalPodSubmissionErrors []string - // NodeReservedResources config is used to factor in reserved resources on each node - // when validating can a job be scheduled on a node during job submit (i.e. factor in resources for daemonset pods) - NodeReservedResources armadaresource.ComputeResources - // NodeReservedResourcesPriority - The priority the reserved resource is reported at - // All pods in kubernetes have a priority - and we report to the Armada API resource for a given priority - // Therefore we also need to set a priority for the reserved resource - NodeReservedResourcesPriority int32 - PodKillTimeout time.Duration + // Minimum amount of resources marked as allocated to non-Armada pods on each node. + // I.e., if the total resources allocated to non-Armada pods on some node drops below this value, + // the executor adds a fictional allocation to make up the difference, such that the total is at least this. + // Hence, specifying this can ensure that, e.g., if a daemonset pod restarts, those resources are not considered for scheduling. + MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNode armadaresource.ComputeResources + // When adding a fictional allocation to ensure resources allocated to non-Armada pods is at least + // MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNode, those resources are marked allocated at this priority. + MinimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority int32 + PodKillTimeout time.Duration } type EtcdConfiguration struct { diff --git a/internal/executor/service/cluster_allocation.go b/internal/executor/service/cluster_allocation.go index 1b837621c70..1b00eeceab0 100644 --- a/internal/executor/service/cluster_allocation.go +++ b/internal/executor/service/cluster_allocation.go @@ -123,7 +123,6 @@ type LegacyClusterAllocationService struct { clusterContext executorContext.ClusterContext submitter job.Submitter etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor - reserved armadaresource.ComputeResources } func NewLegacyClusterAllocationService( @@ -133,7 +132,6 @@ func NewLegacyClusterAllocationService( utilisationService utilisation.UtilisationService, submitter job.Submitter, etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, - reserved armadaresource.ComputeResources, ) *LegacyClusterAllocationService { return &LegacyClusterAllocationService{ leaseService: leaseService, @@ -142,7 +140,6 @@ func NewLegacyClusterAllocationService( clusterContext: clusterContext, submitter: submitter, etcdHealthMonitor: etcdHealthMonitor, - reserved: reserved, } } diff --git a/internal/executor/utilisation/cluster_utilisation.go b/internal/executor/utilisation/cluster_utilisation.go index f3ae6bc77e1..d7828f6e7f6 100644 --- a/internal/executor/utilisation/cluster_utilisation.go +++ b/internal/executor/utilisation/cluster_utilisation.go @@ -14,7 +14,7 @@ import ( "github.com/armadaproject/armada/internal/executor/domain" "github.com/armadaproject/armada/internal/executor/node" "github.com/armadaproject/armada/internal/executor/util" - . 
"github.com/armadaproject/armada/internal/executor/util" + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/pkg/api" ) @@ -24,14 +24,14 @@ type UtilisationService interface { } type ClusterUtilisationService struct { - clusterContext context.ClusterContext - queueUtilisationService PodUtilisationService - nodeInfoService node.NodeInfoService - usageClient api.UsageClient - trackedNodeLabels []string - nodeIdLabel string - nodeReservedResources armadaresource.ComputeResources - nodeReservedResourcesPriority int32 + clusterContext context.ClusterContext + queueUtilisationService PodUtilisationService + nodeInfoService node.NodeInfoService + usageClient api.UsageClient + trackedNodeLabels []string + nodeIdLabel string + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode armadaresource.ComputeResources + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority int32 } func NewClusterUtilisationService( @@ -41,18 +41,18 @@ func NewClusterUtilisationService( usageClient api.UsageClient, trackedNodeLabels []string, nodeIdLabel string, - nodeReservedResources armadaresource.ComputeResources, - nodeReservedResourcesPriority int32, + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode armadaresource.ComputeResources, + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority int32, ) *ClusterUtilisationService { return &ClusterUtilisationService{ - clusterContext: clusterContext, - queueUtilisationService: queueUtilisationService, - nodeInfoService: nodeInfoService, - usageClient: usageClient, - trackedNodeLabels: trackedNodeLabels, - nodeIdLabel: nodeIdLabel, - nodeReservedResources: nodeReservedResources, - nodeReservedResourcesPriority: nodeReservedResourcesPriority, + clusterContext: clusterContext, + queueUtilisationService: queueUtilisationService, + nodeInfoService: nodeInfoService, + usageClient: usageClient, + trackedNodeLabels: trackedNodeLabels, + nodeIdLabel: nodeIdLabel, + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode: minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode, + minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority: minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority, } } @@ -71,7 +71,7 @@ func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisa return } // We only report cluster utilisation for legacy use cases - allBatchPods = util.FilterPods(allBatchPods, IsLegacyManagedPod) + allBatchPods = util.FilterPods(allBatchPods, util.IsLegacyManagedPod) nodeGroupInfos, err := clusterUtilisationService.GetAllNodeGroupAllocationInfo(true) if err != nil { @@ -81,7 +81,7 @@ func (clusterUtilisationService *ClusterUtilisationService) ReportClusterUtilisa nodeGroupReports := make([]api.NodeTypeUsageReport, 0, len(nodeGroupInfos)) for _, nodeGroup := range nodeGroupInfos { - managedPodsOnNodes := GetPodsOnNodes(allBatchPods, nodeGroup.Nodes) + managedPodsOnNodes := util.GetPodsOnNodes(allBatchPods, nodeGroup.Nodes) queueReports := clusterUtilisationService.createReportsOfQueueUsages(managedPodsOnNodes) unschedulableNodes := util. 
@@ -130,7 +130,7 @@ func (cls *ClusterUtilisationService) GetAvailableClusterCapacity(legacy bool) ( } allPodsRequiringResource := getAllPodsRequiringResourceOnProcessingNodes(allPods, processingNodes) - allNonCompletePodsRequiringResource := FilterNonCompletedPods(allPodsRequiringResource) + allNonCompletePodsRequiringResource := util.FilterNonCompletedPods(allPodsRequiringResource) totalNodeResource := armadaresource.CalculateTotalResource(processingNodes) totalPodResource := armadaresource.CalculateTotalResourceRequest(allNonCompletePodsRequiringResource) @@ -142,31 +142,40 @@ func (cls *ClusterUtilisationService) GetAvailableClusterCapacity(legacy bool) ( runningPodsByNode := groupPodsByNodes(allNonCompletePodsRequiringResource) nodes := make([]api.NodeInfo, 0, len(processingNodes)) runIdsByNode := cls.getRunIdsByNode(processingNodes, allPods, legacy) - for _, n := range processingNodes { - allocatable := armadaresource.FromResourceList(n.Status.Allocatable) + for _, node := range processingNodes { + allocatable := armadaresource.FromResourceList(node.Status.Allocatable) available := allocatable.DeepCopy() - available.Sub(nodesUsage[n.Name]) + available.Sub(nodesUsage[node.Name]) - runningNodePods := runningPodsByNode[n.Name] + runningNodePods := runningPodsByNode[node.Name] runningNodePodsNonArmada := util.FilterPods(runningNodePods, func(pod *v1.Pod) bool { return !util.IsManagedPod(pod) }) - allocated := getAllocatedResourcesByPriority(runningNodePods) - allocatedNonArmada := getAllocatedResourcesByPriority(runningNodePodsNonArmada) - - reserved := calculateReservedNodeResource(cls.nodeReservedResources, armadaresource.CalculateTotalResourceRequest(runningNodePodsNonArmada)) - addReservedResource(reserved, cls.nodeReservedResourcesPriority, allocatedNonArmada) - + allocatedByPriority := allocatedByPriorityAndResourceTypeFromPods(runningNodePods) + allocatedByPriorityNonArmada := allocatedByPriorityAndResourceTypeFromPods(runningNodePodsNonArmada) + allocatedByPriorityNonArmada.MaxAggregatedByResource( + cls.minimumResourcesMarkedAllocatedToNonArmadaPodsPerNodePriority, + schedulerobjects.ResourceList{Resources: cls.minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode}, + ) + + nodeAllocatedResources := make(map[int32]api.ComputeResource) + for p, rl := range allocatedByPriority { + nodeAllocatedResources[p] = api.ComputeResource{Resources: rl.Resources} + } + nodeNonArmadaAllocatedResources := make(map[int32]api.ComputeResource) + for p, rl := range allocatedByPriorityNonArmada { + nodeNonArmadaAllocatedResources[p] = api.ComputeResource{Resources: rl.Resources} + } nodes = append(nodes, api.NodeInfo{ - Name: n.Name, - Labels: cls.filterTrackedLabels(n.Labels), - Taints: n.Spec.Taints, + Name: node.Name, + Labels: cls.filterTrackedLabels(node.Labels), + Taints: node.Spec.Taints, AllocatableResources: allocatable, AvailableResources: available, TotalResources: allocatable, - AllocatedResources: allocated, - RunIdsByState: runIdsByNode[n.Name], - NonArmadaAllocatedResources: allocatedNonArmada, + AllocatedResources: nodeAllocatedResources, + RunIdsByState: runIdsByNode[node.Name], + NonArmadaAllocatedResources: nodeNonArmadaAllocatedResources, }) } @@ -221,35 +230,6 @@ func (clusterUtilisationService *ClusterUtilisationService) getRunIdsByNode(node return result } -func calculateReservedNodeResource( - reserved armadaresource.ComputeResources, - existingNodeResource armadaresource.ComputeResources, -) armadaresource.ComputeResources { - if reserved == nil { - return 
armadaresource.ComputeResources{} - } - reservedRemaining := reserved - reservedRemaining.Sub(existingNodeResource) - reservedRemaining.LimitToZero() - return reservedRemaining -} - -func addReservedResource( - reserved armadaresource.ComputeResources, - reservedPriority int32, - resourceByPriority map[int32]api.ComputeResource, -) { - if reserved.IsValid() && !reserved.IsZero() { - if resourceAtPriority, present := resourceByPriority[reservedPriority]; present { - totalResource := armadaresource.ComputeResources(resourceAtPriority.Resources) - totalResource.Add(reserved) - resourceByPriority[reservedPriority] = api.ComputeResource{Resources: totalResource} - } else { - resourceByPriority[reservedPriority] = api.ComputeResource{Resources: reserved} - } - } -} - func getJobRunState(pod *v1.Pod) api.JobState { switch { case pod.Status.Phase == v1.PodPending: @@ -289,31 +269,18 @@ func groupPodsByNodes(pods []*v1.Pod) map[string][]*v1.Pod { return podsByNodes } -func getAllocatedResourcesByPriority(pods []*v1.Pod) map[int32]api.ComputeResource { - resourceUsageByPriority := make(map[int32]api.ComputeResource) - - podsByPriority := groupPodsByPriority(pods) - - for priority, podsForPriority := range podsByPriority { - resources := api.ComputeResource{Resources: armadaresource.CalculateTotalResourceRequest(podsForPriority)} - resourceUsageByPriority[priority] = resources - } - - return resourceUsageByPriority -} - -func groupPodsByPriority(pods []*v1.Pod) map[int32][]*v1.Pod { - priorityMap := make(map[int32][]*v1.Pod) - - for _, p := range pods { +func allocatedByPriorityAndResourceTypeFromPods(pods []*v1.Pod) schedulerobjects.QuantityByPriorityAndResourceType { + rv := make(schedulerobjects.QuantityByPriorityAndResourceType) + for _, pod := range pods { var priority int32 = 0 - if p.Spec.Priority != nil { - priority = *(p.Spec.Priority) + if pod.Spec.Priority != nil { + priority = *(pod.Spec.Priority) } - priorityMap[priority] = append(priorityMap[priority], p) + request := armadaresource.TotalPodResourceRequest(&pod.Spec) + rl := schedulerobjects.ResourceList{Resources: request} + rv.AddResourceList(priority, rl) } - - return priorityMap + return rv } // GetAllNodeGroupAllocationInfo returns allocation information for all nodes on the cluster. @@ -336,7 +303,7 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAllNodeGroupAlloc return []*NodeGroupAllocationInfo{}, err } if legacy { - batchPods = util.FilterPods(batchPods, IsLegacyManagedPod) + batchPods = util.FilterPods(batchPods, util.IsLegacyManagedPod) } nodeGroups := clusterUtilisationService.nodeInfoService.GroupNodesByType(allAvailableProcessingNodes) @@ -364,7 +331,7 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAllNodeGroupAlloc // significant resource is running on cordoned nodes. 
func getCordonedResource(nodes []*v1.Node, pods []*v1.Pod) armadaresource.ComputeResources { cordonedNodes := util.FilterNodes(nodes, func(node *v1.Node) bool { return node.Spec.Unschedulable }) - podsOnNodes := GetPodsOnNodes(pods, cordonedNodes) + podsOnNodes := util.GetPodsOnNodes(pods, cordonedNodes) usage := armadaresource.ComputeResources{} for _, pod := range podsOnNodes { for _, container := range pod.Spec.Containers { @@ -380,23 +347,23 @@ func getCordonedResource(nodes []*v1.Node, pods []*v1.Pod) armadaresource.Comput func (clusterUtilisationService *ClusterUtilisationService) getAllocatableResourceByNodeType() (map[string]armadaresource.ComputeResources, error) { allAvailableProcessingNodes, err := clusterUtilisationService.nodeInfoService.GetAllAvailableProcessingNodes() if err != nil { - return map[string]armadaresource.ComputeResources{}, fmt.Errorf("Failed getting total allocatable cluster capacity due to: %s", err) + return map[string]armadaresource.ComputeResources{}, fmt.Errorf("failed getting total allocatable cluster capacity due to: %s", err) } allPods, err := clusterUtilisationService.clusterContext.GetAllPods() if err != nil { - return map[string]armadaresource.ComputeResources{}, fmt.Errorf("Failed getting total allocatable cluster capacity due to: %s", err) + return map[string]armadaresource.ComputeResources{}, fmt.Errorf("failed getting total allocatable cluster capacity due to: %s", err) } - unmanagedPods := FilterPods(allPods, func(pod *v1.Pod) bool { - return !IsManagedPod(pod) + unmanagedPods := util.FilterPods(allPods, func(pod *v1.Pod) bool { + return !util.IsManagedPod(pod) }) - activeUnmanagedPods := FilterPodsWithPhase(unmanagedPods, v1.PodRunning) + activeUnmanagedPods := util.FilterPodsWithPhase(unmanagedPods, v1.PodRunning) nodeGroups := clusterUtilisationService.nodeInfoService.GroupNodesByType(allAvailableProcessingNodes) result := map[string]armadaresource.ComputeResources{} for _, nodeGroup := range nodeGroups { - activeUnmanagedPodsOnNodes := GetPodsOnNodes(activeUnmanagedPods, nodeGroup.Nodes) + activeUnmanagedPodsOnNodes := util.GetPodsOnNodes(activeUnmanagedPods, nodeGroup.Nodes) unmanagedPodResource := armadaresource.CalculateTotalResourceRequest(activeUnmanagedPodsOnNodes) totalNodeGroupResource := armadaresource.CalculateTotalResource(nodeGroup.Nodes) allocatableNodeGroupResource := totalNodeGroupResource.DeepCopy() @@ -426,7 +393,7 @@ func getAllPodsRequiringResourceOnProcessingNodes(allPods []*v1.Pod, processingN for _, pod := range allPods { if _, presentOnProcessingNode := nodeMap[pod.Spec.NodeName]; presentOnProcessingNode { podsUsingResourceOnProcessingNodes = append(podsUsingResourceOnProcessingNodes, pod) - } else if IsManagedPod(pod) && pod.Spec.NodeName == "" { + } else if util.IsManagedPod(pod) && pod.Spec.NodeName == "" { podsUsingResourceOnProcessingNodes = append(podsUsingResourceOnProcessingNodes, pod) } } @@ -435,13 +402,13 @@ func getAllPodsRequiringResourceOnProcessingNodes(allPods []*v1.Pod, processingN } func (clusterUtilisationService *ClusterUtilisationService) createReportsOfQueueUsages(pods []*v1.Pod) []*api.QueueReport { - podsByQueue := GroupByQueue(pods) + podsByQueue := util.GroupByQueue(pods) queueReports := make([]*api.QueueReport, 0, len(podsByQueue)) for queueName, queuePods := range podsByQueue { - runningPods := FilterPodsWithPhase(queuePods, v1.PodRunning) + runningPods := util.FilterPodsWithPhase(queuePods, v1.PodRunning) resourceAllocated := armadaresource.CalculateTotalResourceRequest(runningPods) 
resourceUsed := clusterUtilisationService.getTotalPodUtilisation(queuePods) - phaseSummary := CountPodsByPhase(queuePods) + phaseSummary := util.CountPodsByPhase(queuePods) queueReport := api.QueueReport{ Name: queueName, diff --git a/internal/scheduler/schedulerobjects/resourcelist.go b/internal/scheduler/schedulerobjects/resourcelist.go index eaa2fcf3a9a..6edad9a2d80 100644 --- a/internal/scheduler/schedulerobjects/resourcelist.go +++ b/internal/scheduler/schedulerobjects/resourcelist.go @@ -76,25 +76,13 @@ func (a QuantityByPriorityAndResourceType) SubResourceList(priority int32, rlb R } func (a QuantityByPriorityAndResourceType) Equal(b QuantityByPriorityAndResourceType) bool { - if len(a) != len(b) { - return false - } - if a == nil { - if b == nil { - return true - } else { + for p, rla := range a { + if !rla.Equal(b[p]) { return false } } - if b == nil && a != nil { - return false - } - for p, rla := range a { - if rlb, ok := b[p]; ok { - if !rla.Equal(rlb) { - return false - } - } else { + for p, rlb := range b { + if !rlb.Equal(a[p]) { return false } } @@ -131,6 +119,26 @@ func (a QuantityByPriorityAndResourceType) AggregateByResource() ResourceList { return rv } +// MaxAggregatedByResource updates a in-place such that for each resource type t +// a[p1][t] + ... + a[pn][t] = max(a[p1][t] + ... + a[pn][t], rl[t]), +// where p1, ..., pn are the priorities in a, for each resource set explicitly in rl. +// +// If necessary to add resources to make up the difference, those resources are added at priority p. +func (a QuantityByPriorityAndResourceType) MaxAggregatedByResource(p int32, rl ResourceList) { + aggregate := a.AggregateByResource() + var difference ResourceList + for t, q := range rl.Resources { + q = q.DeepCopy() + q.Sub(aggregate.Get(t)) + if q.Cmp(resource.Quantity{}) == 1 { + difference.AddQuantity(t, q) + } + } + if len(difference.Resources) > 0 { + a.AddResourceList(p, difference) + } +} + func (a *ResourceList) Get(resourceType string) resource.Quantity { if a.Resources == nil { return resource.Quantity{} @@ -149,10 +157,15 @@ func (a *ResourceList) Add(b ResourceList) { } } +func (rl *ResourceList) AddQuantity(resourceType string, quantity resource.Quantity) { + rl.initialise() + q := rl.Resources[resourceType] + q.Add(quantity) + rl.Resources[resourceType] = q +} + func (a *ResourceList) Sub(b ResourceList) { - if a.Resources == nil { - a.Resources = make(map[string]resource.Quantity) - } + a.initialise() for t, qb := range b.Resources { qa := a.Resources[t] qa.Sub(qb) @@ -160,6 +173,13 @@ func (a *ResourceList) Sub(b ResourceList) { } } +func (rl *ResourceList) SubQuantity(resourceType string, quantity resource.Quantity) { + rl.initialise() + q := rl.Resources[resourceType] + q.Sub(quantity) + rl.Resources[resourceType] = q +} + func (rl ResourceList) DeepCopy() ResourceList { if rl.Resources == nil { return ResourceList{} @@ -222,6 +242,12 @@ func (rl ResourceList) CompactString() string { return sb.String() } +func (rl *ResourceList) initialise() { + if rl.Resources == nil { + rl.Resources = make(map[string]resource.Quantity) + } +} + // AllocatableByPriorityAndResourceType accounts for resources that can be allocated to pods of a given priority. // E.g., AllocatableByPriorityAndResourceType[5]["cpu"] is the amount of CPU available to pods with priority 5, // where alloctable resources = unused resources + resources allocated to lower-priority pods. 
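To make the semantics of MaxAggregatedByResource concrete, here is a small illustrative Go sketch (not part of the patch; it assumes the schedulerobjects package as modified above, the k8s.io/apimachinery resource package, and reuses the priority value 2000001000 from the executor config example): the call tops up the per-priority allocations so that their aggregate is at least the configured minimum.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func main() {
	// Resources currently allocated to non-Armada pods on a node, keyed by pod priority.
	allocated := schedulerobjects.QuantityByPriorityAndResourceType{
		0: schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{
			"cpu":    resource.MustParse("50m"),
			"memory": resource.MustParse("20Mi"),
		}},
	}

	// Configured minimum (cf. minimumResourcesMarkedAllocatedToNonArmadaPodsPerNode above).
	minimum := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{
		"cpu":    resource.MustParse("100m"),
		"memory": resource.MustParse("50Mi"),
	}}

	// Top up at the configured priority so the aggregate over all priorities is at
	// least the minimum: this adds 50m cpu and 30Mi memory at priority 2000001000.
	allocated.MaxAggregatedByResource(2000001000, minimum)

	fmt.Println(allocated)
}
```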
diff --git a/internal/scheduler/schedulerobjects/resourcelist_test.go b/internal/scheduler/schedulerobjects/resourcelist_test.go index ebc4e3aaf8c..d2668525dca 100644 --- a/internal/scheduler/schedulerobjects/resourcelist_test.go +++ b/internal/scheduler/schedulerobjects/resourcelist_test.go @@ -146,6 +146,171 @@ func TestQuantityByPriorityAndResourceTypeSub(t *testing.T) { } } +func TestQuantityByPriorityAndResourceTypeEqual(t *testing.T) { + tests := map[string]struct { + a QuantityByPriorityAndResourceType + b QuantityByPriorityAndResourceType + expected bool + }{ + "both empty": { + a: QuantityByPriorityAndResourceType{}, + b: QuantityByPriorityAndResourceType{}, + expected: true, + }, + "both with an empty map": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{}, + }, + b: QuantityByPriorityAndResourceType{ + 0: ResourceList{}, + }, + expected: true, + }, + "one empty map": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{}, + }, + b: QuantityByPriorityAndResourceType{}, + expected: true, + }, + "zero equals empty": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("0"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{ + 0: ResourceList{}, + }, + expected: true, + }, + "zero equals missing": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{}, + }, + b: QuantityByPriorityAndResourceType{}, + expected: true, + }, + "zero equals missing with empty ResourceList": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("0"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{}, + expected: true, + }, + "simple equal": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + }, + expected: true, + }, + "equal with two priorities": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + 1: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("4"), + "memory": resource.MustParse("5"), + "foo": resource.MustParse("6"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + 1: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("4"), + "memory": resource.MustParse("5"), + "foo": resource.MustParse("6"), + }, + }, + }, + expected: true, + }, + "simple unequal": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("4"), + }, + }, + }, + expected: false, + 
}, + "unequal differing priority": { + a: QuantityByPriorityAndResourceType{ + 0: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + }, + b: QuantityByPriorityAndResourceType{ + 1: ResourceList{ + Resources: map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("2"), + "foo": resource.MustParse("3"), + }, + }, + }, + expected: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.expected, tc.a.Equal(tc.b)) + assert.Equal(t, tc.expected, tc.b.Equal(tc.a)) + }) + } +} + func TestQuantityByPriorityAndResourceTypeIsStrictlyNonNegative(t *testing.T) { tests := map[string]struct { m QuantityByPriorityAndResourceType @@ -186,6 +351,93 @@ func TestQuantityByPriorityAndResourceTypeIsStrictlyNonNegative(t *testing.T) { } } +func TestQuantityByPriorityAndResourceTypeMaxAggregatedByResource(t *testing.T) { + tests := map[string]struct { + q QuantityByPriorityAndResourceType + p int32 + rl ResourceList + expected QuantityByPriorityAndResourceType + }{ + "no change": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + p: 1, + rl: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + }, + "empty": { + q: QuantityByPriorityAndResourceType{}, + p: 0, + rl: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + }, + "add same resource at same priority": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + p: 0, + rl: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("2")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("2")}}, + }, + }, + "add different resource at same priority": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + p: 0, + rl: ResourceList{Resources: map[string]resource.Quantity{"memory": resource.MustParse("1Gi")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1"), "memory": resource.MustParse("1Gi")}}, + }, + }, + "add same resource at different priority": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + p: 1, + rl: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("2")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + 1: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + }, + "add different resource at different priority": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + }, + p: 1, + rl: ResourceList{Resources: 
map[string]resource.Quantity{"memory": resource.MustParse("1Gi")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, + 1: ResourceList{Resources: map[string]resource.Quantity{"memory": resource.MustParse("1Gi")}}, + }, + }, + "multiple resources": { + q: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("100m"), "memory": resource.MustParse("50Mi")}}, + }, + p: 1, + rl: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("10"), "memory": resource.MustParse("4000Mi")}}, + expected: QuantityByPriorityAndResourceType{ + 0: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("100m"), "memory": resource.MustParse("50Mi")}}, + 1: ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("9900m"), "memory": resource.MustParse("3950Mi")}}, + }, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + tc.q.MaxAggregatedByResource(tc.p, tc.rl) + assert.True(t, tc.expected.Equal(tc.q), "expected %s, but got %s", tc.expected.String(), tc.q.String()) + }) + } +} + func TestAllocatableByPriorityAndResourceType(t *testing.T) { tests := map[string]struct { Priorities []int32