First Round Of Fixes From E2E Scheduling Testing #2072

Merged · 6 commits · Jan 31, 2023
7 changes: 5 additions & 2 deletions cmd/scheduler/cmd/root.go
@@ -21,10 +21,13 @@ func RootCmd() *cobra.Command {
}

cmd.PersistentFlags().StringSlice(
"armadaUrl",
CustomConfigLocation,
[]string{},
"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)")

err := viper.BindPFlag(CustomConfigLocation, cmd.PersistentFlags().Lookup(CustomConfigLocation))
if err != nil {
panic(err)
}
cmd.AddCommand(
runCmd(),
migrateDbCmd(),
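For readers outside the PR, here is a minimal, self-contained sketch of the corrected pattern: the flag must be registered under the same key that viper binds, otherwise the binding silently resolves nothing. The constant value `config` and the command name used here are illustrative assumptions, not taken from the repository.

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// Illustrative stand-in for the scheduler's CustomConfigLocation constant.
const customConfigLocation = "config"

func rootCmd() *cobra.Command {
	cmd := &cobra.Command{Use: "scheduler"}

	// Register the flag under the same key that viper binds to; registering it
	// under a different name (e.g. "armadaUrl") makes the binding silently useless.
	cmd.PersistentFlags().StringSlice(
		customConfigLocation,
		[]string{},
		"Fully qualified path to application configuration file")

	if err := viper.BindPFlag(customConfigLocation, cmd.PersistentFlags().Lookup(customConfigLocation)); err != nil {
		panic(err)
	}
	return cmd
}

func main() {
	if err := rootCmd().Execute(); err != nil {
		panic(err)
	}
	// After Execute, the flag value is available through viper under the same key.
	fmt.Println(viper.GetStringSlice(customConfigLocation))
}
```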
6 changes: 6 additions & 0 deletions config/scheduler/config.yaml
@@ -9,6 +9,12 @@ pulsar:
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
redis:
addrs:
- redis:6379
password: ""
db: 0
poolSize: 1000
postgres:
connection:
host: postgres
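A hedged sketch of how a Redis block like the one above is commonly loaded into go-redis `UniversalOptions` through viper. Armada's actual client wiring and go-redis version may differ; treat this as an illustration of the yaml-to-options mapping only.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
	"github.com/spf13/viper"
)

func main() {
	viper.SetConfigFile("config/scheduler/config.yaml")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}

	// Field names on UniversalOptions (Addrs, Password, DB, PoolSize) match the
	// yaml keys case-insensitively under mapstructure's default decoding rules.
	var opts redis.UniversalOptions
	if err := viper.UnmarshalKey("redis", &opts); err != nil {
		panic(err)
	}

	client := redis.NewUniversalClient(&opts)
	defer client.Close()
	fmt.Println(client.Ping(context.Background()).Err())
}
```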
7 changes: 7 additions & 0 deletions config/scheduleringester/config.yaml
@@ -22,3 +22,10 @@ pulsar:
subscriptionName: "scheduler-ingester"
batchSize: 10000
batchDuration: 500ms
priorityClasses:
armada-default:
priority: 1000
armada-preemptible:
priority: 900
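
The new `priorityClasses` block maps class names to the priorities the scheduler reports allocatable resources against (see the `AllocatableByPriorityAndResource` changes in `api_test.go` below). A rough sketch of a struct shape such a block could unmarshal into; the type and field names here are assumptions, not Armada's actual configuration types.

```go
package scheduleringester

// Sketch only: Armada's real configuration types may differ or carry more fields.
type PriorityClass struct {
	Priority int32 `yaml:"priority"`
}

type Config struct {
	// Maps priority class name (e.g. "armada-default") to its priority.
	PriorityClasses map[string]PriorityClass `yaml:"priorityClasses"`
}
```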


5 changes: 4 additions & 1 deletion internal/armada/server.go
@@ -76,7 +76,10 @@ func Serve(ctx context.Context, config *configuration.ArmadaConfig, healthChecks
// We support multiple simultaneous authentication services (e.g., username/password OpenId).
// For each gRPC request, we try them all until one succeeds, at which point the process is
// short-circuited.
authServices := auth.ConfigureAuth(config.Auth)
authServices, err := auth.ConfigureAuth(config.Auth)
if err != nil {
return err
}
grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices)

// Shut down grpcServer if the context is cancelled.
8 changes: 7 additions & 1 deletion internal/binoculars/server.go
@@ -30,7 +30,13 @@ func StartUp(config *configuration.BinocularsConfig) (func(), *sync.WaitGroup) {
os.Exit(-1)
}

grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, auth.ConfigureAuth(config.Auth))
authServices, err := auth.ConfigureAuth(config.Auth)
if err != nil {
log.Errorf("Failed to create auth services %s", err)
os.Exit(-1)
}

grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices)

logService := logs.NewKubernetesLogService(kubernetesClientProvider)
binocularsServer := server.NewBinocularsServer(logService)
15 changes: 8 additions & 7 deletions internal/common/auth/setup.go
@@ -2,15 +2,16 @@ package auth

import (
"context"
"errors"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/auth/authorization"
"github.com/armadaproject/armada/internal/common/auth/authorization/groups"
"github.com/armadaproject/armada/internal/common/auth/configuration"
)

func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService {
authServices := []authorization.AuthService{}
func ConfigureAuth(config configuration.AuthConfig) ([]authorization.AuthService, error) {
var authServices []authorization.AuthService

if len(config.BasicAuth.Users) > 0 {
authServices = append(authServices,
@@ -25,7 +26,7 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService
if config.OpenIdAuth.ProviderUrl != "" {
openIdAuthService, err := authorization.NewOpenIdAuthServiceForProvider(context.Background(), &config.OpenIdAuth)
if err != nil {
panic(err)
return nil, errors.WithMessage(err, "error initialising openId auth")
}
authServices = append(authServices, openIdAuthService)
}
@@ -43,14 +44,14 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService

kerberosAuthService, err := authorization.NewKerberosAuthService(&config.Kerberos, groupLookup)
if err != nil {
panic(err)
return nil, errors.WithMessage(err, "error initialising kerberos auth")
}
authServices = append(authServices, kerberosAuthService)
}

if len(authServices) == 0 {
panic(errors.New("At least one auth method must be specified in config"))
return nil, errors.New("at least one auth method must be specified in config")
}

return authServices
return authServices, nil
}
16 changes: 16 additions & 0 deletions internal/common/grpc/grpc.go
@@ -1,10 +1,12 @@
package grpc

import (
"context"
"fmt"
"net"
"runtime/debug"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
@@ -105,6 +107,20 @@ func Listen(port uint16, grpcServer *grpc.Server, wg *sync.WaitGroup) {
}()
}

// CreateShutdownHandler returns a function that shuts down the grpcServer when the context is closed.
// The server is given gracePeriod to perform a graceful shutdown and is then forcibly stopped if necessary
func CreateShutdownHandler(ctx context.Context, gracePeriod time.Duration, grpcServer *grpc.Server) func() error {
return func() error {
<-ctx.Done()
go func() {
time.Sleep(gracePeriod)
grpcServer.Stop()
}()
grpcServer.GracefulStop()
return nil
}
}

// This function is called whenever a gRPC handler panics.
func panicRecoveryHandler(p interface{}) (err error) {
log.Errorf("Request triggered panic with cause %v \n%s", p, string(debug.Stack()))
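The new `CreateShutdownHandler` is designed to run alongside `Serve` so that cancelling a context drains the server before forcing it down. A hedged, self-contained sketch of one way to wire it with an errgroup; the signal handling, port, and errgroup usage are illustrative and not the exact call sites in Armada.

```go
package main

import (
	"context"
	"log"
	"net"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"

	// Import path inferred from the file location internal/common/grpc/grpc.go.
	grpcCommon "github.com/armadaproject/armada/internal/common/grpc"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM so the shutdown handler fires.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	server := grpc.NewServer()
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatal(err)
	}

	g, ctx := errgroup.WithContext(ctx)
	// Waits for ctx to be cancelled, then drains for up to 5s before a hard Stop.
	g.Go(grpcCommon.CreateShutdownHandler(ctx, 5*time.Second, server))
	g.Go(func() error { return server.Serve(lis) })

	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}
```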
11 changes: 9 additions & 2 deletions internal/scheduler/api.go
@@ -39,7 +39,14 @@ func NewExecutorApi(producer pulsar.Producer,
executorRepository database.ExecutorRepository,
allowedPriorities []int32,
maxJobsPerCall uint,
) *ExecutorApi {
) (*ExecutorApi, error) {
if len(allowedPriorities) == 0 {
return nil, errors.New("allowedPriorities cannot be empty")
}
if maxJobsPerCall == 0 {
return nil, errors.New("maxJobsPerCall cannot be 0")
}

return &ExecutorApi{
producer: producer,
jobRepository: jobRepository,
@@ -48,7 +55,7 @@ func NewExecutorApi(producer pulsar.Producer,
maxJobsPerCall: maxJobsPerCall,
maxPulsarMessageSize: 1024 * 1024 * 2,
clock: clock.RealClock{},
}
}, nil
}

// LeaseJobRuns performs the following actions:
Expand Down
29 changes: 20 additions & 9 deletions internal/scheduler/api_test.go
@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/compress"
@@ -47,11 +48,18 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Pool: "test-pool",
Nodes: []*schedulerobjects.Node{
{
Id: "test-executor-test-node",
TotalResources: schedulerobjects.ResourceList{},
JobRuns: []string{runId1.String(), runId2.String()},
AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{},
LastSeen: testClock.Now().UTC(),
Id: "test-executor-test-node",
TotalResources: schedulerobjects.ResourceList{},
JobRuns: []string{runId1.String(), runId2.String()},
AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{
1000: {
Resources: map[string]resource.Quantity{},
},
2000: {
Resources: map[string]resource.Quantity{},
},
},
LastSeen: testClock.Now().UTC(),
},
},
MinimumJobSize: schedulerobjects.ResourceList{},
@@ -140,13 +148,14 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
capturedEvents = append(capturedEvents, msg)
}).AnyTimes()

server := NewExecutorApi(
server, err := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
[]int32{1000, 2000},
maxJobsPerCall,
)
require.NoError(t, err)
server.clock = testClock

err = server.LeaseJobRuns(mockStream)
@@ -209,14 +218,16 @@ func TestExecutorApi_Publish(t *testing.T) {
callback(pulsarutils.NewMessageId(1), msg, nil)
}).AnyTimes()

server := NewExecutorApi(
server, err := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
[]int32{1000, 2000},
100,
)

require.NoError(t, err)

empty, err := server.ReportEvents(ctx, &executorapi.EventList{Events: tc.sequences})
require.NoError(t, err)
assert.NotNil(t, empty)
Expand Down
10 changes: 0 additions & 10 deletions internal/scheduler/jobdb.go
@@ -53,12 +53,6 @@ type SchedulerJob struct {
// Jobs with identical Queue and Priority
// are sorted by timestamp.
Timestamp int64
// Name of the executor to which this job has been assigned.
// Empty if this job has not yet been assigned.
Executor string
// Name of the node to which this job has been assigned.
// Empty if this job has not yet been assigned.
Node string
// True if the job is currently queued.
// If this is set then the job will not be considered for scheduling
Queued bool
@@ -155,7 +149,6 @@ func (job *SchedulerJob) DeepCopy() *SchedulerJob {
Jobset: job.Jobset,
Priority: job.Priority,
Timestamp: job.Timestamp,
Node: job.Node,
Queued: job.Queued,
jobSchedulingInfo: proto.Clone(job.jobSchedulingInfo).(*schedulerobjects.JobSchedulingInfo),
CancelRequested: job.CancelRequested,
@@ -172,8 +165,6 @@ type JobRun struct {
RunID uuid.UUID
// The name of the executor this run has been leased to
Executor string
// True if the job has been reported as pending by the executor
Pending bool
// True if the job has been reported as running by the executor
Running bool
// True if the job has been reported as succeeded by the executor
@@ -199,7 +190,6 @@ func (run *JobRun) DeepCopy() *JobRun {
return &JobRun{
RunID: run.RunID,
Executor: run.Executor,
Pending: run.Pending,
Running: run.Running,
Succeeded: run.Succeeded,
Failed: run.Failed,
2 changes: 0 additions & 2 deletions internal/scheduler/jobdb_test.go
@@ -22,7 +22,6 @@ var job1 = &SchedulerJob{
Queue: "A",
Priority: 0,
Timestamp: 10,
Node: "",
jobSchedulingInfo: nil,
}

@@ -31,7 +30,6 @@ var job2 = &SchedulerJob{
Queue: "A",
Priority: 0,
Timestamp: 10,
Node: "",
jobSchedulingInfo: nil,
}

2 changes: 0 additions & 2 deletions internal/scheduler/publisher_test.go
@@ -12,7 +12,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -185,7 +184,6 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) {
capturedPartitions[key] = true
}
if numPublished > tc.numSuccessfulPublishes {
log.Info("returning error")
return pulsarutils.NewMessageId(numPublished), errors.New("error from mock pulsar producer")
}
return pulsarutils.NewMessageId(numPublished), nil
29 changes: 13 additions & 16 deletions internal/scheduler/scheduler.go
@@ -263,21 +263,24 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) {
if err != nil {
return nil, errors.Wrapf(err, "error retrieving job %s from jobDb ", jobId)
}

// If the job is nil at this point then it cannot be active.
// In this case we can ignore the run
if job == nil {
log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID)
continue
}

job = job.DeepCopy()
jobsToUpdateById[jobId] = job
}

// If the job is nil at this point then it cannot be active.
// In this case we can ignore the run
if job == nil {
log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID)
continue
}

returnProcessed := false
run := job.RunById(dbRun.RunID)
if run == nil {
run = s.createSchedulerRun(&dbRun)
// TODO: we need to ensure that runs end up in the correct order here
// This will need us to store an order id in the db
job.Runs = append(job.Runs, run)
} else {
returnProcessed = run.Returned
@@ -290,20 +293,14 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) {
// do the same, but eventually we should send an actual queued message and this bit of code can disappear
if !returnProcessed && run.Returned && job.NumReturned() <= s.maxLeaseReturns {
job.Queued = true
job.Node = ""
job.Executor = ""
run.Failed = false // unset failed here so that we don't generate a job failed message later
}
}

// any jobs that don't have an active run need to be marked as queued
for _, job := range jobsToUpdateById {
run := job.CurrentRun()
if run == nil || run.InTerminalState() {
job.Queued = true
job.Node = ""
job.Executor = ""
}
job.Queued = run == nil || run.InTerminalState()
}

jobsToUpdate := maps.Values(jobsToUpdateById)
@@ -343,7 +340,7 @@ func (s *Scheduler) generateLeaseMessages(scheduledJobs []*SchedulerJob) ([]*arm
JobRunLeased: &armadaevents.JobRunLeased{
RunId: armadaevents.ProtoUuidFromUuid(job.CurrentRun().RunID),
JobId: jobId,
ExecutorId: job.Executor,
ExecutorId: job.CurrentRun().Executor,
},
},
},
@@ -608,7 +605,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx context.Context, pollInterval time.Dura
log.Infof("Successfully ensured that database state is up to date")
return nil
}
log.Infof("Received %d partitions, still waiting on %d", numSent, numSent-numReceived)
log.Infof("Received %d partitions, still waiting on %d", numReceived, numSent-numReceived)
s.clock.Sleep(pollInterval)
}
}