From 60ef52624b1a9e71a8f567ec707acc521fdb297e Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 30 Mar 2020 11:33:32 -0700 Subject: [PATCH] Add configurable schedule name prefix (#82) --- flyteadmin_config.yaml | 1 + .../schedule/aws/cloud_watch_scheduler.go | 17 ++-- .../aws/cloud_watch_scheduler_test.go | 37 ++++++-- .../schedule/interfaces/event_scheduler.go | 11 ++- .../schedule/mocks/mock_event_scheduler.go | 8 +- pkg/async/schedule/noop/event_scheduler.go | 6 +- pkg/manager/impl/launch_plan_manager.go | 6 +- pkg/manager/impl/launch_plan_manager_test.go | 22 ++--- .../interfaces/application_configuration.go | 88 +++++++++++++++---- 9 files changed, 142 insertions(+), 54 deletions(-) diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index bcdace5cce..42813682f4 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -52,6 +52,7 @@ scheduler: region: "my-region" scheduleRole: "arn:aws:iam::abc123:role/my-iam-role" targetName: "arn:aws:sqs:my-region:abc123:my-queue" + scheduleNamePrefix: "flyte" workflowExecutor: scheme: local region: "my-region" diff --git a/pkg/async/schedule/aws/cloud_watch_scheduler.go b/pkg/async/schedule/aws/cloud_watch_scheduler.go index 17abdff38c..e8068461a7 100644 --- a/pkg/async/schedule/aws/cloud_watch_scheduler.go +++ b/pkg/async/schedule/aws/cloud_watch_scheduler.go @@ -37,7 +37,7 @@ var timeValue = "$.time" const scheduleNameInputsFormat = "%s:%s:%s" const scheduleDescriptionFormat = "Schedule for Project:%s Domain:%s Name:%s launch plan" -const scheduleNameFormat = "flyte_%d" +const scheduleNameFormat = "%s_%d" // Container for initialized metrics objects type cloudWatchSchedulerMetrics struct { @@ -68,9 +68,12 @@ type cloudWatchScheduler struct { metrics cloudWatchSchedulerMetrics } -func getScheduleName(identifier admin.NamedEntityIdentifier) string { +func getScheduleName(scheduleNamePrefix string, identifier admin.NamedEntityIdentifier) string { hashedIdentifier := hashIdentifier(identifier) - return fmt.Sprintf(scheduleNameFormat, hashedIdentifier) + if len(scheduleNamePrefix) > 0 { + return fmt.Sprintf(scheduleNameFormat, scheduleNamePrefix, hashedIdentifier) + } + return fmt.Sprintf("%d", hashedIdentifier) } func getScheduleDescription(identifier admin.NamedEntityIdentifier) string { @@ -115,7 +118,7 @@ func (s *cloudWatchScheduler) AddSchedule(ctx context.Context, input scheduleInt s.metrics.InvalidSchedules.Inc() return err } - scheduleName := getScheduleName(input.Identifier) + scheduleName := getScheduleName(input.ScheduleNamePrefix, input.Identifier) scheduleDescription := getScheduleDescription(input.Identifier) // First define a rule which gets triggered on a schedule. requestInput := cloudwatchevents.PutRuleInput{ @@ -175,8 +178,8 @@ func isResourceNotFoundException(err error) bool { return false } -func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error { - name := getScheduleName(identifier) +func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { + name := getScheduleName(input.ScheduleNamePrefix, input.Identifier) // All outbound targets for a rule must be deleted before the rule itself can be deleted. output, err := s.cloudWatchEventClient.RemoveTargets(&cloudwatchevents.RemoveTargetsInput{ Ids: []*string{ @@ -219,7 +222,7 @@ func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, identifier adm } s.metrics.RemovedSchedules.Inc() s.metrics.ActiveSchedules.Dec() - logger.Debugf(ctx, "Removed schedule %s for identifier [%+v]", name, identifier) + logger.Debugf(ctx, "Removed schedule %s for identifier [%+v]", name, input.Identifier) return nil } diff --git a/pkg/async/schedule/aws/cloud_watch_scheduler_test.go b/pkg/async/schedule/aws/cloud_watch_scheduler_test.go index 91e00bfcc1..da97c56997 100644 --- a/pkg/async/schedule/aws/cloud_watch_scheduler_test.go +++ b/pkg/async/schedule/aws/cloud_watch_scheduler_test.go @@ -38,11 +38,18 @@ var scope = promutils.NewScope("test_scheduler") var testCloudWatchSchedulerMetrics = newCloudWatchSchedulerMetrics(scope) +const testScheduleNamePrefix = "flyte" + func TestGetScheduleName(t *testing.T) { - scheduleName := getScheduleName(testSchedulerIdentifier) + scheduleName := getScheduleName(testScheduleNamePrefix, testSchedulerIdentifier) assert.Equal(t, "flyte_16301494360130577061", scheduleName) } +func TestGetScheduleName_NoSystemPrefix(t *testing.T) { + scheduleName := getScheduleName("", testSchedulerIdentifier) + assert.Equal(t, "16301494360130577061", scheduleName) +} + func TestGetScheduleDescription(t *testing.T) { scheduleDescription := getScheduleDescription(testSchedulerIdentifier) assert.Equal(t, "Schedule for Project:project Domain:domain Name:name launch plan", scheduleDescription) @@ -135,7 +142,8 @@ func TestAddSchedule(t *testing.T) { }, }, }, - Payload: &testSerializedPayload, + Payload: &testSerializedPayload, + ScheduleNamePrefix: testScheduleNamePrefix, })) } @@ -216,7 +224,10 @@ func TestRemoveSchedule(t *testing.T) { return &cloudwatchevents.DeleteRuleOutput{}, nil }) scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient) - assert.Nil(t, scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier)) + assert.Nil(t, scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{ + Identifier: testSchedulerIdentifier, + ScheduleNamePrefix: testScheduleNamePrefix, + })) } func TestRemoveSchedule_RemoveTargetsError(t *testing.T) { @@ -226,7 +237,10 @@ func TestRemoveSchedule_RemoveTargetsError(t *testing.T) { return nil, expectedError }) scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient) - err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier) + err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{ + Identifier: testSchedulerIdentifier, + ScheduleNamePrefix: testScheduleNamePrefix, + }) assert.Equal(t, codes.Internal, err.(flyteAdminErrors.FlyteAdminError).Code()) } @@ -237,7 +251,10 @@ func TestRemoveSchedule_InvalidTarget(t *testing.T) { return nil, awserr.New(cloudwatchevents.ErrCodeResourceNotFoundException, "foo", expectedError) }) scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient) - err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier) + err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{ + Identifier: testSchedulerIdentifier, + ScheduleNamePrefix: testScheduleNamePrefix, + }) assert.Nil(t, err) } @@ -252,7 +269,10 @@ func TestRemoveSchedule_DeleteRuleError(t *testing.T) { return nil, expectedError }) scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient) - err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier) + err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{ + Identifier: testSchedulerIdentifier, + ScheduleNamePrefix: testScheduleNamePrefix, + }) assert.Equal(t, codes.Internal, err.(flyteAdminErrors.FlyteAdminError).Code()) } @@ -267,6 +287,9 @@ func TestRemoveSchedule_InvalidRule(t *testing.T) { return nil, awserr.New(cloudwatchevents.ErrCodeResourceNotFoundException, "foo", expectedError) }) scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient) - err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier) + err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{ + Identifier: testSchedulerIdentifier, + ScheduleNamePrefix: testScheduleNamePrefix, + }) assert.Nil(t, err) } diff --git a/pkg/async/schedule/interfaces/event_scheduler.go b/pkg/async/schedule/interfaces/event_scheduler.go index 58714d0f29..d3a481cb3c 100644 --- a/pkg/async/schedule/interfaces/event_scheduler.go +++ b/pkg/async/schedule/interfaces/event_scheduler.go @@ -14,6 +14,15 @@ type AddScheduleInput struct { ScheduleExpression admin.Schedule // Message payload encoded as an CloudWatch event rule InputTemplate. Payload *string + // Optional: The application-wide prefix to be applied for schedule names. + ScheduleNamePrefix string +} + +type RemoveScheduleInput struct { + // Defines the unique identifier associated with the schedule + Identifier admin.NamedEntityIdentifier + // Optional: The application-wide prefix to be applied for schedule names. + ScheduleNamePrefix string } type EventScheduler interface { @@ -21,5 +30,5 @@ type EventScheduler interface { AddSchedule(ctx context.Context, input AddScheduleInput) error // Removes an existing schedule. - RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error + RemoveSchedule(ctx context.Context, input RemoveScheduleInput) error } diff --git a/pkg/async/schedule/mocks/mock_event_scheduler.go b/pkg/async/schedule/mocks/mock_event_scheduler.go index 25afbecfb9..e46b554e4b 100644 --- a/pkg/async/schedule/mocks/mock_event_scheduler.go +++ b/pkg/async/schedule/mocks/mock_event_scheduler.go @@ -4,12 +4,10 @@ import ( "context" "github.com/lyft/flyteadmin/pkg/async/schedule/interfaces" - - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" ) type AddScheduleFunc func(ctx context.Context, input interfaces.AddScheduleInput) error -type RemoveScheduleFunc func(ctx context.Context, identifier admin.NamedEntityIdentifier) error +type RemoveScheduleFunc func(ctx context.Context, input interfaces.RemoveScheduleInput) error type MockEventScheduler struct { addScheduleFunc AddScheduleFunc removeScheduleFunc RemoveScheduleFunc @@ -26,9 +24,9 @@ func (s *MockEventScheduler) SetAddScheduleFunc(addScheduleFunc AddScheduleFunc) s.addScheduleFunc = addScheduleFunc } -func (s *MockEventScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error { +func (s *MockEventScheduler) RemoveSchedule(ctx context.Context, input interfaces.RemoveScheduleInput) error { if s.removeScheduleFunc != nil { - return s.removeScheduleFunc(ctx, identifier) + return s.removeScheduleFunc(ctx, input) } return nil } diff --git a/pkg/async/schedule/noop/event_scheduler.go b/pkg/async/schedule/noop/event_scheduler.go index bf4ea986d4..2da139c204 100644 --- a/pkg/async/schedule/noop/event_scheduler.go +++ b/pkg/async/schedule/noop/event_scheduler.go @@ -6,8 +6,6 @@ import ( "github.com/lyft/flyteadmin/pkg/async/schedule/interfaces" - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/lyft/flytestdlib/logger" ) @@ -19,8 +17,8 @@ func (s *EventScheduler) AddSchedule(ctx context.Context, input interfaces.AddSc return nil } -func (s *EventScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error { - logger.Debugf(ctx, "Received call to remove schedule [%+v]", identifier) +func (s *EventScheduler) RemoveSchedule(ctx context.Context, input interfaces.RemoveScheduleInput) error { + logger.Debugf(ctx, "Received call to remove schedule [%+v]", input.Identifier) logger.Debug(ctx, "Not scheduling anything") return nil } diff --git a/pkg/manager/impl/launch_plan_manager.go b/pkg/manager/impl/launch_plan_manager.go index b6e5de18d3..c5b400d28b 100644 --- a/pkg/manager/impl/launch_plan_manager.go +++ b/pkg/manager/impl/launch_plan_manager.go @@ -172,13 +172,17 @@ func (m *LaunchPlanManager) enableSchedule(ctx context.Context, launchPlanIdenti Identifier: launchPlanIdentifier, ScheduleExpression: *launchPlanSpec.EntityMetadata.Schedule, Payload: payload, + ScheduleNamePrefix: m.config.ApplicationConfiguration().GetSchedulerConfig().EventSchedulerConfig.ScheduleNamePrefix, } return m.scheduler.AddSchedule(ctx, addScheduleInput) } func (m *LaunchPlanManager) disableSchedule( ctx context.Context, launchPlanIdentifier admin.NamedEntityIdentifier) error { - return m.scheduler.RemoveSchedule(ctx, launchPlanIdentifier) + return m.scheduler.RemoveSchedule(ctx, scheduleInterfaces.RemoveScheduleInput{ + Identifier: launchPlanIdentifier, + ScheduleNamePrefix: m.config.ApplicationConfiguration().GetSchedulerConfig().EventSchedulerConfig.ScheduleNamePrefix, + }) } func (m *LaunchPlanManager) updateSchedules( diff --git a/pkg/manager/impl/launch_plan_manager_test.go b/pkg/manager/impl/launch_plan_manager_test.go index c851f7d8a8..13a6241178 100644 --- a/pkg/manager/impl/launch_plan_manager_test.go +++ b/pkg/manager/impl/launch_plan_manager_test.go @@ -437,8 +437,8 @@ func TestDisableSchedule(t *testing.T) { repository := getMockRepositoryForLpTest() mockScheduler := mocks.NewMockEventScheduler() mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { - assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier)) + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { + assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier)) return nil }) lpManager := NewLaunchPlanManager(repository, getMockConfigForLpTest(), mockScheduler, mockScope.NewTestScope()) @@ -452,7 +452,7 @@ func TestDisableSchedule_Error(t *testing.T) { repository := getMockRepositoryForLpTest() mockScheduler := mocks.NewMockEventScheduler() mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { return expectedErr }) lpManager := NewLaunchPlanManager(repository, getMockConfigForLpTest(), mockScheduler, mockScope.NewTestScope()) @@ -489,8 +489,8 @@ func TestUpdateSchedules(t *testing.T) { mockScheduler := mocks.NewMockEventScheduler() var removeCalled bool mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { - assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier)) + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { + assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier)) removeCalled = true return nil }) @@ -607,12 +607,12 @@ func TestUpdateSchedules_NothingToEnable(t *testing.T) { mockScheduler := mocks.NewMockEventScheduler() var removeCalled bool mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { assert.True(t, proto.Equal(&admin.NamedEntityIdentifier{ Project: project, Domain: domain, Name: name, - }, &identifier)) + }, &input.Identifier)) removeCalled = true return nil }) @@ -653,7 +653,7 @@ func TestUpdateSchedules_NothingToDo(t *testing.T) { mockScheduler := mocks.NewMockEventScheduler() var removeCalled bool mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { removeCalled = true return nil }) @@ -710,7 +710,7 @@ func TestUpdateSchedules_EnableNoSchedule(t *testing.T) { mockScheduler := mocks.NewMockEventScheduler() var removeCalled bool mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { removeCalled = true return nil }) @@ -795,8 +795,8 @@ func TestDisableLaunchPlan(t *testing.T) { var removeScheduleFuncCalled bool mockScheduler := mocks.NewMockEventScheduler() mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc( - func(ctx context.Context, identifier admin.NamedEntityIdentifier) error { - assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier)) + func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error { + assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier)) removeScheduleFuncCalled = true return nil }) diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index 9797a98100..b67ff1cc56 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -1,17 +1,28 @@ package interfaces +// This configuration section is used to for initiating the database connection with the store that holds registered +// entities (e.g. workflows, tasks, launch plans...) +// This struct specifically maps to the flyteadmin config yaml structure. type DbConfigSection struct { - Host string `json:"host"` - Port int `json:"port"` + // The host name of the database server + Host string `json:"host"` + // The port name of the database server + Port int `json:"port"` + // The database name DbName string `json:"dbname"` - User string `json:"username"` + // The database user who is connecting to the server. + User string `json:"username"` // Either Password or PasswordPath must be set. + // The Password resolves to the database password. Password string `json:"password"` PasswordPath string `json:"passwordPath"` // See http://gorm.io/docs/connecting_to_the_database.html for available options passed, in addition to the above. ExtraOptions string `json:"options"` } +// This represents a configuration used for initiating database connections much like DbConfigSection, however the +// password is *resolved* in this struct and therefore it is used as the value the runtime provider returns to callers +// requesting the database config. type DbConfig struct { Host string `json:"host"` Port int `json:"port"` @@ -23,25 +34,46 @@ type DbConfig struct { // This configuration is the base configuration to start admin type ApplicationConfig struct { - RoleNameKey string `json:"roleNameKey"` - KubeConfig string `json:"kubeconfig"` - MetricsScope string `json:"metricsScope"` - ProfilerPort int `json:"profilerPort"` + // The RoleName key inserted as an annotation (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) + // in Flyte Workflow CRDs created in the CreateExecution flow. The corresponding role value is defined in the + // launch plan that is used to create the execution. + RoleNameKey string `json:"roleNameKey"` + // Top-level name applied to all metrics emitted by the application. + MetricsScope string `json:"metricsScope"` + // Determines which port the profiling server used for admin monitoring and application debugging uses. + ProfilerPort int `json:"profilerPort"` + // This defines the nested path on the configured external storage provider where workflow closures are remotely + // offloaded. MetadataStoragePrefix []string `json:"metadataStoragePrefix"` } +// This section holds configuration for the event scheduler used to schedule workflow executions. type EventSchedulerConfig struct { - Scheme string `json:"scheme"` - Region string `json:"region"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Scheme string `json:"scheme"` + // Some cloud providers require a region to be set. + Region string `json:"region"` + // The role assumed to register and activate schedules. ScheduleRole string `json:"scheduleRole"` - TargetName string `json:"targetName"` + // The name of the queue for which scheduled events should enqueue. + TargetName string `json:"targetName"` + // Optional: The application-wide prefix to be applied for schedule names. + ScheduleNamePrefix string `json:"scheduleNamePrefix"` } +// This section holds configuration for the executor that processes workflow scheduled events fired. type WorkflowExecutorConfig struct { - Scheme string `json:"scheme"` - Region string `json:"region"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Scheme string `json:"scheme"` + // Some cloud providers require a region to be set. + Region string `json:"region"` + // The name of the queue onto which scheduled events will enqueue. ScheduleQueueName string `json:"scheduleQueueName"` - AccountID string `json:"accountId"` + // The account id (according to whichever cloud provider scheme is used) that has permission to read from the above + // queue. + AccountID string `json:"accountId"` } // This configuration is the base configuration for all scheduler-related set-up. @@ -52,42 +84,62 @@ type SchedulerConfig struct { // Configuration specific to setting up signed urls. type SignedURL struct { + // The amount of time for which a signed URL is valid. DurationMinutes int `json:"durationMinutes"` } // This configuration handles all requests to get remote data such as execution inputs & outputs. type RemoteDataConfig struct { - Scheme string `json:"scheme"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Scheme string `json:"scheme"` + // Some cloud providers require a region to be set. Region string `json:"region"` SignedURL SignedURL `json:"signedUrls"` } +// This section handles configuration for the workflow notifications pipeline. type NotificationsPublisherConfig struct { + // The topic which notifications use, e.g. AWS SNS topics. TopicName string `json:"topicName"` } +// This section handles configuration for processing workflow events. type NotificationsProcessorConfig struct { + // The name of the queue onto which workflow notifications will enqueue. QueueName string `json:"queueName"` + // The account id (according to whichever cloud provider scheme is used) that has permission to read from the above + // queue. AccountID string `json:"accountId"` } +// This section handles the configuration of notifications emails. type NotificationsEmailerConfig struct { + // The optionally templatized subject used in notification emails. Subject string `json:"subject"` - Sender string `json:"sender"` - Body string `json:"body"` + // The optionally templatized sender used in notification emails. + Sender string `json:"sender"` + // The optionally templatized body the sender used in notification emails. + Body string `json:"body"` } // Configuration specific to notifications handling type NotificationsConfig struct { - Type string `json:"type"` + // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' + // scheme is used. + Type string `json:"type"` + // Some cloud providers require a region to be set. Region string `json:"region"` NotificationsPublisherConfig NotificationsPublisherConfig `json:"publisher"` NotificationsProcessorConfig NotificationsProcessorConfig `json:"processor"` NotificationsEmailerConfig NotificationsEmailerConfig `json:"emailer"` } +// Domains are always globally set in the application config, whereas individual projects can be individually registered. type Domain struct { - ID string `json:"id"` + // Unique identifier for a domain. + ID string `json:"id"` + // Human readable name for a domain. Name string `json:"name"` }