Skip to content

Commit

Permalink
Add configurable schedule name prefix (flyteorg#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan committed Mar 30, 2020
1 parent ccc0d05 commit 60ef526
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 54 deletions.
1 change: 1 addition & 0 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ scheduler:
region: "my-region"
scheduleRole: "arn:aws:iam::abc123:role/my-iam-role"
targetName: "arn:aws:sqs:my-region:abc123:my-queue"
scheduleNamePrefix: "flyte"
workflowExecutor:
scheme: local
region: "my-region"
Expand Down
17 changes: 10 additions & 7 deletions pkg/async/schedule/aws/cloud_watch_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var timeValue = "$.time"

const scheduleNameInputsFormat = "%s:%s:%s"
const scheduleDescriptionFormat = "Schedule for Project:%s Domain:%s Name:%s launch plan"
const scheduleNameFormat = "flyte_%d"
const scheduleNameFormat = "%s_%d"

// Container for initialized metrics objects
type cloudWatchSchedulerMetrics struct {
Expand Down Expand Up @@ -68,9 +68,12 @@ type cloudWatchScheduler struct {
metrics cloudWatchSchedulerMetrics
}

func getScheduleName(identifier admin.NamedEntityIdentifier) string {
func getScheduleName(scheduleNamePrefix string, identifier admin.NamedEntityIdentifier) string {
hashedIdentifier := hashIdentifier(identifier)
return fmt.Sprintf(scheduleNameFormat, hashedIdentifier)
if len(scheduleNamePrefix) > 0 {
return fmt.Sprintf(scheduleNameFormat, scheduleNamePrefix, hashedIdentifier)
}
return fmt.Sprintf("%d", hashedIdentifier)
}

func getScheduleDescription(identifier admin.NamedEntityIdentifier) string {
Expand Down Expand Up @@ -115,7 +118,7 @@ func (s *cloudWatchScheduler) AddSchedule(ctx context.Context, input scheduleInt
s.metrics.InvalidSchedules.Inc()
return err
}
scheduleName := getScheduleName(input.Identifier)
scheduleName := getScheduleName(input.ScheduleNamePrefix, input.Identifier)
scheduleDescription := getScheduleDescription(input.Identifier)
// First define a rule which gets triggered on a schedule.
requestInput := cloudwatchevents.PutRuleInput{
Expand Down Expand Up @@ -175,8 +178,8 @@ func isResourceNotFoundException(err error) bool {
return false
}

func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
name := getScheduleName(identifier)
func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
name := getScheduleName(input.ScheduleNamePrefix, input.Identifier)
// All outbound targets for a rule must be deleted before the rule itself can be deleted.
output, err := s.cloudWatchEventClient.RemoveTargets(&cloudwatchevents.RemoveTargetsInput{
Ids: []*string{
Expand Down Expand Up @@ -219,7 +222,7 @@ func (s *cloudWatchScheduler) RemoveSchedule(ctx context.Context, identifier adm
}
s.metrics.RemovedSchedules.Inc()
s.metrics.ActiveSchedules.Dec()
logger.Debugf(ctx, "Removed schedule %s for identifier [%+v]", name, identifier)
logger.Debugf(ctx, "Removed schedule %s for identifier [%+v]", name, input.Identifier)
return nil
}

Expand Down
37 changes: 30 additions & 7 deletions pkg/async/schedule/aws/cloud_watch_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ var scope = promutils.NewScope("test_scheduler")

var testCloudWatchSchedulerMetrics = newCloudWatchSchedulerMetrics(scope)

const testScheduleNamePrefix = "flyte"

func TestGetScheduleName(t *testing.T) {
scheduleName := getScheduleName(testSchedulerIdentifier)
scheduleName := getScheduleName(testScheduleNamePrefix, testSchedulerIdentifier)
assert.Equal(t, "flyte_16301494360130577061", scheduleName)
}

func TestGetScheduleName_NoSystemPrefix(t *testing.T) {
scheduleName := getScheduleName("", testSchedulerIdentifier)
assert.Equal(t, "16301494360130577061", scheduleName)
}

func TestGetScheduleDescription(t *testing.T) {
scheduleDescription := getScheduleDescription(testSchedulerIdentifier)
assert.Equal(t, "Schedule for Project:project Domain:domain Name:name launch plan", scheduleDescription)
Expand Down Expand Up @@ -135,7 +142,8 @@ func TestAddSchedule(t *testing.T) {
},
},
},
Payload: &testSerializedPayload,
Payload: &testSerializedPayload,
ScheduleNamePrefix: testScheduleNamePrefix,
}))
}

Expand Down Expand Up @@ -216,7 +224,10 @@ func TestRemoveSchedule(t *testing.T) {
return &cloudwatchevents.DeleteRuleOutput{}, nil
})
scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient)
assert.Nil(t, scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier))
assert.Nil(t, scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{
Identifier: testSchedulerIdentifier,
ScheduleNamePrefix: testScheduleNamePrefix,
}))
}

