This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 62
/
application_configuration.go
410 lines (349 loc) · 15.1 KB
/
application_configuration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package interfaces
import (
"github.com/flyteorg/flytestdlib/config"
"golang.org/x/time/rate"
)
// DbConfigSection describes how to connect to the database that stores registered entities
// (e.g. workflows, tasks, launch plans...). It maps directly onto the flyteadmin config yaml
// structure. Unlike DbConfig, the password may not be resolved yet (see PasswordPath).
type DbConfigSection struct {
	// Host is the host name of the database server.
	Host string `json:"host"`
	// Port is the port number of the database server.
	Port int `json:"port"`
	// DbName is the name of the database to connect to.
	DbName string `json:"dbname"`
	// User is the database user that connects to the server.
	User string `json:"username"`
	// Password holds the database password directly.
	// Either Password or PasswordPath must be set.
	Password string `json:"password"`
	// PasswordPath is the alternative to Password; presumably a path the password is read from
	// (resolution happens outside this file — confirm against the runtime provider).
	PasswordPath string `json:"passwordPath"`
	// ExtraOptions are additional connection options passed in addition to the fields above.
	// See http://gorm.io/docs/connecting_to_the_database.html for the available options.
	ExtraOptions string `json:"options"`
	// Debug starts the database connection with debug mode enabled when true.
	Debug bool `json:"debug"`
}
// DbConfig carries the same database connection settings as DbConfigSection, except that the
// password has already been *resolved*. It is the value the runtime provider returns to callers
// requesting the database config.
type DbConfig struct {
	// Host is the host name of the database server.
	Host string `json:"host"`
	// Port is the port number of the database server.
	Port int `json:"port"`
	// DbName is the name of the database.
	DbName string `json:"dbname"`
	// User is the database user that connects to the server.
	User string `json:"username"`
	// Password is the resolved database password.
	Password string `json:"password"`
	// ExtraOptions are additional connection options (same meaning as DbConfigSection.ExtraOptions).
	ExtraOptions string `json:"options"`
	// Debug enables debug mode on the database connection.
	Debug bool `json:"debug"`
}
// ApplicationConfig is the base configuration required to start flyteadmin.
type ApplicationConfig struct {
	// RoleNameKey is the annotation key
	// (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) inserted
	// into Flyte Workflow CRDs created in the CreateExecution flow. The matching role value is
	// defined in the launch plan used to create the execution.
	RoleNameKey string `json:"roleNameKey"`
	// MetricsScope is the top-level name applied to all metrics emitted by the application.
	MetricsScope string `json:"metricsScope"`
	// ProfilerPort is the port of the profiling server used for admin monitoring and
	// application debugging.
	ProfilerPort int `json:"profilerPort"`
	// MetadataStoragePrefix is the nested path, on the configured external storage provider,
	// under which workflow closures are remotely offloaded.
	MetadataStoragePrefix []string `json:"metadataStoragePrefix"`
	// EventVersion is the event version used for Flyte workflows.
	EventVersion int `json:"eventVersion"`
	// AsyncEventsBufferSize is the size of the shared buffer that queues asynchronous event writes.
	AsyncEventsBufferSize int `json:"asyncEventsBufferSize"`
	// MaxParallelism caps how many task nodes may run in parallel across an entire workflow,
	// which helps achieve fairness. MapTasks count as a single unit; their own
	// parallelism/concurrency is independent of this limit.
	MaxParallelism int32 `json:"maxParallelism"`
}

// GetRoleNameKey returns the CRD annotation key used for the execution role.
func (a *ApplicationConfig) GetRoleNameKey() string {
	return a.RoleNameKey
}

// GetMetricsScope returns the top-level metrics scope name.
func (a *ApplicationConfig) GetMetricsScope() string {
	return a.MetricsScope
}

// GetProfilerPort returns the profiling server port.
func (a *ApplicationConfig) GetProfilerPort() int {
	return a.ProfilerPort
}

// GetMetadataStoragePrefix returns the storage path segments used for offloaded workflow
// closures. The returned slice aliases the config's own slice.
func (a *ApplicationConfig) GetMetadataStoragePrefix() []string {
	return a.MetadataStoragePrefix
}

// GetEventVersion returns the workflow event version.
func (a *ApplicationConfig) GetEventVersion() int {
	return a.EventVersion
}

