From 3b9ac8a268519728a6dbb4d044efe41b172855f0 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 31 Jan 2023 11:15:54 +0000 Subject: [PATCH] First Round Of Fixes From E2E Scheduling Testing (#2072) * initial commit * wip * linting * fix tests * linting --------- Co-authored-by: Chris Martin --- cmd/scheduler/cmd/root.go | 7 ++- config/scheduler/config.yaml | 6 +++ config/scheduleringester/config.yaml | 7 +++ internal/armada/server.go | 5 +- internal/binoculars/server.go | 8 ++- internal/common/auth/setup.go | 15 +++--- internal/common/grpc/grpc.go | 16 ++++++ internal/scheduler/api.go | 11 +++- internal/scheduler/api_test.go | 29 +++++++---- internal/scheduler/jobdb.go | 10 ---- internal/scheduler/jobdb_test.go | 2 - internal/scheduler/publisher_test.go | 2 - internal/scheduler/scheduler.go | 29 +++++------ internal/scheduler/schedulerapp.go | 51 +++++++++++++++---- internal/scheduler/scheduling_algo.go | 8 +-- internal/scheduler/scheduling_algo_test.go | 6 ++- internal/scheduleringester/ingester.go | 19 +++---- .../scheduleringester/schedulerdb_test.go | 1 - 18 files changed, 153 insertions(+), 79 deletions(-) diff --git a/cmd/scheduler/cmd/root.go b/cmd/scheduler/cmd/root.go index 990e7d7269e..fc799d32cca 100644 --- a/cmd/scheduler/cmd/root.go +++ b/cmd/scheduler/cmd/root.go @@ -21,10 +21,13 @@ func RootCmd() *cobra.Command { } cmd.PersistentFlags().StringSlice( - "armadaUrl", + CustomConfigLocation, []string{}, "Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)") - + err := viper.BindPFlag(CustomConfigLocation, cmd.PersistentFlags().Lookup(CustomConfigLocation)) + if err != nil { + panic(err) + } cmd.AddCommand( runCmd(), migrateDbCmd(), diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 3288ebde173..d76d3223718 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -9,6 +9,12 @@ pulsar: maxConnectionsPerBroker: 1 compressionType: zlib compressionLevel: faster +redis: + addrs: + - redis:6379 + password: "" + db: 0 + poolSize: 1000 postgres: connection: host: postgres diff --git a/config/scheduleringester/config.yaml b/config/scheduleringester/config.yaml index e0c0fd48768..8c5ab1f9fb6 100644 --- a/config/scheduleringester/config.yaml +++ b/config/scheduleringester/config.yaml @@ -22,3 +22,10 @@ pulsar: subscriptionName: "scheduler-ingester" batchSize: 10000 batchDuration: 500ms +priorityClasses: + armada-default: + priority: 1000 + armada-preemptible: + priority: 900 + + diff --git a/internal/armada/server.go b/internal/armada/server.go index f5e8a294d66..de923919d0b 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -76,7 +76,10 @@ func Serve(ctx context.Context, config *configuration.ArmadaConfig, healthChecks // We support multiple simultaneous authentication services (e.g., username/password OpenId). // For each gRPC request, we try them all until one succeeds, at which point the process is // short-circuited. - authServices := auth.ConfigureAuth(config.Auth) + authServices, err := auth.ConfigureAuth(config.Auth) + if err != nil { + return err + } grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices) // Shut down grpcServer if the context is cancelled.
diff --git a/internal/binoculars/server.go b/internal/binoculars/server.go index e8caca24c5d..9025d20053f 100644 --- a/internal/binoculars/server.go +++ b/internal/binoculars/server.go @@ -30,7 +30,13 @@ func StartUp(config *configuration.BinocularsConfig) (func(), *sync.WaitGroup) { os.Exit(-1) } - grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, auth.ConfigureAuth(config.Auth)) + authServices, err := auth.ConfigureAuth(config.Auth) + if err != nil { + log.Errorf("Failed to create auth services %s", err) + os.Exit(-1) + } + + grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices) logService := logs.NewKubernetesLogService(kubernetesClientProvider) binocularsServer := server.NewBinocularsServer(logService) diff --git a/internal/common/auth/setup.go b/internal/common/auth/setup.go index a46db9d73dc..c99f7e6e698 100644 --- a/internal/common/auth/setup.go +++ b/internal/common/auth/setup.go @@ -2,15 +2,16 @@ package auth import ( "context" - "errors" + + "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common/auth/authorization" "github.com/armadaproject/armada/internal/common/auth/authorization/groups" "github.com/armadaproject/armada/internal/common/auth/configuration" ) -func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService { - authServices := []authorization.AuthService{} +func ConfigureAuth(config configuration.AuthConfig) ([]authorization.AuthService, error) { + var authServices []authorization.AuthService if len(config.BasicAuth.Users) > 0 { authServices = append(authServices, @@ -25,7 +26,7 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService if config.OpenIdAuth.ProviderUrl != "" { openIdAuthService, err := authorization.NewOpenIdAuthServiceForProvider(context.Background(), &config.OpenIdAuth) if err != nil { - panic(err) + return nil, errors.WithMessage(err, "error initialising openId auth") } authServices = append(authServices, openIdAuthService) } @@ -43,14 +44,14 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService kerberosAuthService, err := authorization.NewKerberosAuthService(&config.Kerberos, groupLookup) if err != nil { - panic(err) + return nil, errors.WithMessage(err, "error initialising kerberos auth") } authServices = append(authServices, kerberosAuthService) } if len(authServices) == 0 { - panic(errors.New("At least one auth method must be specified in config")) + return nil, errors.New("at least one auth method must be specified in config") } - return authServices + return authServices, nil } diff --git a/internal/common/grpc/grpc.go b/internal/common/grpc/grpc.go index aa042f9f124..89ed83e398f 100644 --- a/internal/common/grpc/grpc.go +++ b/internal/common/grpc/grpc.go @@ -1,10 +1,12 @@ package grpc import ( + "context" "fmt" "net" "runtime/debug" "sync" + "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" @@ -105,6 +107,20 @@ func Listen(port uint16, grpcServer *grpc.Server, wg *sync.WaitGroup) { }() } +// CreateShutdownHandler returns a function that shuts down the grpcServer when the context is closed. 
+// The server is given gracePeriod to perform a graceful shutdown and is then forcibly stopped if necessary +func CreateShutdownHandler(ctx context.Context, gracePeriod time.Duration, grpcServer *grpc.Server) func() error { + return func() error { + <-ctx.Done() + go func() { + time.Sleep(gracePeriod) + grpcServer.Stop() + }() + grpcServer.GracefulStop() + return nil + } +} + // This function is called whenever a gRPC handler panics. func panicRecoveryHandler(p interface{}) (err error) { log.Errorf("Request triggered panic with cause %v \n%s", p, string(debug.Stack())) diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index 91580ece98d..e0a28e31b55 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -39,7 +39,14 @@ func NewExecutorApi(producer pulsar.Producer, executorRepository database.ExecutorRepository, allowedPriorities []int32, maxJobsPerCall uint, -) *ExecutorApi { +) (*ExecutorApi, error) { + if len(allowedPriorities) == 0 { + return nil, errors.New("allowedPriorities cannot be empty") + } + if maxJobsPerCall == 0 { + return nil, errors.New("maxJobsPerCall cannot be 0") + } + return &ExecutorApi{ producer: producer, jobRepository: jobRepository, @@ -48,7 +55,7 @@ func NewExecutorApi(producer pulsar.Producer, maxJobsPerCall: maxJobsPerCall, maxPulsarMessageSize: 1024 * 1024 * 2, clock: clock.RealClock{}, - } + }, nil } // LeaseJobRuns performs the following actions: diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 11002032e05..a360a80b5aa 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/clock" "github.com/armadaproject/armada/internal/common/compress" @@ -47,11 +48,18 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { Pool: "test-pool", Nodes: []*schedulerobjects.Node{ { - Id: "test-executor-test-node", - TotalResources: schedulerobjects.ResourceList{}, - JobRuns: []string{runId1.String(), runId2.String()}, - AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{}, - LastSeen: testClock.Now().UTC(), + Id: "test-executor-test-node", + TotalResources: schedulerobjects.ResourceList{}, + JobRuns: []string{runId1.String(), runId2.String()}, + AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{ + 1000: { + Resources: map[string]resource.Quantity{}, + }, + 2000: { + Resources: map[string]resource.Quantity{}, + }, + }, + LastSeen: testClock.Now().UTC(), }, }, MinimumJobSize: schedulerobjects.ResourceList{}, @@ -140,13 +148,14 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { capturedEvents = append(capturedEvents, msg) }).AnyTimes() - server := NewExecutorApi( + server, err := NewExecutorApi( mockPulsarProducer, mockJobRepository, mockExecutorRepository, - []int32{}, + []int32{1000, 2000}, maxJobsPerCall, ) + require.NoError(t, err) server.clock = testClock err = server.LeaseJobRuns(mockStream) @@ -209,14 +218,16 @@ func TestExecutorApi_Publish(t *testing.T) { callback(pulsarutils.NewMessageId(1), msg, nil) }).AnyTimes() - server := NewExecutorApi( + server, err := NewExecutorApi( mockPulsarProducer, mockJobRepository, mockExecutorRepository, - []int32{}, + []int32{1000, 2000}, 100, ) + require.NoError(t, err) + empty, err := server.ReportEvents(ctx, &executorapi.EventList{Events: tc.sequences}) require.NoError(t, err) assert.NotNil(t, empty)
diff --git a/internal/scheduler/jobdb.go b/internal/scheduler/jobdb.go index 15e16d72165..dd4b9f570a3 100644 --- a/internal/scheduler/jobdb.go +++ b/internal/scheduler/jobdb.go @@ -53,12 +53,6 @@ type SchedulerJob struct { // Jobs with identical Queue and Priority // are sorted by timestamp. Timestamp int64 - // Name of the executor to which this job has been assigned. - // Empty if this job has not yet been assigned. - Executor string - // Name of the node to which this job has been assigned. - // Empty if this job has not yet been assigned. - Node string // True if the job is currently queued. // If this is set then the job will not be considered for scheduling Queued bool @@ -155,7 +149,6 @@ func (job *SchedulerJob) DeepCopy() *SchedulerJob { Jobset: job.Jobset, Priority: job.Priority, Timestamp: job.Timestamp, - Node: job.Node, Queued: job.Queued, jobSchedulingInfo: proto.Clone(job.jobSchedulingInfo).(*schedulerobjects.JobSchedulingInfo), CancelRequested: job.CancelRequested, @@ -172,8 +165,6 @@ type JobRun struct { RunID uuid.UUID // The name of the executor this run has been leased to Executor string - // True if the job has been reported as pending by the executor - Pending bool // True if the job has been reported as running by the executor Running bool // True if the job has been reported as succeeded by the executor @@ -199,7 +190,6 @@ func (run *JobRun) DeepCopy() *JobRun { return &JobRun{ RunID: run.RunID, Executor: run.Executor, - Pending: run.Pending, Running: run.Running, Succeeded: run.Succeeded, Failed: run.Failed, diff --git a/internal/scheduler/jobdb_test.go b/internal/scheduler/jobdb_test.go index 73939fff765..486941ab652 100644 --- a/internal/scheduler/jobdb_test.go +++ b/internal/scheduler/jobdb_test.go @@ -22,7 +22,6 @@ var job1 = &SchedulerJob{ Queue: "A", Priority: 0, Timestamp: 10, - Node: "", jobSchedulingInfo: nil, } @@ -31,7 +30,6 @@ var job2 = &SchedulerJob{ Queue: "A", Priority: 0, Timestamp: 10, - Node: "", jobSchedulingInfo: nil, } diff --git a/internal/scheduler/publisher_test.go b/internal/scheduler/publisher_test.go index f588efec52f..200d5581d1a 100644 --- a/internal/scheduler/publisher_test.go +++ b/internal/scheduler/publisher_test.go @@ -12,7 +12,6 @@ import ( "github.com/golang/mock/gomock" "github.com/google/uuid" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -185,7 +184,6 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) { capturedPartitions[key] = true } if numPublished > tc.numSuccessfulPublishes { - log.Info("returning error") return pulsarutils.NewMessageId(numPublished), errors.New("error from mock pulsar producer") } return pulsarutils.NewMessageId(numPublished), nil diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index bdbb459e6e1..5b419a3167f 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -263,21 +263,24 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) { if err != nil { return nil, errors.Wrapf(err, "error retrieving job %s from jobDb ", jobId) } + + // If the job is nil at this point then it cannot be active. + // In this case we can ignore the run + if job == nil { + log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID) + continue + } + job = job.DeepCopy() jobsToUpdateById[jobId] = job } - // If the job is nil at this point then it cannot be active. 
- // In this case we can ignore the run - if job == nil { - log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID) - continue - } - returnProcessed := false run := job.RunById(dbRun.RunID) if run == nil { run = s.createSchedulerRun(&dbRun) + // TODO: we need to ensure that runs end up in the correct order here + // This will need us to store an order id in the db job.Runs = append(job.Runs, run) } else { returnProcessed = run.Returned @@ -290,8 +293,6 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) { // do the same, but eventually we should send an actual queued message and this bit of code can disappear if !returnProcessed && run.Returned && job.NumReturned() <= s.maxLeaseReturns { job.Queued = true - job.Node = "" - job.Executor = "" run.Failed = false // unset failed here so that we don't generate a job failed message later } } @@ -299,11 +300,7 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) { // any jobs that have don't have active run need to be marked as queued for _, job := range jobsToUpdateById { run := job.CurrentRun() - if run == nil || run.InTerminalState() { - job.Queued = true - job.Node = "" - job.Executor = "" - } + job.Queued = run == nil || run.InTerminalState() } jobsToUpdate := maps.Values(jobsToUpdateById) @@ -343,7 +340,7 @@ func (s *Scheduler) generateLeaseMessages(scheduledJobs []*SchedulerJob) ([]*arm JobRunLeased: &armadaevents.JobRunLeased{ RunId: armadaevents.ProtoUuidFromUuid(job.CurrentRun().RunID), JobId: jobId, - ExecutorId: job.Executor, + ExecutorId: job.CurrentRun().Executor, }, }, }, @@ -608,7 +605,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx context.Context, pollInterval time.Dura log.Infof("Successfully ensured that database state is up to date") return nil } - log.Infof("Recevied %d partitions, still waiting on %d", numSent, numSent-numReceived) + log.Infof("Recevied %d partitions, still waiting on %d", numReceived, numSent-numReceived) s.clock.Sleep(pollInterval) } } diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 814fa449e2f..ead41cabf14 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -4,9 +4,7 @@ import ( "fmt" "net" "strings" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" + "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/go-redis/redis" @@ -14,7 +12,10 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/app" "github.com/armadaproject/armada/internal/common/auth" dbcommon "github.com/armadaproject/armada/internal/common/database" @@ -29,11 +30,17 @@ import ( func Run(config Configuration) error { g, ctx := errgroup.WithContext(app.CreateContextWithShutdown()) + // List of services to run concurrently. + // Because we want to start services only once all input validation has been completed, + // we add all services to a slice and start them together at the end of this function. 
+ var services []func() error + ////////////////////////////////////////////////////////////////////////// // Database setup (postgres and redis) ////////////////////////////////////////////////////////////////////////// log.Infof("Setting up database connections") db, err := dbcommon.OpenPgxPool(config.Postgres) + defer db.Close() if err != nil { return errors.WithMessage(err, "Error opening connection to postgres") } @@ -76,7 +83,7 @@ func Run(config Configuration) error { if err != nil { return errors.WithMessage(err, "error creating leader controller") } - g.Go(func() error { return leaderController.Run(ctx) }) + services = append(services, func() error { return leaderController.Run(ctx) }) ////////////////////////////////////////////////////////////////////////// // Executor Api @@ -92,22 +99,33 @@ func Run(config Configuration) error { if err != nil { return errors.Wrapf(err, "error creating pulsar producer for executor api") } - authServices := auth.ConfigureAuth(config.Auth) + defer apiProducer.Close() + authServices, err := auth.ConfigureAuth(config.Auth) + if err != nil { + return errors.WithMessage(err, "error creating auth services") + } grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices) defer grpcServer.GracefulStop() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Grpc.Port)) if err != nil { return errors.WithMessage(err, "error setting up grpc server") } - executorServer := NewExecutorApi(apiProducer, jobRepository, executorRepository, []int32{}, config.Scheduling.MaximumJobsToSchedule) + allowedPcs := allowedPrioritiesFromPriorityClasses(config.Scheduling.Preemption.PriorityClasses) + executorServer, err := NewExecutorApi(apiProducer, jobRepository, executorRepository, allowedPcs, config.Scheduling.MaximumJobsToSchedule) + if err != nil { + return errors.WithMessage(err, "error creating executorApi") + } executorapi.RegisterExecutorApiServer(grpcServer, executorServer) - g.Go(func() error { return grpcServer.Serve(lis) }) - log.Infof("Executor api listening on %s", lis.Addr()) + services = append(services, func() error { + log.Infof("Executor api listening on %s", lis.Addr()) + return grpcServer.Serve(lis) + }) + services = append(services, grpcCommon.CreateShutdownHandler(ctx, 5*time.Second, grpcServer)) ////////////////////////////////////////////////////////////////////////// // Scheduling ////////////////////////////////////////////////////////////////////////// - log.Infof("Starting up scheduling loop") + log.Infof("setting up scheduling loop") stringInterner, err := util.NewStringInterner(config.InternedStringsCacheSize) if err != nil { return errors.WithMessage(err, "error creating string interner") @@ -125,7 +143,12 @@ func Run(config Configuration) error { if err != nil { return errors.WithMessage(err, "error creating scheduler") } - g.Go(func() error { return scheduler.Run(ctx) }) + services = append(services, func() error { return scheduler.Run(ctx) }) + + // start all services + for _, service := range services { + g.Go(service) + } return g.Wait() } @@ -150,3 +173,11 @@ func createLeaderController(config LeaderConfig) (LeaderController, error) { return nil, errors.Errorf("%s is not a value leader mode", config.Mode) } } + +func allowedPrioritiesFromPriorityClasses(pcs map[string]configuration.PriorityClass) []int32 { + allowedPcs := make([]int32, 0, len(pcs)) + for _, v := range pcs { + allowedPcs = append(allowedPcs, v.Priority) + } + return allowedPcs +} diff --git 
a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 93ecc7718c7..985de10da7c 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -6,6 +6,7 @@ import ( "math/rand" "time" + "github.com/google/uuid" "github.com/hashicorp/go-memdb" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -202,10 +203,11 @@ func (l *LegacySchedulingAlgo) scheduleOnExecutor( for i, report := range legacyScheduler.SchedulingRoundReport.SuccessfulJobSchedulingReports() { jobCopy := report.Job.(*SchedulerJob).DeepCopy() jobCopy.Queued = false - jobCopy.Executor = executor.Id - if len(report.PodSchedulingReports) > 0 { - jobCopy.Node = report.PodSchedulingReports[0].Node.GetId() + run := JobRun{ + RunID: uuid.New(), + Executor: executor.Id, } + jobCopy.Runs = append(jobCopy.Runs, &run) updatedJobs[i] = jobCopy } return updatedJobs, nil diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index c7fc11efc7f..c657a7bea90 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -164,8 +164,10 @@ func TestLegacySchedulingAlgo_TestSchedule(t *testing.T) { for _, job := range scheduledJobs { expectedExecutor, ok := tc.expectedJobs[job.JobId] - assert.True(t, ok) - assert.Equal(t, expectedExecutor, job.Executor) + require.True(t, ok) + run := job.CurrentRun() + require.NotEqual(t, t, run) + assert.Equal(t, expectedExecutor, run.Executor) assert.Equal(t, false, job.Queued) } diff --git a/internal/scheduleringester/ingester.go b/internal/scheduleringester/ingester.go index a4eb5b262ef..d7b1bc40082 100644 --- a/internal/scheduleringester/ingester.go +++ b/internal/scheduleringester/ingester.go @@ -3,31 +3,28 @@ package scheduleringester import ( "time" - "github.com/armadaproject/armada/internal/common/compress" - - "github.com/armadaproject/armada/internal/common/database" - - "github.com/armadaproject/armada/internal/common/ingest/metrics" - "github.com/armadaproject/armada/pkg/armadaevents" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/armadaproject/armada/internal/common/app" + "github.com/armadaproject/armada/internal/common/compress" + "github.com/armadaproject/armada/internal/common/database" "github.com/armadaproject/armada/internal/common/ingest" + "github.com/armadaproject/armada/internal/common/ingest/metrics" + "github.com/armadaproject/armada/pkg/armadaevents" ) // Run will create a pipeline that will take Armada event messages from Pulsar and update the // Scheduler database accordingly. This pipeline will run until a SIGTERM is received func Run(config Configuration) { - metrics := metrics.NewMetrics(metrics.ArmadaEventIngesterMetricsPrefix + "armada_scheduler_ingester_") + svcMetrics := metrics.NewMetrics(metrics.ArmadaEventIngesterMetricsPrefix + "armada_scheduler_ingester_") log.Infof("Opening connection pool to postgres") db, err := database.OpenPgxPool(config.Postgres) if err != nil { panic(errors.WithMessage(err, "Error opening connection to postgres")) } - schedulerDb := NewSchedulerDb(db, metrics, 100*time.Millisecond, 60*time.Second) + schedulerDb := NewSchedulerDb(db, svcMetrics, 100*time.Millisecond, 60*time.Second) // Discard submit job messages not intended for this scheduler. 
submitJobFilter := func(event *armadaevents.EventSequence_Event) bool { @@ -42,7 +39,7 @@ func Run(config Configuration) { if err != nil { panic(errors.WithMessage(err, "Error creating compressor")) } - converter := NewInstructionConverter(metrics, submitJobFilter, config.PriorityClasses, compressor) + converter := NewInstructionConverter(svcMetrics, submitJobFilter, config.PriorityClasses, compressor) ingester := ingest.NewIngestionPipeline( config.Pulsar, @@ -52,7 +49,7 @@ func Run(config Configuration) { converter, schedulerDb, config.Metrics, - metrics) + svcMetrics) err = ingester.Run(app.CreateContextWithShutdown()) if err != nil { diff --git a/internal/scheduleringester/schedulerdb_test.go b/internal/scheduleringester/schedulerdb_test.go index dc2682f794b..98e6ab53eda 100644 --- a/internal/scheduleringester/schedulerdb_test.go +++ b/internal/scheduleringester/schedulerdb_test.go @@ -272,7 +272,6 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] v.LastModified = job.LastModified } } - // assert.Equal(t, expected, actual) for k, v := range expected { assert.Equal(t, v, actual[k]) }
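
Illustrative sketch (not part of the patch above): the grpc.go hunk adds CreateShutdownHandler, which pairs GracefulStop with a forced Stop after a grace period, and schedulerapp.go runs it in an errgroup alongside Serve. The self-contained example below shows that same pattern with the handler body inlined, since Armada's internal packages cannot be imported from outside the repository; the server construction, port, and function names here are assumptions for the example only.

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
)

// serveWithGracefulShutdown mirrors the pattern introduced in this patch:
// Serve runs in one errgroup goroutine, and a second goroutine waits for the
// context to be cancelled, then calls GracefulStop, forcing Stop if the
// graceful shutdown has not finished within gracePeriod.
func serveWithGracefulShutdown(ctx context.Context, grpcServer *grpc.Server, port uint16, gracePeriod time.Duration) error {
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return err
	}

	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return grpcServer.Serve(lis) })
	g.Go(func() error {
		<-ctx.Done()
		go func() {
			// Force-stop the server if GracefulStop takes longer than gracePeriod.
			time.Sleep(gracePeriod)
			grpcServer.Stop()
		}()
		grpcServer.GracefulStop()
		return nil
	})
	return g.Wait()
}

func main() {
	// Cancel the context on SIGINT/SIGTERM so the shutdown handler runs.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := serveWithGracefulShutdown(ctx, grpc.NewServer(), 50051, 5*time.Second); err != nil {
		log.Fatal(err)
	}
}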