func TestRemoveSchedule_RemoveTargetsError(t *testing.T) {
Expand All @@ -226,7 +237,10 @@ func TestRemoveSchedule_RemoveTargetsError(t *testing.T) {
return nil, expectedError
})
scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient)
err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier)
err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{
Identifier: testSchedulerIdentifier,
ScheduleNamePrefix: testScheduleNamePrefix,
})
assert.Equal(t, codes.Internal, err.(flyteAdminErrors.FlyteAdminError).Code())
}

Expand All @@ -237,7 +251,10 @@ func TestRemoveSchedule_InvalidTarget(t *testing.T) {
return nil, awserr.New(cloudwatchevents.ErrCodeResourceNotFoundException, "foo", expectedError)
})
scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient)
err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier)
err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{
Identifier: testSchedulerIdentifier,
ScheduleNamePrefix: testScheduleNamePrefix,
})
assert.Nil(t, err)
}

Expand All @@ -252,7 +269,10 @@ func TestRemoveSchedule_DeleteRuleError(t *testing.T) {
return nil, expectedError
})
scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient)
err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier)
err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{
Identifier: testSchedulerIdentifier,
ScheduleNamePrefix: testScheduleNamePrefix,
})
assert.Equal(t, codes.Internal, err.(flyteAdminErrors.FlyteAdminError).Code())
}

Expand All @@ -267,6 +287,9 @@ func TestRemoveSchedule_InvalidRule(t *testing.T) {
return nil, awserr.New(cloudwatchevents.ErrCodeResourceNotFoundException, "foo", expectedError)
})
scheduler := getCloudWatchSchedulerForTest(mockCloudWatchEventClient)
err := scheduler.RemoveSchedule(context.Background(), testSchedulerIdentifier)
err := scheduler.RemoveSchedule(context.Background(), scheduleInterfaces.RemoveScheduleInput{
Identifier: testSchedulerIdentifier,
ScheduleNamePrefix: testScheduleNamePrefix,
})
assert.Nil(t, err)
}
11 changes: 10 additions & 1 deletion pkg/async/schedule/interfaces/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ type AddScheduleInput struct {
ScheduleExpression admin.Schedule
// Message payload encoded as an CloudWatch event rule InputTemplate.
Payload *string
// Optional: The application-wide prefix to be applied for schedule names.
ScheduleNamePrefix string
}

type RemoveScheduleInput struct {
// Defines the unique identifier associated with the schedule
Identifier admin.NamedEntityIdentifier
// Optional: The application-wide prefix to be applied for schedule names.
ScheduleNamePrefix string
}

type EventScheduler interface {
// Schedules an event.
AddSchedule(ctx context.Context, input AddScheduleInput) error

// Removes an existing schedule.
RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error
RemoveSchedule(ctx context.Context, input RemoveScheduleInput) error
}
8 changes: 3 additions & 5 deletions pkg/async/schedule/mocks/mock_event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"

"github.com/lyft/flyteadmin/pkg/async/schedule/interfaces"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
)

type AddScheduleFunc func(ctx context.Context, input interfaces.AddScheduleInput) error
type RemoveScheduleFunc func(ctx context.Context, identifier admin.NamedEntityIdentifier) error
type RemoveScheduleFunc func(ctx context.Context, input interfaces.RemoveScheduleInput) error
type MockEventScheduler struct {
addScheduleFunc AddScheduleFunc
removeScheduleFunc RemoveScheduleFunc
Expand All @@ -26,9 +24,9 @@ func (s *MockEventScheduler) SetAddScheduleFunc(addScheduleFunc AddScheduleFunc)
s.addScheduleFunc = addScheduleFunc
}