// GetAsyncEventsBufferSize returns the size of the asynchronous event-write buffer.
func (a *ApplicationConfig) GetAsyncEventsBufferSize() int {
	return a.AsyncEventsBufferSize
}

// GetMaxParallelism returns the per-workflow cap on parallel task nodes.
func (a *ApplicationConfig) GetMaxParallelism() int32 {
	return a.MaxParallelism
}
// AWSConfig holds common AWS settings. It appears as the "aws" section of both
// NotificationsConfig and ExternalEventsConfig.
type AWSConfig struct {
	// Region is the AWS region to use.
	Region string `json:"region"`
}
// GCPConfig holds common GCP settings. It appears as the "gcp" section of both
// NotificationsConfig and ExternalEventsConfig.
type GCPConfig struct {
	// ProjectID is the GCP project identifier.
	ProjectID string `json:"projectId"`
}
// EventSchedulerConfig configures the event scheduler used to schedule workflow executions.
//
// The top-level Region, ScheduleRole, TargetName and ScheduleNamePrefix fields are deprecated.
// The same settings also exist on AWSSchedulerConfig (the "aws" section). The getters on this
// type return only the top-level fields and do not fall back to the nested section.
type EventSchedulerConfig struct {
	// Scheme names the cloud provider that backs the scheduler. When it is not specified the
	// no-op, 'local' scheme is used.
	Scheme string `json:"scheme"`
	// Region is the region some cloud providers require to be set.
	//
	// Deprecated: the equivalent setting lives on AWSSchedulerConfig.
	Region string `json:"region"`
	// ScheduleRole is the role assumed to register and activate schedules.
	//
	// Deprecated: the equivalent setting lives on AWSSchedulerConfig.
	ScheduleRole string `json:"scheduleRole"`
	// TargetName is the name of the queue onto which scheduled events should enqueue.
	//
	// Deprecated: the equivalent setting lives on AWSSchedulerConfig.
	TargetName string `json:"targetName"`
	// ScheduleNamePrefix is an optional application-wide prefix applied to schedule names.
	//
	// Deprecated: the equivalent setting lives on AWSSchedulerConfig.
	ScheduleNamePrefix string `json:"scheduleNamePrefix"`
	// AWSSchedulerConfig holds AWS-specific scheduler settings ("aws" section). It may be nil.
	AWSSchedulerConfig *AWSSchedulerConfig `json:"aws"`
	// FlyteSchedulerConfig holds native Flyte scheduler settings ("local" section). It may be nil.
	FlyteSchedulerConfig *FlyteSchedulerConfig `json:"local"`
}

// GetScheme returns the scheduler scheme.
func (e *EventSchedulerConfig) GetScheme() string {
	return e.Scheme
}

// GetRegion returns the deprecated top-level Region.
func (e *EventSchedulerConfig) GetRegion() string {
	return e.Region
}

// GetScheduleRole returns the deprecated top-level ScheduleRole.
func (e *EventSchedulerConfig) GetScheduleRole() string {
	return e.ScheduleRole
}

// GetTargetName returns the deprecated top-level TargetName.
func (e *EventSchedulerConfig) GetTargetName() string {
	return e.TargetName
}

// GetScheduleNamePrefix returns the deprecated top-level ScheduleNamePrefix.
func (e *EventSchedulerConfig) GetScheduleNamePrefix() string {
	return e.ScheduleNamePrefix
}

// GetAWSSchedulerConfig returns the "aws" section, which may be nil.
func (e *EventSchedulerConfig) GetAWSSchedulerConfig() *AWSSchedulerConfig {
	return e.AWSSchedulerConfig
}

