-
Notifications
You must be signed in to change notification settings - Fork 916
/
waiter.go
81 lines (70 loc) · 2.36 KB
/
waiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
Copyright 2021 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package initializer
import (
"context"
"time"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/crossplane/crossplane-runtime/pkg/logging"
)
const (
errGetCRD = "cannot get crd"
errFmtTimeoutExceeded = "%f seconds timeout for waiting CRDs to be ready is exceeded"
)
// NewCRDWaiter returns a new *CRDWaiter initializer.
func NewCRDWaiter(names []string, timeout time.Duration, period time.Duration, log logging.Logger) *CRDWaiter {
return &CRDWaiter{Names: names, Timeout: timeout, Period: period, log: log}
}
// CRDWaiter blocks the execution until all the CRDs whose names are given are
// deployed to the cluster.
type CRDWaiter struct {
Names []string
Timeout time.Duration
Period time.Duration
log logging.Logger
}
// Run continuously checks whether the list of CRDs whose names are given are
// present in the cluster.
func (cw *CRDWaiter) Run(ctx context.Context, kube client.Client) error {
timeout := time.After(cw.Timeout)
ticker := time.NewTicker(cw.Period)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cw.log.Info("Waiting for required CRDs to be present", "names", cw.Names, "poll-interval", cw.Period)
present := 0
for _, n := range cw.Names {
crd := &v1.CustomResourceDefinition{}
nn := types.NamespacedName{Name: n}
err := kube.Get(ctx, nn, crd)
if err != nil && !kerrors.IsNotFound(err) {
return errors.Wrap(err, errGetCRD)
}
if kerrors.IsNotFound(err) {
break
}
present++
}
if present == len(cw.Names) {
return nil
}
case <-timeout:
return errors.Errorf(errFmtTimeoutExceeded, cw.Timeout.Seconds())
}
}
}