/
eventbusimpl.go
65 lines (54 loc) · 1.58 KB
/
eventbusimpl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package sns
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
"github.com/cevixe/aws-sdk-go/aws/env"
"github.com/cevixe/aws-sdk-go/aws/factory"
"github.com/cevixe/aws-sdk-go/aws/model"
"github.com/cevixe/aws-sdk-go/aws/util"
"github.com/pkg/errors"
"os"
)
type eventBusImpl struct {
eventBusTopic string
snsClient snsiface.SNSAPI
}
func NewSnsEventBus(
eventBusTopic string,
snsClient snsiface.SNSAPI) model.AwsEventBus {
return &eventBusImpl{
eventBusTopic: eventBusTopic,
snsClient: snsClient,
}
}
func NewDefaultSnsEventBus(awsFactory factory.AwsFactory) model.AwsEventBus {
eventBusTopicArn := os.Getenv(env.CevixeEventBusTopicArn)
snsClient := awsFactory.SnsClient()
return NewSnsEventBus(eventBusTopicArn, snsClient)
}
func (e eventBusImpl) PublishEvent(ctx context.Context, event *model.AwsEventRecord) {
messageJson := util.MarshalJsonString(map[string]interface{}{
"default": util.MarshalJsonString(event),
})
var input = &sns.PublishInput{
TopicArn: aws.String(e.eventBusTopic),
Message: aws.String(messageJson),
MessageStructure: aws.String("json"),
MessageAttributes: map[string]*sns.MessageAttributeValue{
"event_type": {
DataType: aws.String("String"),
StringValue: event.EventType,
},
"event_class": {
DataType: aws.String("String"),
StringValue: event.EventClass,
},
},
}
_, err := e.snsClient.PublishWithContext(ctx, input)
if err != nil {
panic(errors.Wrap(err, "cannot publish event to sns"))
}
}