This repository has been archived by the owner on Aug 2, 2023. It is now read-only.
/
service.go
145 lines (123 loc) · 3.85 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package randomkeytpr
import (
"fmt"
"time"
"github.com/giantswarm/microerror"
"github.com/giantswarm/micrologger"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
// WatchTimeOut is how long to wait on a watch against the Kubernetes API
// before giving up and returning an error.
const WatchTimeOut = 90 * time.Second
// ServiceConfig holds the dependencies required to create a Service with
// NewService.
type ServiceConfig struct {
// Dependencies.
// K8sClient is the Kubernetes client the service uses to watch secrets.
// NewService rejects a nil value.
K8sClient kubernetes.Interface
// Logger receives the service's log output. NewService rejects a nil value.
Logger micrologger.Logger
}
// DefaultServiceConfig returns a ServiceConfig whose dependencies are all
// unset (nil). Callers have to fill in K8sClient and Logger before passing the
// config to NewService, which rejects nil dependencies.
func DefaultServiceConfig() ServiceConfig {
	var config ServiceConfig

	// Dependencies are deliberately left nil; the caller injects them.
	config.K8sClient = nil
	config.Logger = nil

	return config
}
// NewService validates the given configuration and builds a Service from it.
// It returns an error masking invalidConfigError when config.K8sClient or
// config.Logger is nil.
func NewService(config ServiceConfig) (*Service, error) {
	// Dependencies: both are mandatory, K8sClient is checked first.
	switch {
	case config.K8sClient == nil:
		return nil, microerror.Maskf(invalidConfigError, "config.K8sClient must not be empty")
	case config.Logger == nil:
		return nil, microerror.Maskf(invalidConfigError, "config.Logger must not be empty")
	}

	service := new(Service)
	service.k8sClient = config.K8sClient
	service.logger = config.Logger

	return service, nil
}
// Service looks up the key secrets of a cluster through the Kubernetes API.
// Create instances with NewService.
type Service struct {
// Dependencies.
// k8sClient is used to watch secrets in the default namespace.
k8sClient kubernetes.Interface
// logger receives debug output about the secrets being searched.
logger micrologger.Logger
}
// SearchKeys collects the key assets of the given cluster. It calls
// SearchKeysForKeytype once for every entry of RandomKeyTypes and merges the
// returned maps into one. The first failing lookup aborts the search; its
// error is masked and returned together with a nil map.
func (s *Service) SearchKeys(clusterID string) (map[Key][]byte, error) {
	merged := map[Key][]byte{}

	for _, kt := range RandomKeyTypes {
		found, err := s.SearchKeysForKeytype(clusterID, kt.String())
		if err != nil {
			return nil, microerror.Mask(err)
		}

		// Later key types overwrite earlier entries that share a key.
		for key, asset := range found {
			merged[key] = asset
		}
	}

	return merged, nil
}
// SearchKeysForKeytype watches the default namespace for a secret labelled
// with the given key type and cluster ID, and returns the key assets of the
// first matching secret reported as added.
//
// The returned map holds one entry per key in RandomKeyTypes, read from the
// secret's data. If the watch cannot be established the underlying error is
// masked and returned. An error masking secretsRetrievalFailedError is
// returned when the watch channel is closed, the watcher reports an error, an
// added event does not carry a secret, the secret's key label is not a valid
// key, an expected asset is missing from the secret, or no matching secret is
// added within WatchTimeOut.
func (s *Service) SearchKeysForKeytype(clusterID, keyType string) (map[Key][]byte, error) {
	// TODO we should also do a list. In case the secrets have already been
	// created we might miss them with only watching.
	s.logger.Log("debug", fmt.Sprintf("searching secret: %s=%s, %s=%s", KeyLabel, keyType, ClusterIDLabel, clusterID))

	watcher, err := s.k8sClient.Core().Secrets(v1.NamespaceDefault).Watch(apismetav1.ListOptions{
		// Select only secrets that match the given key type and the given
		// cluster ID.
		LabelSelector: fmt.Sprintf(
			"%s=%s, %s=%s",
			KeyLabel,
			keyType,
			ClusterIDLabel,
			clusterID,
		),
	})
	if err != nil {
		return nil, microerror.Mask(err)
	}
	defer watcher.Stop()

	// Use a single timer for the whole search. Creating a fresh timer on every
	// loop iteration would restart the timeout each time an ignored event
	// arrives, so the search could run far longer than WatchTimeOut.
	timeout := time.NewTimer(WatchTimeOut)
	defer timeout.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return nil, microerror.Maskf(secretsRetrievalFailedError, "secrets channel was already closed")
			}

			switch event.Type {
			case watch.Added:
				// Guard the type assertion so an unexpected object yields an
				// error instead of a panic.
				secret, isSecret := event.Object.(*v1.Secret)
				if !isSecret {
					return nil, microerror.Maskf(secretsRetrievalFailedError, "unexpected object type %T in watch event", event.Object)
				}

				key := Key(secret.Labels[KeyLabel])
				if !ValidKey(key) {
					return nil, microerror.Maskf(secretsRetrievalFailedError, "unknown clusterKey %s", key)
				}

				keys := make(map[Key][]byte)
				for _, k := range RandomKeyTypes {
					asset, found := secret.Data[k.String()]
					if !found {
						// Report the asset that is actually missing rather
						// than the key type the caller asked for.
						return nil, microerror.Maskf(secretsRetrievalFailedError, "malformed secret was missing %v asset", k)
					}
					keys[k] = asset
				}

				return keys, nil
			case watch.Deleted:
				// Noop. Ignore deleted events. These are handled by the certificate
				// operator.
			case watch.Error:
				return nil, microerror.Maskf(secretsRetrievalFailedError, "there was an error in the watcher: %v", apierrors.FromObject(event.Object))
			}
			// Any other event type (e.g. modified) is ignored and the watch
			// continues.
		case <-timeout.C:
			return nil, microerror.Maskf(secretsRetrievalFailedError, "timed out waiting for secrets")
		}
	}
}