/
default.go
236 lines (195 loc) · 6.84 KB
/
default.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
//
// SPDX-License-Identifier: Apache-2.0
package cmd
import (
"context"
"errors"
goflag "flag"
"fmt"
"os"
"time"
flag "github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/yaml"
lsinstall "github.com/gardener/landscaper/apis/core/install"
lsv1alpha1 "github.com/gardener/landscaper/apis/core/v1alpha1"
"github.com/gardener/landscaper/controller-utils/pkg/logging"
"github.com/gardener/landscaper/pkg/api"
"github.com/gardener/landscaper/pkg/deployer/lib"
lsutils "github.com/gardener/landscaper/pkg/utils"
"github.com/gardener/landscaper/pkg/utils/read_write_layer"
)
// DefaultOptions defines all default deployer options.
type DefaultOptions struct {
LsUncachedClient client.Client
LsCachedClient client.Client
HostUncachedClient client.Client
HostCachedClient client.Client
configPath string
LsKubeconfig string
Log logging.Logger
LsMgr manager.Manager
HostMgr manager.Manager
decoder runtime.Decoder
FinishedObjectCache *lsutils.FinishedObjectCache
}
// NewDefaultOptions creates new default options for a deployer.
func NewDefaultOptions(deployerScheme *runtime.Scheme) *DefaultOptions {
return &DefaultOptions{
decoder: api.NewDecoder(deployerScheme),
}
}
func (o *DefaultOptions) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&o.configPath, "config", "", "Specify the path to the configuration file")
fs.StringVar(&o.LsKubeconfig, "landscaper-kubeconfig", "", "Specify the path to the landscaper kubeconfig cluster")
logging.InitFlags(fs)
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
}
// Complete parses all options and flags and initializes the basic functions
func (o *DefaultOptions) Complete() error {
log, err := logging.GetLogger()
if err != nil {
return err
}
log = log.WithName("deployer")
o.Log = log
ctrl.SetLogger(log.Logr())
ctx := logging.NewContext(context.Background(), o.Log)
hostAndResourceClusterDifferent := len(o.LsKubeconfig) != 0
burst, qps := lsutils.GetHostClientRequestRestrictions(log, hostAndResourceClusterDifferent)
opts := manager.Options{
LeaderElection: false,
Metrics: metricsserver.Options{BindAddress: "0"}, // disable the metrics serving by default
Cache: cache.Options{SyncPeriod: ptr.To[time.Duration](time.Hour * 24 * 1000)},
}
hostRestConfig, err := ctrl.GetConfig()
if err != nil {
return fmt.Errorf("unable to get host kubeconfig: %w", err)
}
hostRestConfig = lsutils.RestConfigWithModifiedClientRequestRestrictions(log, hostRestConfig, burst, qps)
o.HostMgr, err = ctrl.NewManager(hostRestConfig, opts)
if err != nil {
return fmt.Errorf("unable to setup host manager")
}
o.LsMgr = o.HostMgr
if hostAndResourceClusterDifferent {
data, err := os.ReadFile(o.LsKubeconfig)
if err != nil {
return fmt.Errorf("unable to read landscaper kubeconfig from %s: %w", o.LsKubeconfig, err)
}
lsRestConfig, err := clientcmd.RESTConfigFromKubeConfig(data)
if err != nil {
return fmt.Errorf("unable to build landscaper cluster rest client: %w", err)
}
burst, qps = lsutils.GetResourceClientRequestRestrictions(log)
lsRestConfig = lsutils.RestConfigWithModifiedClientRequestRestrictions(log, lsRestConfig, burst, qps)
o.LsMgr, err = ctrl.NewManager(lsRestConfig, opts)
if err != nil {
return fmt.Errorf("unable to setup ls manager")
}
}
lsinstall.Install(o.LsMgr.GetScheme())
o.LsUncachedClient, o.LsCachedClient, o.HostUncachedClient, o.HostCachedClient, err = lsutils.ClientsFromManagers(o.LsMgr, o.HostMgr)
if err != nil {
return err
}
if err := o.prepareFinishedObjectCache(ctx); err != nil {
return err
}
return nil
}
func (o *DefaultOptions) prepareFinishedObjectCache(ctx context.Context) error {
log, ctx := logging.FromContextOrNew(ctx, nil)
o.FinishedObjectCache = lsutils.NewFinishedObjectCache()
namespaces := &v1.NamespaceList{}
if err := read_write_layer.ListNamespaces(ctx, o.LsUncachedClient, namespaces, read_write_layer.R000093); err != nil {
return err
}
perfTotal := lsutils.StartPerformanceMeasurement(&log, "prepare finished object for dis")
defer perfTotal.Stop()
for _, namespace := range namespaces.Items {
perf := lsutils.StartPerformanceMeasurement(&log, "prepare finished object cache for dis: fetch from namespace "+namespace.Name)
diList := &lsv1alpha1.DeployItemList{}
if err := read_write_layer.ListDeployItems(ctx, o.LsUncachedClient, diList, read_write_layer.R000094,
client.InNamespace(namespace.Name)); err != nil {
return err
}
perf.Stop()
perf = lsutils.StartPerformanceMeasurement(&log, "prepare finished object cache for dis: add for namespace "+namespace.Name)
for diIndex := range diList.Items {
di := &diList.Items[diIndex]
if lib.IsDeployItemFinished(di) {
o.FinishedObjectCache.Add(&di.ObjectMeta)
}
}
perf.Stop()
}
return nil
}
// StartManagers starts the host and landscaper managers.
func (o *DefaultOptions) StartManagers(ctx context.Context, deployerJobs ...DeployerJob) error {
o.Log.Info("Starting the controllers")
eg, ctx := errgroup.WithContext(ctx)
if o.LsMgr != o.HostMgr {
eg.Go(func() error {
if err := o.HostMgr.Start(ctx); err != nil {
return fmt.Errorf("error while running host manager: %w", err)
}
return nil
})
o.Log.Info("Waiting for host cluster cache to sync")
if !o.HostMgr.GetCache().WaitForCacheSync(ctx) {
return errors.New("unable to sync host cluster cache")
}
o.Log.Info("Cache of host cluster successfully synced")
}
eg.Go(func() error {
if err := o.LsMgr.Start(ctx); err != nil {
return fmt.Errorf("error while running landscaper manager: %w", err)
}
return nil
})
for i := range deployerJobs {
nextJob := deployerJobs[i]
eg.Go(func() error {
if err := nextJob.StartDeployerJob(ctx); err != nil {
return fmt.Errorf("error while running deployerJob: %w", err)
}
return nil
})
}
return eg.Wait()
}
// GetConfig reads and parses the configured configuration file.
func (o *DefaultOptions) GetConfig(obj runtime.Object) error {
if len(o.configPath) == 0 {
return nil
}
data, err := os.ReadFile(o.configPath)
if err != nil {
return fmt.Errorf("uable to read config from %q: %w", o.configPath, err)
}
if _, _, err := o.decoder.Decode(data, nil, obj); err != nil {
return err
}
if o.Log.Enabled(logging.INFO) {
// print configuration if enabled
configBytes, err := yaml.Marshal(obj)
if err != nil {
o.Log.Error(err, "unable to marshal configuration")
} else {
fmt.Println(string(configBytes))
}
}
return nil
}