// GetFlyteSchedulerConfig returns the "local" section, which may be nil.
func (e *EventSchedulerConfig) GetFlyteSchedulerConfig() *FlyteSchedulerConfig {
	return e.FlyteSchedulerConfig
}
// AWSSchedulerConfig holds the AWS-specific settings of the event scheduler (the "aws" section
// of EventSchedulerConfig).
//
// EventSchedulerConfig.GetAWSSchedulerConfig returns a pointer field that may be nil, for
// example when the "aws" section is not configured. To let callers chain directly off that
// result, every getter here is safe to call on a nil receiver and returns the zero value.
// This mirrors the nil-safe convention of protobuf-style Get accessors.
type AWSSchedulerConfig struct {
	// Region is the region some cloud providers require to be set.
	Region string `json:"region"`
	// ScheduleRole is the role assumed to register and activate schedules.
	ScheduleRole string `json:"scheduleRole"`
	// TargetName is the name of the queue onto which scheduled events should enqueue.
	TargetName string `json:"targetName"`
	// ScheduleNamePrefix is an optional application-wide prefix applied to schedule names.
	ScheduleNamePrefix string `json:"scheduleNamePrefix"`
}

// GetRegion returns Region, or "" when a is nil.
func (a *AWSSchedulerConfig) GetRegion() string {
	if a == nil {
		return ""
	}
	return a.Region
}

// GetScheduleRole returns ScheduleRole, or "" when a is nil.
func (a *AWSSchedulerConfig) GetScheduleRole() string {
	if a == nil {
		return ""
	}
	return a.ScheduleRole
}

// GetTargetName returns TargetName, or "" when a is nil.
func (a *AWSSchedulerConfig) GetTargetName() string {
	if a == nil {
		return ""
	}
	return a.TargetName
}

// GetScheduleNamePrefix returns ScheduleNamePrefix, or "" when a is nil.
func (a *AWSSchedulerConfig) GetScheduleNamePrefix() string {
	if a == nil {
		return ""
	}
	return a.ScheduleNamePrefix
}
// FlyteSchedulerConfig configures the native (default) Flyte scheduler. It is the "local"
// section of EventSchedulerConfig and currently declares no fields.
type FlyteSchedulerConfig struct{}
// WorkflowExecutorConfig configures the executor that processes fired workflow schedule events.
//
// The top-level Region, ScheduleQueueName and AccountID fields are deprecated. The same
// settings also exist on AWSWorkflowExecutorConfig (the "aws" section). The getters on this
// type return only the top-level fields.
type WorkflowExecutorConfig struct {
	// Scheme names the cloud provider that backs the scheduler. When it is not specified the
	// no-op, 'local' scheme is used.
	Scheme string `json:"scheme"`
	// Region is the region some cloud providers require to be set.
	//
	// Deprecated: the equivalent setting lives on AWSWorkflowExecutorConfig.
	Region string `json:"region"`
	// ScheduleQueueName is the name of the queue onto which scheduled events will enqueue.
	//
	// Deprecated: the equivalent setting lives on AWSWorkflowExecutorConfig.
	ScheduleQueueName string `json:"scheduleQueueName"`
	// AccountID is the account id, under whichever cloud provider scheme is used, that has
	// permission to read from the schedule queue.
	//
	// Deprecated: the equivalent setting lives on AWSWorkflowExecutorConfig.
	AccountID string `json:"accountId"`
	// AWSWorkflowExecutorConfig holds AWS-specific executor settings ("aws" section). It may be nil.
	AWSWorkflowExecutorConfig *AWSWorkflowExecutorConfig `json:"aws"`
	// FlyteWorkflowExecutorConfig holds native Flyte executor settings ("local" section). It may be nil.
	FlyteWorkflowExecutorConfig *FlyteWorkflowExecutorConfig `json:"local"`
}

// GetScheme returns the executor scheme.
func (w *WorkflowExecutorConfig) GetScheme() string {
	return w.Scheme
}

// GetRegion returns the deprecated top-level Region.
func (w *WorkflowExecutorConfig) GetRegion() string {
	return w.Region
}

// GetScheduleScheduleQueueName returns the deprecated top-level ScheduleQueueName. The doubled
// "Schedule" is part of the existing method name and is kept so callers keep compiling.
func (w *WorkflowExecutorConfig) GetScheduleScheduleQueueName() string {
	return w.ScheduleQueueName
}

// GetAccountID returns the deprecated top-level AccountID.
func (w *WorkflowExecutorConfig) GetAccountID() string {
	return w.AccountID
}

// GetAWSWorkflowExecutorConfig returns the "aws" section, which may be nil.
func (w *WorkflowExecutorConfig) GetAWSWorkflowExecutorConfig() *AWSWorkflowExecutorConfig {
	return w.AWSWorkflowExecutorConfig
}

