This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
registry.go
54 lines (46 loc) · 1.54 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package impl
import (
"context"
"sync"
interfaces2 "github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flytestdlib/logger"
)
// workflowExecutorRegistry is this package's implementation of
// interfaces.WorkflowExecutorRegistry. It holds at most one explicitly
// registered executor and one default executor.
type workflowExecutorRegistry struct {
	// m is the read/write lock taken by every method before it reads or
	// replaces executor or defaultExecutor.
	m sync.RWMutex
	// executor is the executor set through Register; GetExecutor returns it
	// whenever it is non-nil.
	executor interfaces.WorkflowExecutor
	// defaultExecutor is the executor set through RegisterDefault; GetExecutor
	// returns it only while executor is nil.
	defaultExecutor interfaces.WorkflowExecutor
}
// Register stores executor as the registry's explicitly registered workflow
// executor, replacing whatever was registered before. A debug line records
// whether this is the first registration or an update. The write lock is held
// for the duration of the call.
func (r *workflowExecutorRegistry) Register(executor interfaces.WorkflowExecutor) {
	r.m.Lock()
	defer r.m.Unlock()

	// Pick the message up front so there is a single logging call site.
	format := "updating flyte k8s workflow executor [%s]"
	if r.executor == nil {
		format = "setting flyte k8s workflow executor [%s]"
	}
	logger.Debugf(context.TODO(), format, executor.ID())

	r.executor = executor
}
// RegisterDefault stores executor as the registry's default workflow executor,
// replacing any previous default. A debug line records whether a default was
// already present. The write lock is held for the duration of the call.
func (r *workflowExecutorRegistry) RegisterDefault(executor interfaces.WorkflowExecutor) {
	r.m.Lock()
	defer r.m.Unlock()

	var format string
	switch {
	case r.defaultExecutor == nil:
		format = "setting default flyte k8s workflow executor [%s]"
	default:
		format = "updating default flyte k8s workflow executor [%s]"
	}
	logger.Debugf(context.TODO(), format, executor.ID())

	r.defaultExecutor = executor
}
// GetExecutor returns the executor set through Register when one is present,
// and otherwise the executor set through RegisterDefault (which may itself be
// nil if no default was registered). The read lock is held while the fields
// are inspected.
func (r *workflowExecutorRegistry) GetExecutor() interfaces.WorkflowExecutor {
	r.m.RLock()
	defer r.m.RUnlock()

	if registered := r.executor; registered != nil {
		return registered
	}
	return r.defaultExecutor
}
// NewRegistry returns an empty workflow executor registry: neither a
// registered nor a default executor is set, so GetExecutor yields nil until
// Register or RegisterDefault is called.
func NewRegistry() interfaces2.WorkflowExecutorRegistry {
	return new(workflowExecutorRegistry)
}