This repository has been archived by the owner on Apr 22, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
146 lines (124 loc) · 4.02 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package legacy
import (
"fmt"
"time"
"github.com/giantswarm/microerror"
"github.com/giantswarm/micrologger"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
const (
// WatchTimeOut is the time to wait on watches against the Kubernetes API
// before giving up and throwing an error.
WatchTimeOut = 90 * time.Second
namespaceDefault = "default"
)
// ServiceConfig represents the configuration used to create a certificate TPR
// service.
type ServiceConfig struct {
// Dependencies.
K8sClient kubernetes.Interface
Logger micrologger.Logger
}
// DefaultServiceConfig provides a default configuration to create a new
// certificate TPR service by best effort.
func DefaultServiceConfig() ServiceConfig {
return ServiceConfig{
// Dependencies.
K8sClient: nil,
Logger: nil,
}
}
// NewService creates a new configured certificate TPR service.
func NewService(config ServiceConfig) (*Service, error) {
// Dependencies.
if config.K8sClient == nil {
return nil, microerror.Maskf(invalidConfigError, "config.K8sClient must not be empty")
}
if config.Logger == nil {
return nil, microerror.Maskf(invalidConfigError, "config.Logger must not be empty")
}
newService := &Service{
// Dependencies.
k8sClient: config.K8sClient,
logger: config.Logger,
}
return newService, nil
}
// Service implements the certificate TPR service.
type Service struct {
// Dependencies.
k8sClient kubernetes.Interface
logger micrologger.Logger
}
// SearchCerts watches for all secrets of a cluster and returns it as
// assets bundle.
func (s *Service) SearchCerts(clusterID string) (AssetsBundle, error) {
assetsBundle := make(AssetsBundle)
for _, componentName := range ClusterComponents {
ab, err := s.SearchCertsForComponent(clusterID, componentName.String())
if err != nil {
return nil, microerror.Mask(err)
}
for k, v := range ab {
assetsBundle[k] = v
}
}
return assetsBundle, nil
}
// SearchCertsForComponent watches for secrets of a single cluster component and
// returns it as assets bundle.
func (s *Service) SearchCertsForComponent(clusterID, componentName string) (AssetsBundle, error) {
// TODO we should also do a list. In case the secrets have already been
// created we might miss them with only watching.
watcher, err := s.k8sClient.Core().Secrets(namespaceDefault).Watch(metav1.ListOptions{
// Select only secrets that match the given component and the given cluster
// clusterID.
LabelSelector: fmt.Sprintf(
"%s=%s, %s=%s",
ComponentLabel,
componentName,
ClusterIDLabel,
clusterID,
),
})
if err != nil {
return nil, microerror.Mask(err)
}
assetsBundle := make(AssetsBundle)
defer watcher.Stop()
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
return nil, microerror.Maskf(secretsRetrievalFailedError, "secrets channel was already closed")
}
switch event.Type {
case watch.Added:
secret := event.Object.(*corev1.Secret)
component := ClusterComponent(secret.Labels[ComponentLabel])
if !ValidComponent(component, ClusterComponents) {
return nil, microerror.Maskf(secretsRetrievalFailedError, "unknown clusterComponent %s", component)
}
for _, assetType := range TLSAssetTypes {
asset, ok := secret.Data[assetType.String()]
if !ok {
return nil, microerror.Maskf(secretsRetrievalFailedError, "malformed secret was missing %v asset", assetType)
}
assetsBundle[AssetsBundleKey{component, assetType}] = asset
}
return assetsBundle, nil
case watch.Deleted:
// Noop. Ignore deleted events. These are handled by the certificate
// operator.
case watch.Error:
return nil, microerror.Maskf(secretsRetrievalFailedError, "there was an error in the watcher: %v", apierrors.FromObject(event.Object))
}
case <-time.After(WatchTimeOut):
return nil, microerror.Maskf(secretsRetrievalFailedError, "timeout while watching secret %#q", componentName)
}
}
}