// GetFlyteWorkflowExecutorConfig returns the "local" section, which may be nil.
func (w *WorkflowExecutorConfig) GetFlyteWorkflowExecutorConfig() *FlyteWorkflowExecutorConfig {
	return w.FlyteWorkflowExecutorConfig
}
// AWSWorkflowExecutorConfig holds the AWS-specific settings of the workflow executor (the "aws"
// section of WorkflowExecutorConfig).
//
// WorkflowExecutorConfig.GetAWSWorkflowExecutorConfig returns a pointer field that may be nil,
// for example when the "aws" section is not configured. Every getter here is therefore safe to
// call on a nil receiver and returns the zero value, so callers can chain directly off that
// result without panicking.
type AWSWorkflowExecutorConfig struct {
	// Region is the region some cloud providers require to be set.
	Region string `json:"region"`
	// ScheduleQueueName is the name of the queue onto which scheduled events will enqueue.
	ScheduleQueueName string `json:"scheduleQueueName"`
	// AccountID is the account id, under whichever cloud provider scheme is used, that has
	// permission to read from the schedule queue.
	AccountID string `json:"accountId"`
}

// GetRegion returns Region, or "" when a is nil.
func (a *AWSWorkflowExecutorConfig) GetRegion() string {
	if a == nil {
		return ""
	}
	return a.Region
}

// GetScheduleScheduleQueueName returns ScheduleQueueName, or "" when a is nil. The doubled
// "Schedule" is part of the existing method name and is kept so callers keep compiling.
func (a *AWSWorkflowExecutorConfig) GetScheduleScheduleQueueName() string {
	if a == nil {
		return ""
	}
	return a.ScheduleQueueName
}

// GetAccountID returns AccountID, or "" when a is nil.
func (a *AWSWorkflowExecutorConfig) GetAccountID() string {
	if a == nil {
		return ""
	}
	return a.AccountID
}
// FlyteWorkflowExecutorConfig specifies the workflow executor configuration for the native
// Flyte scheduler (the "local" section of WorkflowExecutorConfig).
//
// WorkflowExecutorConfig.GetFlyteWorkflowExecutorConfig returns a pointer field that may be nil.
// GetAdminRateLimit is therefore safe to call on a nil receiver and returns nil in that case.
type FlyteWorkflowExecutorConfig struct {
	// AdminRateLimit controls the number of TPS that hit admin through the scheduler, e.g. 100
	// TPS will send at most 100 schedule requests to admin per second. Burst specifies the burst
	// traffic count. It may be nil when the "adminRateLimit" section is absent.
	AdminRateLimit *AdminRateLimit `json:"adminRateLimit"`
}

// GetAdminRateLimit returns AdminRateLimit, or nil when f is nil.
func (f *FlyteWorkflowExecutorConfig) GetAdminRateLimit() *AdminRateLimit {
	if f == nil {
		return nil
	}
	return f.AdminRateLimit
}
// AdminRateLimit bounds how fast the native scheduler sends schedule requests to admin.
//
// The getters are safe to call on a nil receiver. In that case they return the zero value,
// which is the same result an empty "adminRateLimit" section would give. That lets callers
// chain them off FlyteWorkflowExecutorConfig.GetAdminRateLimit(), whose result may be nil.
//
// NOTE(review): the golang.org/x/time/rate docs state that a zero Limit allows no events.
// Callers building a limiter from these values may want to validate them.
type AdminRateLimit struct {
	// Tps is the maximum request rate to admin, e.g. 100 TPS sends at most 100 schedule
	// requests to admin per second.
	Tps rate.Limit `json:"tps"`
	// Burst is the burst traffic count.
	Burst int `json:"burst"`
}

// GetTps returns Tps, or 0 when r is nil.
func (r *AdminRateLimit) GetTps() rate.Limit {
	if r == nil {
		return 0
	}
	return r.Tps
}

