-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
sqs.go
152 lines (130 loc) · 4.5 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package aws
import (
"context"
"errors"
"sort"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/goformation/cloudformation"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/core"
"github.com/elastic/beats/x-pack/functionbeat/provider"
"github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer"
)
const batchSize = 10
// SQSConfig is the configuration for the SQS event type.
type SQSConfig struct {
Triggers []*SQSTriggerConfig `config:"triggers"`
Description string `config:"description"`
Name string `config:"name" validate:"nonzero,required"`
LambdaConfig *lambdaConfig `config:",inline"`
}
// SQSTriggerConfig configuration for the current trigger.
type SQSTriggerConfig struct {
EventSourceArn string `config:"event_source_arn"`
}
// Validate validates the configuration.
func (cfg *SQSConfig) Validate() error {
if len(cfg.Triggers) == 0 {
return errors.New("you need to specify at least one trigger")
}
return nil
}
// SQS receives events from the web service and forward them to elasticsearch.
type SQS struct {
log *logp.Logger
config *SQSConfig
}
// NewSQS creates a new function to receives events from a SQS queue.
func NewSQS(provider provider.Provider, cfg *common.Config) (provider.Function, error) {
config := &SQSConfig{LambdaConfig: DefaultLambdaConfig}
if err := cfg.Unpack(config); err != nil {
return nil, err
}
return &SQS{log: logp.NewLogger("sqs"), config: config}, nil
}
// Run starts the lambda function and wait for web triggers.
func (s *SQS) Run(_ context.Context, client core.Client) error {
lambda.Start(s.createHandler(client))
return nil
}
func (s *SQS) createHandler(client core.Client) func(request events.SQSEvent) error {
return func(request events.SQSEvent) error {
s.log.Debugf("The handler receives %d events", len(request.Records))
events := transformer.SQS(request)
if err := client.PublishAll(events); err != nil {
s.log.Errorf("Could not publish events to the pipeline, error: %+v", err)
return err
}
client.Wait()
return nil
}
}
// Name return the name of the lambda function.
func (s *SQS) Name() string {
return "sqs"
}
// Template returns the cloudformation template for configuring the service with the specified triggers.
func (s *SQS) Template() *cloudformation.Template {
template := cloudformation.NewTemplate()
prefix := func(suffix string) string {
return normalizeResourceName("fnb" + s.config.Name + suffix)
}
for _, trigger := range s.config.Triggers {
resourceName := prefix("SQS") + normalizeResourceName(trigger.EventSourceArn)
template.Resources[resourceName] = &cloudformation.AWSLambdaEventSourceMapping{
BatchSize: batchSize,
EventSourceArn: trigger.EventSourceArn,
FunctionName: cloudformation.GetAtt(prefix(""), "Arn"),
}
}
return template
}
// Policies returns a slice of policies to add to the lambda role.
func (s *SQS) Policies() []cloudformation.AWSIAMRole_Policy {
resources := make([]string, len(s.config.Triggers))
for idx, trigger := range s.config.Triggers {
resources[idx] = trigger.EventSourceArn
}
// Give us a chance to generate the same document indenpendant of the changes,
// to help with updates.
sort.Strings(resources)
// SQS Roles permissions:
// - lambda:CreateEventSourceMapping
// - lambda:ListEventSourceMappings
// - lambda:ListFunctions
//
// Lambda Role permission
// - sqs:ChangeMessageVisibility
// - sqs:DeleteMessage
// - sqs:GetQueueAttributes
// - sqs:ReceiveMessage
policies := []cloudformation.AWSIAMRole_Policy{
cloudformation.AWSIAMRole_Policy{
PolicyName: cloudformation.Join("-", []string{"fnb", "sqs", s.config.Name}),
PolicyDocument: map[string]interface{}{
"Statement": []map[string]interface{}{
map[string]interface{}{
"Action": []string{
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
},
"Effect": "Allow",
"Resource": resources,
},
},
},
},
}
return policies
}
// LambdaConfig returns the configuration to use when creating the lambda.
func (s *SQS) LambdaConfig() *lambdaConfig {
return s.config.LambdaConfig
}