This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
factory.go
114 lines (102 loc) · 4.23 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package schedule
import (
"context"
"time"
repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
gizmoConfig "github.com/NYTimes/gizmo/pubsub/aws"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/flyteorg/flyteadmin/pkg/async"
awsSchedule "github.com/flyteorg/flyteadmin/pkg/async/schedule/aws"
"github.com/flyteorg/flyteadmin/pkg/async/schedule/interfaces"
"github.com/flyteorg/flyteadmin/pkg/async/schedule/noop"
"github.com/flyteorg/flyteadmin/pkg/common"
managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
flytescheduler "github.com/flyteorg/flyteadmin/scheduler/dbapi"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
)
// WorkflowSchedulerConfig bundles the settings consumed by NewWorkflowScheduler.
type WorkflowSchedulerConfig struct {
// Retries is the maximum retry count set on the AWS client config when the AWS event scheduler is selected.
Retries int
// SchedulerConfig carries the event scheduler and workflow executor settings (scheme, region, queue, role,
// reconnect attempts and delay) read by this package.
SchedulerConfig runtimeInterfaces.SchedulerConfig
// Scope is the parent promutils scope; the "cloudwatch_scheduler" and "workflow_executor" sub-scopes are
// created from it.
Scope promutils.Scope
}
// WorkflowScheduler gives access to the event scheduler and the workflow executor selected by configuration.
type WorkflowScheduler interface {
// GetEventScheduler returns the configured event scheduler implementation.
GetEventScheduler() interfaces.EventScheduler
// GetWorkflowExecutor returns the workflow executor. The execution and launch plan managers are supplied
// here, rather than at construction time, because the AWS executor depends on them having been initialized.
GetWorkflowExecutor(executionManager managerInterfaces.ExecutionInterface,
launchPlanManager managerInterfaces.LaunchPlanInterface) interfaces.WorkflowExecutor
}
// workflowScheduler is the WorkflowScheduler implementation returned by NewWorkflowScheduler.
type workflowScheduler struct {
// cfg is retained so the AWS workflow executor can be built lazily in GetWorkflowExecutor.
cfg WorkflowSchedulerConfig
// eventScheduler is chosen once in NewWorkflowScheduler.
eventScheduler interfaces.EventScheduler
// workflowExecutor is nil after construction when the AWS scheme is configured; GetWorkflowExecutor fills it
// in on first use.
workflowExecutor interfaces.WorkflowExecutor
}
// GetEventScheduler returns the event scheduler selected when the workflowScheduler was constructed.
func (w *workflowScheduler) GetEventScheduler() interfaces.EventScheduler {
return w.eventScheduler
}
// GetWorkflowExecutor returns the workflow executor held by the scheduler. If none has been set yet, an
// AWS SQS-backed executor is built from the stored configuration and the supplied managers, cached on the
// receiver, and returned; later calls return the cached instance and ignore their arguments.
func (w *workflowScheduler) GetWorkflowExecutor(
	executionManager managerInterfaces.ExecutionInterface,
	launchPlanManager managerInterfaces.LaunchPlanInterface) interfaces.WorkflowExecutor {
	// An executor already exists (set at construction or by an earlier call): reuse it.
	if w.workflowExecutor != nil {
		return w.workflowExecutor
	}

	executorCfg := w.cfg.SchedulerConfig.WorkflowExecutorConfig

	var queueCfg gizmoConfig.SQSConfig
	queueCfg.QueueName = executorCfg.ScheduleQueueName
	queueCfg.QueueOwnerAccountID = executorCfg.AccountID
	queueCfg.Region = executorCfg.Region

	executorScope := w.cfg.Scope.NewSubScope("workflow_executor")
	w.workflowExecutor = awsSchedule.NewWorkflowExecutor(
		queueCfg, w.cfg.SchedulerConfig, executionManager, launchPlanManager, executorScope)
	return w.workflowExecutor
}
// NewWorkflowScheduler builds a WorkflowScheduler from cfg.
//
// The event scheduler is selected by cfg.SchedulerConfig.EventSchedulerConfig.Scheme:
//   - common.AWS: a CloudWatch scheduler backed by a new AWS session. Session creation goes through
//     async.Retry with the configured reconnect attempts and delay; if it still fails, this function panics.
//   - common.Local: the flyte scheduler backed by db.
//   - anything else: a noop event scheduler.
//
// The workflow executor is selected by cfg.SchedulerConfig.WorkflowExecutorConfig.Scheme. For common.AWS it is
// left nil here and created lazily by GetWorkflowExecutor; every other scheme gets a noop executor.
func NewWorkflowScheduler(db repoInterfaces.Repository, cfg WorkflowSchedulerConfig) WorkflowScheduler {
	ctx := context.Background()

	var eventScheduler interfaces.EventScheduler
	var workflowExecutor interfaces.WorkflowExecutor

	switch cfg.SchedulerConfig.EventSchedulerConfig.Scheme {
	case common.AWS:
		// NOTE(review): the region for the CloudWatch session comes from WorkflowExecutorConfig, not
		// EventSchedulerConfig — confirm this is intended.
		awsConfig := aws.NewConfig().WithRegion(cfg.SchedulerConfig.WorkflowExecutorConfig.Region).WithMaxRetries(cfg.Retries)
		var sess *session.Session
		err := async.Retry(cfg.SchedulerConfig.ReconnectAttempts,
			time.Duration(cfg.SchedulerConfig.ReconnectDelaySeconds)*time.Second, func() error {
				newSess, sessErr := session.NewSession(awsConfig)
				if sessErr != nil {
					logger.Warnf(ctx, "Failed to initialize new event scheduler with aws config: [%+v] and err: %v", awsConfig, sessErr)
					return sessErr
				}
				sess = newSess
				return nil
			})
		if err != nil {
			// Without a session the AWS scheduler cannot be built; fail fast at startup.
			panic(err)
		}
		eventScheduler = awsSchedule.NewCloudWatchScheduler(
			cfg.SchedulerConfig.EventSchedulerConfig.ScheduleRole, cfg.SchedulerConfig.EventSchedulerConfig.TargetName, sess, awsConfig,
			cfg.Scope.NewSubScope("cloudwatch_scheduler"))
	case common.Local:
		logger.Infof(ctx, "Using default flyte scheduler implementation")
		eventScheduler = flytescheduler.New(db)
	default:
		logger.Infof(ctx,
			"Using default noop event scheduler implementation for cloud provider type [%s]",
			cfg.SchedulerConfig.EventSchedulerConfig.Scheme)
		eventScheduler = noop.NewNoopEventScheduler()
	}

	switch cfg.SchedulerConfig.WorkflowExecutorConfig.Scheme {
	case common.AWS:
		// Do nothing, this special case depends on the execution manager and launch plan manager having been
		// initialized and is handled in GetWorkflowExecutor.
	default:
		// common.Local and any unrecognized scheme both use the noop executor. Log the workflow executor
		// scheme (the value this switch is on), not the event scheduler scheme.
		logger.Infof(ctx,
			"Using default noop workflow executor implementation for cloud provider type [%s]",
			cfg.SchedulerConfig.WorkflowExecutorConfig.Scheme)
		workflowExecutor = noop.NewWorkflowExecutor()
	}

	return &workflowScheduler{
		cfg:              cfg,
		eventScheduler:   eventScheduler,
		workflowExecutor: workflowExecutor,
	}
}