-
Notifications
You must be signed in to change notification settings - Fork 5.1k
/
dynamodb.go
217 lines (196 loc) · 6.56 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package dynamodb
import (
"context"
"errors"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/cenk/backoff"
"github.com/traefik/traefik/job"
"github.com/traefik/traefik/log"
"github.com/traefik/traefik/provider"
"github.com/traefik/traefik/safe"
"github.com/traefik/traefik/types"
)
// Compile-time assertion that *Provider implements the provider.Provider interface.
var _ provider.Provider = (*Provider)(nil)
// Provider holds the configuration for the DynamoDB provider.
// It embeds provider.BaseProvider and adds the AWS connection settings, the
// table to read and the polling interval.
type Provider struct {
provider.BaseProvider `mapstructure:",squash" export:"true"`
// AccessKeyID is the static access key; together with SecretAccessKey it is
// placed first in the credential chain built by createClient.
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
// RefreshSeconds is the period, in seconds, of the ticker that Provide uses
// to re-read the table when watching is enabled.
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
// Region is mandatory: createClient returns an error when it is empty.
Region string `description:"The AWS region to use for requests" export:"true"`
// SecretAccessKey is the static secret key paired with AccessKeyID.
SecretAccessKey string `description:"The AWS credentials secret key to use for making requests"`
// TableName is the table scanned by scanTable.
TableName string `description:"The AWS dynamodb table that stores configuration for traefik" export:"true"`
// Endpoint, when non-empty, is set as the endpoint of the AWS config in createClient.
Endpoint string `description:"The endpoint of a dynamodb. Used for testing with a local dynamodb"`
}
// dynamoClient wraps the DynamoDB API handle used by the provider.
// The handle is typed as the dynamodbiface.DynamoDBAPI interface rather than a
// concrete client, presumably so a fake can be substituted in tests — TODO confirm.
type dynamoClient struct {
db dynamodbiface.DynamoDBAPI
}
// Init initializes the provider by delegating to the embedded BaseProvider
// with the given constraints, and returns whatever that call returns.
func (p *Provider) Init(constraints types.Constraints) error {
	base := &p.BaseProvider
	return base.Init(constraints)
}
// createClient creates an AWS session, assembles the AWS configuration for this
// provider and returns a dynamoClient wrapping a DynamoDB service client.
//
// Credentials are looked up through a chain, in this order: the static
// AccessKeyID/SecretAccessKey pair, the environment, the shared credentials
// file, and the SDK's default remote credential provider.
//
// An error is returned when the session cannot be created or when Region is
// empty.
func (p *Provider) createClient() (*dynamoClient, error) {
	log.Info("Creating Provider client...")

	awsSession, err := session.NewSession()
	if err != nil {
		return nil, err
	}

	if p.Region == "" {
		return nil, errors.New("no Region provided for Provider")
	}

	staticCreds := &credentials.StaticProvider{
		Value: credentials.Value{
			AccessKeyID:     p.AccessKeyID,
			SecretAccessKey: p.SecretAccessKey,
		},
	}
	chain := []credentials.Provider{
		staticCreds,
		&credentials.EnvProvider{},
		&credentials.SharedCredentialsProvider{},
		defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
	}

	awsConfig := &aws.Config{
		Region:      &p.Region,
		Credentials: credentials.NewChainCredentials(chain),
	}

	if p.Trace {
		// Forward whatever the SDK logs to traefik's debug log.
		// NOTE(review): no SDK log level is set here, so the SDK may never
		// call this logger — confirm against aws.Config.LogLevel defaults.
		awsConfig.WithLogger(aws.LoggerFunc(func(args ...interface{}) {
			log.Debug(args...)
		}))
	}

	if endpoint := p.Endpoint; endpoint != "" {
		awsConfig.Endpoint = aws.String(endpoint)
	}

	return &dynamoClient{db: dynamodb.New(awsSession, awsConfig)}, nil
}
// scanTable scans the table named by TableName page by page and returns every
// item found, in the order the pages were delivered. When the scan fails, a
// failure message is logged and the error is returned with a nil slice.
func (p *Provider) scanTable(client *dynamoClient) ([]map[string]*dynamodb.AttributeValue, error) {
	log.Debugf("Scanning Provider table: %s ...", p.TableName)

	items := []map[string]*dynamodb.AttributeValue{}

	// collect appends one page of results and asks for the next page until
	// the last one has been seen.
	collect := func(page *dynamodb.ScanOutput, lastPage bool) bool {
		items = append(items, page.Items...)
		return !lastPage
	}

	input := &dynamodb.ScanInput{TableName: aws.String(p.TableName)}
	if err := client.db.ScanPages(input, collect); err != nil {
		log.Errorf("Failed to scan Provider table %s", p.TableName)
		return nil, err
	}

	log.Debugf("Successfully scanned Provider table %s", p.TableName)
	return items, nil
}
// buildConfiguration retrieves items from dynamodb and converts them into
// Backends and Frontends in a Configuration.
//
// Every item is expected to carry a string "name" attribute plus either a
// "backend" or a "frontend" attribute holding the definition to unmarshal.
// Items that fail to unmarshal, that have neither attribute, or that have no
// usable name are logged and skipped. Only a failure of the table scan is
// returned as an error.
func (p *Provider) buildConfiguration(client *dynamoClient) (*types.Configuration, error) {
	items, err := p.scanTable(client)
	if err != nil {
		return nil, err
	}
	log.Debugf("Number of Items retrieved from Provider: %d", len(items))

	backends := make(map[string]*types.Backend)
	frontends := make(map[string]*types.Frontend)

	// unmarshal dynamoAttributes into Backends and Frontends
	for i, item := range items {
		log.Debugf("Provider Item: %d\n%v", i, item)

		// Resolve the name once; dereferencing item["name"].S directly would
		// panic on an item without a string "name" attribute.
		name, hasName := dynamoItemName(item)

		// verify the type of each item by checking to see if it has
		// the corresponding type, backend or frontend map
		if backend, exists := item["backend"]; exists {
			log.Debug("Unmarshaling backend from Provider...")
			tmpBackend := &types.Backend{}
			switch err := dynamodbattribute.Unmarshal(backend, tmpBackend); {
			case err != nil:
				// Pass the text as an argument so a '%' in it is not
				// interpreted as a format verb.
				log.Errorf("%s", err.Error())
			case !hasName:
				log.Warnf("Provider Item %d is missing a string \"name\" attribute, skipping: %v", i, item)
			default:
				backends[name] = tmpBackend
				log.Debug("Backend from Provider unmarshalled successfully")
			}
		} else if frontend, exists := item["frontend"]; exists {
			log.Debug("Unmarshaling frontend from Provider...")
			tmpFrontend := &types.Frontend{}
			switch err := dynamodbattribute.Unmarshal(frontend, tmpFrontend); {
			case err != nil:
				log.Errorf("%s", err.Error())
			case !hasName:
				log.Warnf("Provider Item %d is missing a string \"name\" attribute, skipping: %v", i, item)
			default:
				frontends[name] = tmpFrontend
				log.Debug("Frontend from Provider unmarshalled successfully")
			}
		} else {
			log.Warnf("Error in format of Provider Item: %v", item)
		}
	}

	return &types.Configuration{
		Backends:  backends,
		Frontends: frontends,
	}, nil
}

