-
Notifications
You must be signed in to change notification settings - Fork 3.2k
/
config.go
110 lines (101 loc) · 3.41 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package controller
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/yaml"
"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/config"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
)
// updateConfig applies a newly loaded controller configuration.
//
// v must be a non-nil *config.Config; any other value produces an error
// instead of a panic. The configuration is logged as YAML and copied onto the
// controller, any existing persistence session is closed, and the artifact
// repositories, node-status offload repository, workflow archive, archive
// label selector, hydrator, estimator factory and rate limiter are rebuilt
// from the new settings.
//
// The first error encountered while marshalling the configuration, closing the
// previous session, creating the new session, running the DB migration, or
// building the persistence-backed components is returned.
func (wfc *WorkflowController) updateConfig(v interface{}) error {
	// Checked assertion: a wrong-typed or nil value is reported to the caller
	// rather than crashing the controller.
	cfg, ok := v.(*config.Config)
	if !ok || cfg == nil {
		return fmt.Errorf("unexpected configuration value of type %T, want non-nil *config.Config", v)
	}
	bytes, err := yaml.Marshal(cfg)
	if err != nil {
		return err
	}
	log.Info("Configuration:\n" + string(bytes))
	wfc.Config = *cfg

	// Drop the session that belonged to the previous configuration.
	if wfc.session != nil {
		if err := wfc.session.Close(); err != nil {
			return err
		}
	}
	wfc.session = nil

	// Reset every persistence-dependent component to its non-persistent
	// default; the branch below replaces them when persistence is configured.
	wfc.artifactRepositories = artifactrepositories.New(wfc.kubeclientset, wfc.namespace, &wfc.Config.ArtifactRepository)
	wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo
	wfc.wfArchive = sqldb.NullWorkflowArchive
	wfc.archiveLabelSelector = labels.Everything()

	persistence := wfc.Config.Persistence
	if persistence != nil {
		log.Info("Persistence configuration enabled")
		session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence)
		if err != nil {
			return err
		}
		log.Info("Persistence Session created successfully")
		if !persistence.SkipMigration {
			err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background())
			if err != nil {
				// The session is not yet stored on the controller, so nothing
				// else would ever close it; release it here to avoid a leak.
				if closeErr := session.Close(); closeErr != nil {
					log.WithError(closeErr).Warn("Failed to close persistence session after failed DB migration")
				}
				return err
			}
		} else {
			log.Info("DB migration is disabled")
		}
		// From here on the controller owns the session; it is closed by the
		// next call to updateConfig.
		wfc.session = session
		if persistence.NodeStatusOffload {
			// Assign through a local so a failed construction does not
			// overwrite the default repository set above.
			repo, err := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
			if err != nil {
				return err
			}
			wfc.offloadNodeStatusRepo = repo
			log.Info("Node status offloading is enabled")
		} else {
			log.Info("Node status offloading is disabled")
		}
		if persistence.Archive {
			instanceIDService := instanceid.NewService(wfc.Config.InstanceID)
			// Assign through a local so a failed lookup does not overwrite
			// the default selector set above.
			selector, err := persistence.GetArchiveLabelSelector()
			if err != nil {
				return err
			}
			wfc.archiveLabelSelector = selector
			wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService)
			log.Info("Workflow archiving is enabled")
		} else {
			log.Info("Workflow archiving is disabled")
		}
	} else {
		log.Info("Persistence configuration disabled")
	}

	wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo)
	wfc.updateEstimatorFactory()
	wfc.rateLimiter = wfc.newRateLimiter()
	return nil
}
// newRateLimiter builds a rate limiter from the Limit and Burst values of the
// resource rate limit reported by the current configuration.
func (wfc *WorkflowController) newRateLimiter() *rate.Limiter {
	limit := rate.Limit(wfc.Config.GetResourceRateLimit().Limit)
	burst := wfc.Config.GetResourceRateLimit().Burst
	return rate.NewLimiter(limit, burst)
}
// executorImage returns the image to use for the workflow executor.
//
// Precedence: the controller's cliExecutorImage field when non-empty, then the
// Image from the executor configuration when non-empty, and finally
// "quay.io/argoproj/argoexec:" followed by argo.ImageTag().
func (wfc *WorkflowController) executorImage() string {
	if wfc.cliExecutorImage != "" {
		return wfc.cliExecutorImage
	}
	if v := wfc.Config.GetExecutor().Image; v != "" {
		return v
	}
	// Pass the tag as an argument, not as part of the format string, so that a
	// '%' in the tag is emitted verbatim instead of being parsed as a verb.
	return fmt.Sprintf("quay.io/argoproj/argoexec:%s", argo.ImageTag())
}
// executorImagePullPolicy returns the imagePullPolicy to use for the workflow
// executor: the controller's cliExecutorImagePullPolicy when it is non-empty,
// otherwise the ImagePullPolicy from the executor configuration.
func (wfc *WorkflowController) executorImagePullPolicy() apiv1.PullPolicy {
	if override := wfc.cliExecutorImagePullPolicy; override != "" {
		return apiv1.PullPolicy(override)
	}
	return wfc.Config.GetExecutor().ImagePullPolicy
}