func (s *MockEventScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
func (s *MockEventScheduler) RemoveSchedule(ctx context.Context, input interfaces.RemoveScheduleInput) error {
if s.removeScheduleFunc != nil {
return s.removeScheduleFunc(ctx, identifier)
return s.removeScheduleFunc(ctx, input)
}
return nil
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/async/schedule/noop/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (

"github.com/lyft/flyteadmin/pkg/async/schedule/interfaces"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flytestdlib/logger"
)

Expand All @@ -19,8 +17,8 @@ func (s *EventScheduler) AddSchedule(ctx context.Context, input interfaces.AddSc
return nil
}

func (s *EventScheduler) RemoveSchedule(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
logger.Debugf(ctx, "Received call to remove schedule [%+v]", identifier)
func (s *EventScheduler) RemoveSchedule(ctx context.Context, input interfaces.RemoveScheduleInput) error {
logger.Debugf(ctx, "Received call to remove schedule [%+v]", input.Identifier)
logger.Debug(ctx, "Not scheduling anything")
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,17 @@ func (m *LaunchPlanManager) enableSchedule(ctx context.Context, launchPlanIdenti
Identifier: launchPlanIdentifier,
ScheduleExpression: *launchPlanSpec.EntityMetadata.Schedule,
Payload: payload,
ScheduleNamePrefix: m.config.ApplicationConfiguration().GetSchedulerConfig().EventSchedulerConfig.ScheduleNamePrefix,
}
return m.scheduler.AddSchedule(ctx, addScheduleInput)
}

func (m *LaunchPlanManager) disableSchedule(
ctx context.Context, launchPlanIdentifier admin.NamedEntityIdentifier) error {
return m.scheduler.RemoveSchedule(ctx, launchPlanIdentifier)
return m.scheduler.RemoveSchedule(ctx, scheduleInterfaces.RemoveScheduleInput{
Identifier: launchPlanIdentifier,
ScheduleNamePrefix: m.config.ApplicationConfiguration().GetSchedulerConfig().EventSchedulerConfig.ScheduleNamePrefix,
})
}

func (m *LaunchPlanManager) updateSchedules(
Expand Down
22 changes: 11 additions & 11 deletions pkg/manager/impl/launch_plan_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ func TestDisableSchedule(t *testing.T) {
repository := getMockRepositoryForLpTest()
mockScheduler := mocks.NewMockEventScheduler()
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier))
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier))
return nil
})
lpManager := NewLaunchPlanManager(repository, getMockConfigForLpTest(), mockScheduler, mockScope.NewTestScope())
Expand All @@ -452,7 +452,7 @@ func TestDisableSchedule_Error(t *testing.T) {
repository := getMockRepositoryForLpTest()
mockScheduler := mocks.NewMockEventScheduler()
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
return expectedErr
})
lpManager := NewLaunchPlanManager(repository, getMockConfigForLpTest(), mockScheduler, mockScope.NewTestScope())
Expand Down Expand Up @@ -489,8 +489,8 @@ func TestUpdateSchedules(t *testing.T) {
mockScheduler := mocks.NewMockEventScheduler()
var removeCalled bool
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier))
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier))
removeCalled = true
return nil
})
Expand Down Expand Up @@ -607,12 +607,12 @@ func TestUpdateSchedules_NothingToEnable(t *testing.T) {
mockScheduler := mocks.NewMockEventScheduler()
var removeCalled bool
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
assert.True(t, proto.Equal(&admin.NamedEntityIdentifier{
Project: project,
Domain: domain,
Name: name,
}, &identifier))
}, &input.Identifier))
removeCalled = true
return nil
})
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestUpdateSchedules_NothingToDo(t *testing.T) {
mockScheduler := mocks.NewMockEventScheduler()
var removeCalled bool
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
removeCalled = true
return nil
})
Expand Down Expand Up @@ -710,7 +710,7 @@ func TestUpdateSchedules_EnableNoSchedule(t *testing.T) {
mockScheduler := mocks.NewMockEventScheduler()
var removeCalled bool
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
removeCalled = true
return nil
})
Expand Down Expand Up @@ -795,8 +795,8 @@ func TestDisableLaunchPlan(t *testing.T) {
var removeScheduleFuncCalled bool
mockScheduler := mocks.NewMockEventScheduler()
mockScheduler.(*mocks.MockEventScheduler).SetRemoveScheduleFunc(
func(ctx context.Context, identifier admin.NamedEntityIdentifier) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &identifier))
func(ctx context.Context, input scheduleInterfaces.RemoveScheduleInput) error {
assert.True(t, proto.Equal(&launchPlanNamedIdentifier, &input.Identifier))
removeScheduleFuncCalled = true
return nil
})
Expand Down
Loading

0 comments on commit 60ef526

Please sign in to comment.