This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
event_scheduler_impl.go
87 lines (77 loc) · 2.95 KB
/
event_scheduler_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package dbapi
import (
"context"
"fmt"
"github.com/flyteorg/flyteadmin/pkg/async/schedule/interfaces"
scheduleInterfaces "github.com/flyteorg/flyteadmin/pkg/async/schedule/interfaces"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteadmin/scheduler/repositories"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
)
// eventScheduler is used for saving the scheduler entries after launch plans
// are enabled or disabled. Its AddSchedule and RemoveSchedule methods
// respectively activate and deactivate entries through the schedulable-entity
// repository exposed by db.
type eventScheduler struct {
// db is the scheduler repository; its SchedulableEntityRepo() is the only
// part used by the methods in this file.
db repositories.SchedulerRepoInterface
}
// CreateScheduleInput packages a launch plan identifier and its schedule into
// an AddScheduleInput that can later be passed to AddSchedule.
//
// ctx and appConfig are accepted to match the method signature but are not
// used by this implementation.
//
// The schedule is stored by value in the returned input, so it has to be
// dereferenced here. A nil schedule is reported as an error rather than being
// dereferenced, which would panic.
func (s *eventScheduler) CreateScheduleInput(ctx context.Context, appConfig *runtimeInterfaces.SchedulerConfig,
	identifier core.Identifier, schedule *admin.Schedule) (interfaces.AddScheduleInput, error) {
	if schedule == nil {
		return interfaces.AddScheduleInput{}, fmt.Errorf(
			"cannot create schedule input for %s/%s/%s/%s: schedule is nil",
			identifier.Project, identifier.Domain, identifier.Name, identifier.Version)
	}

	return interfaces.AddScheduleInput{
		Identifier:         identifier,
		ScheduleExpression: *schedule,
	}, nil
}
// AddSchedule converts the schedule expression carried by input into a
// models.SchedulableEntity and activates it through the schedulable-entity
// repository.
//
// A cron schedule populates CronExpression; a fixed-rate schedule populates
// FixedRateValue and Unit. Any other expression type is rejected with an
// error before the repository is touched. Errors returned by Activate are
// passed back to the caller unchanged.
func (s *eventScheduler) AddSchedule(ctx context.Context, input interfaces.AddScheduleInput) error {
	logger.Infof(ctx, "Received call to add schedule [%+v]", input)

	// The entity is always stored as active; the field takes a pointer.
	enabled := true
	entity := models.SchedulableEntity{
		SchedulableEntityKey: models.SchedulableEntityKey{
			Project: input.Identifier.Project,
			Domain:  input.Identifier.Domain,
			Name:    input.Identifier.Name,
			Version: input.Identifier.Version,
		},
		KickoffTimeInputArg: input.ScheduleExpression.KickoffTimeInputArg,
		Active:              &enabled,
	}

	// Only the fields matching the expression kind are set; the others keep
	// their zero values.
	switch expr := input.ScheduleExpression.GetScheduleExpression().(type) {
	case *admin.Schedule_CronSchedule:
		entity.CronExpression = expr.CronSchedule.Schedule
	case *admin.Schedule_Rate:
		entity.FixedRateValue = expr.Rate.Value
		entity.Unit = expr.Rate.Unit
	default:
		return fmt.Errorf("failed adding schedule for unknown schedule expression type %v", expr)
	}

	if err := s.db.SchedulableEntityRepo().Activate(ctx, entity); err != nil {
		return err
	}

	logger.Infof(ctx, "Activated scheduled entity for %v ", input)
	return nil
}
// RemoveSchedule deactivates the scheduler entry identified by
// input.Identifier (project, domain, name and version) through the
// schedulable-entity repository. Errors returned by Deactivate are passed back
// to the caller unchanged.
func (s *eventScheduler) RemoveSchedule(ctx context.Context, input interfaces.RemoveScheduleInput) error {
	logger.Infof(ctx, "Received call to remove schedule [%+v]. Will deactivate it in the scheduler", input.Identifier)

	entityKey := models.SchedulableEntityKey{
		Project: input.Identifier.Project,
		Domain:  input.Identifier.Domain,
		Name:    input.Identifier.Name,
		Version: input.Identifier.Version,
	}
	if err := s.db.SchedulableEntityRepo().Deactivate(ctx, entityKey); err != nil {
		return err
	}

	logger.Infof(ctx, "Deactivated the schedule %v in the scheduler", input)
	return nil
}
// New returns an interfaces.EventScheduler backed by the given scheduler
// repository.
func New(db repositories.SchedulerRepoInterface) interfaces.EventScheduler {
	scheduler := new(eventScheduler)
	scheduler.db = db
	return scheduler
}