// GetBurst returns Burst, or 0 when r is nil.
func (r *AdminRateLimit) GetBurst() int {
	if r == nil {
		return 0
	}
	return r.Burst
}
// SchedulerConfig is the base configuration for all scheduler-related set-up.
type SchedulerConfig struct {
	// ProfilerPort is the port of the profiling server used for scheduler monitoring and
	// application debugging.
	ProfilerPort config.Port `json:"profilerPort"`
	// EventSchedulerConfig configures the event scheduler ("eventScheduler" section).
	EventSchedulerConfig EventSchedulerConfig `json:"eventScheduler"`
	// WorkflowExecutorConfig configures the workflow executor ("workflowExecutor" section).
	WorkflowExecutorConfig WorkflowExecutorConfig `json:"workflowExecutor"`
	// ReconnectAttempts is the number of times to attempt recreating a workflow executor client
	// should there be any disruptions.
	ReconnectAttempts int `json:"reconnectAttempts"`
	// ReconnectDelaySeconds is the interval, in seconds, to wait before attempting to reconnect
	// the workflow executor client.
	ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

// GetEventSchedulerConfig returns a copy of the event scheduler section.
func (s *SchedulerConfig) GetEventSchedulerConfig() EventSchedulerConfig {
	return s.EventSchedulerConfig
}

// GetWorkflowExecutorConfig returns a copy of the workflow executor section.
func (s *SchedulerConfig) GetWorkflowExecutorConfig() WorkflowExecutorConfig {
	return s.WorkflowExecutorConfig
}

// GetReconnectAttempts returns the number of workflow executor client reconnect attempts.
func (s *SchedulerConfig) GetReconnectAttempts() int {
	return s.ReconnectAttempts
}

// GetReconnectDelaySeconds returns the delay, in seconds, between reconnect attempts.
func (s *SchedulerConfig) GetReconnectDelaySeconds() int {
	return s.ReconnectDelaySeconds
}
// SignedURL configures how signed URLs are generated.
type SignedURL struct {
	// DurationMinutes is how long, in minutes, a signed URL stays valid.
	DurationMinutes int `json:"durationMinutes"`
	// SigningPrincipal is the principal that signs the URL. It only applies to GCS URLs.
	SigningPrincipal string `json:"signingPrincipal"`
}
// RemoteDataConfig governs requests for remote data such as execution inputs and outputs.
type RemoteDataConfig struct {
	// Scheme names the cloud provider that backs remote data access.
	// NOTE(review): presumably an unset scheme falls back to the no-op 'local' scheme, as for
	// the scheduler sections — confirm against the runtime provider defaults.
	Scheme string `json:"scheme"`
	// Region is the region some cloud providers require to be set.
	Region string `json:"region"`
	// SignedURL configures signed URL generation ("signedUrls" section).
	SignedURL SignedURL `json:"signedUrls"`
	// MaxSizeInBytes is the maximum size, in bytes, up to which execution data such as inputs
	// and outputs is populated inline.
	MaxSizeInBytes int64 `json:"maxSizeInBytes"`
}
// NotificationsPublisherConfig configures the publishing side of the workflow notifications
// pipeline.
type NotificationsPublisherConfig struct {
	// TopicName is the topic notifications use, e.g. an AWS SNS topic.
	TopicName string `json:"topicName"`
}
// NotificationsProcessorConfig configures how workflow notification events are processed.
type NotificationsProcessorConfig struct {
	// QueueName is the name of the queue onto which workflow notifications will enqueue.
	QueueName string `json:"queueName"`
	// AccountID is the account id, under whichever cloud provider scheme is used, that has
	// permission to read from that queue.
	AccountID string `json:"accountId"`
}
// EmailServerConfig configures an external email service (e.g. mailchimp/sendgrid) used to
// send notification emails.
type EmailServerConfig struct {
	// ServiceName identifies the email service to use.
	ServiceName string `json:"serviceName"`
	// APIKeyEnvVar presumably names an environment variable holding the service API key.
	// Only one of APIKeyEnvVar and APIKeyFilePath should be set.
	APIKeyEnvVar string `json:"apiKeyEnvVar"`
	// APIKeyFilePath presumably points at a file holding the service API key.
	// Only one of APIKeyEnvVar and APIKeyFilePath should be set.
	APIKeyFilePath string `json:"apiKeyFilePath"`
}
// NotificationsEmailerConfig configures notification emails.
type NotificationsEmailerConfig struct {
	// EmailerConfig configures the external email service (mailchimp/sendgrid).
	EmailerConfig EmailServerConfig `json:"emailServerConfig"`
	// Subject is the optionally templatized subject of notification emails.
	Subject string `json:"subject"`
	// Sender is the optionally templatized sender of notification emails.
	Sender string `json:"sender"`
	// Body is the optionally templatized body of notification emails.
	Body string `json:"body"`
}
// EventsPublisherConfig configures where execution events are published.
type EventsPublisherConfig struct {
	// TopicName is the topic to which events (e.g. node, task, workflow) are published.
	TopicName string `json:"topicName"`
	// EventTypes lists event types: task, node, workflow executions.
	EventTypes []string `json:"eventTypes"`
}
// ExternalEventsConfig configures publishing of execution events to an external pub/sub topic.
type ExternalEventsConfig struct {
	// Enable toggles external event publishing.
	Enable bool `json:"enable"`
	// Type names the cloud provider that backs event publishing.
	// NOTE(review): presumably an unset Type falls back to the no-op 'local' implementation, as
	// for the scheduler sections — confirm.
	Type string `json:"type"`
	// AWSConfig holds AWS settings ("aws" section).
	AWSConfig AWSConfig `json:"aws"`
	// GCPConfig holds GCP settings ("gcp" section).
	GCPConfig GCPConfig `json:"gcp"`
	// EventsPublisherConfig configures the pub/sub topic events are published to.
	EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"`
	// ReconnectAttempts is the number of times to attempt recreating the client should there be
	// any disruptions.
	ReconnectAttempts int `json:"reconnectAttempts"`
	// ReconnectDelaySeconds is the interval, in seconds, to wait before attempting to reconnect
	// the client.
	ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}
// NotificationsConfig configures notification handling: publishing, processing and emailing.
type NotificationsConfig struct {
	// Type names the cloud provider that backs notifications.
	// NOTE(review): presumably an unset Type falls back to the no-op 'local' implementation, as
	// for the scheduler sections — confirm.
	Type string `json:"type"`
	// Region is the cloud region for notifications.
	//
	// Deprecated: Please use AWSConfig instead.
	Region string `json:"region"`
	// AWSConfig holds AWS settings ("aws" section).
	AWSConfig AWSConfig `json:"aws"`
	// GCPConfig holds GCP settings ("gcp" section).
	GCPConfig GCPConfig `json:"gcp"`
	// NotificationsPublisherConfig configures notification publishing ("publisher" section).
	NotificationsPublisherConfig NotificationsPublisherConfig `json:"publisher"`
	// NotificationsProcessorConfig configures notification processing ("processor" section).
	NotificationsProcessorConfig NotificationsProcessorConfig `json:"processor"`
	// NotificationsEmailerConfig configures notification emails ("emailer" section).
	NotificationsEmailerConfig NotificationsEmailerConfig `json:"emailer"`
	// ReconnectAttempts is the number of times to attempt recreating a notifications processor
	// client should there be any disruptions.
	ReconnectAttempts int `json:"reconnectAttempts"`
	// ReconnectDelaySeconds is the interval, in seconds, to wait before attempting to reconnect
	// the notifications processor client.
	ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}
// Domain describes one domain. Domains are always set globally in the application config,
// whereas projects can be registered individually.
type Domain struct {
	// ID uniquely identifies the domain.
	ID string `json:"id"`
	// Name is the human-readable name of the domain.
	Name string `json:"name"`
}

// DomainsConfig is an alias (not a distinct type) for the list of configured domains.
type DomainsConfig = []Domain
// ApplicationConfiguration exposes the top-level config structs needed to start a flyteadmin
// application.
type ApplicationConfiguration interface {
	// GetDbConfig returns the database connection settings with the password resolved.
	GetDbConfig() DbConfig
	// GetTopLevelConfig returns the base application configuration.
	GetTopLevelConfig() *ApplicationConfig
	// GetSchedulerConfig returns the scheduler configuration.
	GetSchedulerConfig() *SchedulerConfig
	// GetRemoteDataConfig returns the remote data (execution inputs/outputs) configuration.
	GetRemoteDataConfig() *RemoteDataConfig
	// GetNotificationsConfig returns the notifications configuration.
	GetNotificationsConfig() *NotificationsConfig
	// GetDomainsConfig returns the globally configured domains.
	GetDomainsConfig() *DomainsConfig
	// GetExternalEventsConfig returns the external events publishing configuration.
	GetExternalEventsConfig() *ExternalEventsConfig
}