This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
schedule_executor.go
116 lines (95 loc) · 4.31 KB
/
schedule_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package scheduler
import (
"context"
"time"
repositoryInterfaces "github.com/flyteorg/flyteadmin/scheduler/repositories/interfaces"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteadmin/scheduler/core"
"github.com/flyteorg/flyteadmin/scheduler/executor"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flytestdlib/futures"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/util/wait"
)
const snapshotWriterDuration = 30 * time.Second
const scheduleUpdaterDuration = 30 * time.Second
const snapShotVersion = 1
// ScheduledExecutor used for executing the schedules saved by the native flyte scheduler in the database.
type ScheduledExecutor struct {
scheduler core.Scheduler
snapshoter snapshoter.Persistence
db repositoryInterfaces.SchedulerRepoInterface
scope promutils.Scope
adminServiceClient service.AdminServiceClient
workflowExecutorConfig *runtimeInterfaces.FlyteWorkflowExecutorConfig
}
func (w *ScheduledExecutor) Run(ctx context.Context) error {
logger.Infof(ctx, "Flyte native scheduler started successfully")
defer logger.Infof(ctx, "Flyte native scheduler shutdown")
// Read snapshot from the DB. Each snapshot is versioned and helps in maintaining backward compatibility
// Snapshot contains the lastexecution times for each schedule and is captured every 30 secs
snapShotReader := &snapshoter.VersionedSnapshot{Version: snapShotVersion}
snapshot, err := w.snapshoter.Read(ctx, snapShotReader)
if err != nil {
logger.Errorf(ctx, "unable to read the snapshot from the db due to %v. Aborting", err)
return err
}
// Read all the schedules from the DB
schedules, err := w.db.SchedulableEntityRepo().GetAll(ctx)
if err != nil {
logger.Errorf(ctx, "unable to read the schedules from the db due to %v. Aborting", err)
return err
}
logger.Infof(ctx, "Number of schedules retrieved %v", len(schedules))
adminRateLimit := w.workflowExecutorConfig.GetAdminRateLimit()
// Set the rate limit on the admin
rateLimiter := rate.NewLimiter(adminRateLimit.GetTps(), adminRateLimit.GetBurst())
// Set the executor to send executions to admin
executor := executor.New(w.scope, w.adminServiceClient)
// Create the scheduler using GoCronScheduler implementation
// Also Bootstrap the schedules from the snapshot
bootStrapCtx, bootStrapCancel := context.WithCancel(ctx)
defer bootStrapCancel()
gcronScheduler := core.NewGoCronScheduler(bootStrapCtx, schedules, w.scope, snapshot, rateLimiter, executor)
w.scheduler = gcronScheduler
// Start the go routine to write the update schedules periodically
updaterCtx, updaterCancel := context.WithCancel(ctx)
defer updaterCancel()
gcronUpdater := core.NewUpdater(w.db, gcronScheduler)
go wait.UntilWithContext(updaterCtx, gcronUpdater.UpdateGoCronSchedules, scheduleUpdaterDuration)
// Catch up simulataneously on all the schedules in the scheduler
currTime := time.Now()
af := futures.NewAsyncFuture(ctx, func(ctx context.Context) (interface{}, error) {
return gcronScheduler.CatchupAll(ctx, currTime), nil
})
isCatchupSuccess, err := af.Get(ctx)
if err != nil {
logger.Errorf(ctx, "failed to get future value for catchup due to %v", err)
return err
}
if !isCatchupSuccess.(bool) {
logger.Errorf(ctx, "failed to catch up on all the schedules. Aborting")
return err
}
snapshotRunner := core.NewSnapshotRunner(w.snapshoter, w.scheduler)
// Start the go routine to write the snapshot periodically
snapshoterCtx, snapshoterCancel := context.WithCancel(ctx)
defer snapshoterCancel()
wait.UntilWithContext(snapshoterCtx, snapshotRunner.Run, snapshotWriterDuration)
<-ctx.Done()
return nil
}
func NewScheduledExecutor(db repositoryInterfaces.SchedulerRepoInterface,
workflowExecutorConfig runtimeInterfaces.WorkflowExecutorConfig,
scope promutils.Scope, adminServiceClient service.AdminServiceClient) ScheduledExecutor {
return ScheduledExecutor{
db: db,
scope: scope,
adminServiceClient: adminServiceClient,
workflowExecutorConfig: workflowExecutorConfig.GetFlyteWorkflowExecutorConfig(),
snapshoter: snapshoter.New(scope, db),
}
}