// dynamoItemName returns the value of the item's "name" attribute. The second
// result is false when the attribute is absent, nil, or not a string (S unset).
func dynamoItemName(item map[string]*dynamodb.AttributeValue) (string, bool) {
	attr, ok := item["name"]
	if !ok || attr == nil || attr.S == nil {
		return "", false
	}
	return *attr.S, true
}
// Provide starts the provider on a goroutine obtained from pool and returns nil
// right away. The goroutine creates a client, builds the configuration from the
// table and sends it on configurationChan. When Watch is enabled it then
// rebuilds and resends the configuration every RefreshSeconds until the pool
// signals stop. A failing run is retried with an exponential backoff.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
	// ignoreCanceled turns a cancellation (of ctx, or carried by err) into nil
	// so that a requested stop is not treated as a failure.
	ignoreCanceled := func(ctx context.Context, err error) error {
		canceled := ctx.Err() == context.Canceled || err == context.Canceled
		if canceled {
			return nil
		}
		return err
	}

	// newMessage wraps a configuration in the message this provider publishes.
	newMessage := func(cfg *types.Configuration) types.ConfigMessage {
		return types.ConfigMessage{
			ProviderName:  "dynamodb",
			Configuration: cfg,
		}
	}

	pool.Go(func(stop chan bool) {
		ctx, cancel := context.WithCancel(context.Background())
		// Turn the pool's stop signal into a cancellation of ctx.
		safe.Go(func() {
			<-stop
			cancel()
		})

		poll := func() error {
			client, err := p.createClient()
			if err != nil {
				return ignoreCanceled(ctx, err)
			}

			initial, err := p.buildConfiguration(client)
			if err != nil {
				return ignoreCanceled(ctx, err)
			}
			configurationChan <- newMessage(initial)

			if !p.Watch {
				return nil
			}

			// NOTE(review): time.NewTicker panics on a non-positive period,
			// so RefreshSeconds must be > 0 here — confirm it is validated or
			// defaulted by the caller.
			ticker := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
			defer ticker.Stop()

			for {
				log.Debug("Watching Provider...")
				select {
				case <-ticker.C:
					refreshed, err := p.buildConfiguration(client)
					if err != nil {
						return ignoreCanceled(ctx, err)
					}
					configurationChan <- newMessage(refreshed)
				case <-ctx.Done():
					return ignoreCanceled(ctx, ctx.Err())
				}
			}
		}

		// onRetry logs each failed attempt together with the delay reported
		// by the backoff.
		onRetry := func(err error, delay time.Duration) {
			log.Errorf("Provider error: %s time: %v", err.Error(), delay)
		}

		err := backoff.RetryNotify(safe.OperationWithRecover(poll), job.NewBackOff(backoff.NewExponentialBackOff()), onRetry)
		if err != nil {
			log.Errorf("Failed to connect to Provider. %s", err.Error())
		}
	})

	return nil
}