/
poller.go
163 lines (138 loc) · 4.26 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package secretsmanager
import (
"github.com/go-logr/logr"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
)
type SecretGetter interface {
GetSecret(secretID *string) (string, string, error)
}
type Secrets map[string]PolledSecretMeta
type Poller struct {
PolledSecrets Secrets
getSMClient func(string) (secretsmanageriface.SecretsManagerAPI, error)
defaultSearchRole string
smLastPolledOn time.Time
cachedSecretValuesByRole *lru.TwoQueueCache
wg sync.WaitGroup
errs chan<- error
quit chan bool
Log logr.Logger
}
// SecretMeta meta information of a polled secret
type PolledSecretMeta struct {
Tags map[string]string
CurrentVersionID string
UpdatedAt time.Time
}
// New creates a new poller, will send polling or other non critical errors through the errs channel
func New(interval time.Duration, errs chan error, getSMClient func(string) (secretsmanageriface.SecretsManagerAPI, error), defaultSearchRole string, logger logr.Logger) (*Poller, error) {
p := &Poller{
errs: errs,
getSMClient: getSMClient,
quit: make(chan bool),
defaultSearchRole: defaultSearchRole,
Log: logger,
}
var err error
// init a lru cache that can hold 10000 items (arbit value for now)
// this doesn't init the size to value set here, but is only used to figure if eviction is required or not
p.cachedSecretValuesByRole, err = lru.New2Q(10000)
if err != nil {
return nil, err
}
// poll in sync the first time to ensure that we have a populated cache before reconciler kicks in
p.PolledSecrets, err = p.fetchSecrets()
if err != nil {
return nil, err
}
p.Log.Info("Fetched secrets from AWS after starting", "numberOfSecrets", len(p.PolledSecrets))
go func() {
p.wg.Add(1)
ticker := time.NewTicker(interval)
p.poll(ticker)
ticker.Stop()
p.wg.Done()
}()
return p, nil
}
func (p *Poller) Stop() {
p.quit <- true
p.wg.Wait()
}
// poller polls secrets manager at `tick` defined intervals, caches it locally,
func (p *Poller) poll(ticker *time.Ticker) {
for {
select {
case _ = <-ticker.C:
polledSecrets, err := p.fetchSecrets()
if err != nil {
p.errs <- errors.WithMessagef(err, "failed polling secrets")
} else {
p.PolledSecrets = polledSecrets
p.Log.Info("Fetched secrets from AWS", "numberOfSecres", len(p.PolledSecrets))
}
case <-p.quit:
close(p.errs)
return
}
}
}
func (p *Poller) fetchSecrets() (Secrets, error) {
fetchedSecrets := make(Secrets)
allSecrets := []*secretsmanager.SecretListEntry{}
input := &secretsmanager.ListSecretsInput{
MaxResults: aws.Int64(100),
}
smClient, err := p.getSMClient(p.defaultSearchRole)
if err != nil {
return nil, err
}
err = smClient.ListSecretsPages(input, func(page *secretsmanager.ListSecretsOutput, lastPage bool) bool {
allSecrets = append(allSecrets, page.SecretList...)
return !lastPage
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
return nil, errors.WithMessagef(aerr, "failed listing secrets, error code: %s", aerr.Code())
}
return nil, errors.WithMessagef(err, "failed listing secrets")
}
for _, secret := range allSecrets {
if secret.DeletedDate != nil {
continue
}
versionID, err := getCurrentVersion(secret.SecretVersionsToStages)
if err != nil {
continue
}
secretTags := map[string]string{}
for _, t := range secret.Tags {
secretTags[*t.Key] = *t.Value
}
fetchedSecrets[*secret.Name] = PolledSecretMeta{
Tags: secretTags,
CurrentVersionID: versionID,
UpdatedAt: *secret.LastChangedDate,
}
}
p.smLastPolledOn = time.Now().UTC()
return fetchedSecrets, nil
}
// getCurrentVersion finds the versionid with AWSCURRENT
func getCurrentVersion(secretVersionToStages map[string][]*string) (string, error) {
for uuid, stages := range secretVersionToStages {
for _, stage := range stages {
if *stage == "AWSCURRENT" {
return uuid, nil
}
}
}
return "", errors.New("version with stage AWSCURRENT not found")
}