/
snsnotifier.go
119 lines (103 loc) · 2.85 KB
/
snsnotifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
Copyright 2021 Adevinta
*/
package notify
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
// Notifier represents a generic notifier API.
type Notifier interface {
Push(message interface{}, attributes map[string]string) error
}
// Config holds the required sqs config information.
type Config struct {
Endpoint string `mapstructure:"endpoint"`
TopicArn string `mapstructure:"topic_arn"`
Enabled bool `mapstructure:"enabled"`
}
// SNSNotifier send push event to a sns topic.
type SNSNotifier struct {
c Config
sns snsiface.SNSAPI
l log.Logger
}
// NewSNSNotifier creates a new SNSNotifier with the given configuration.
func NewSNSNotifier(c Config, log log.Logger) (*SNSNotifier, error) {
arn, err := arn.Parse(c.TopicArn)
if err != nil {
_ = level.Error(log).Log("ParsingSNSTopicARN", err)
return nil, err
}
sess, err := session.NewSession()
if err != nil {
_ = level.Error(log).Log("CreatingAWSSession", err)
return nil, err
}
awsCfg := aws.NewConfig()
if arn.Region != "" {
awsCfg = awsCfg.WithRegion(arn.Region)
}
if c.Endpoint != "" {
awsCfg = awsCfg.WithEndpoint(c.Endpoint)
}
n := &SNSNotifier{
c: c,
l: log,
sns: sns.New(sess, awsCfg),
}
return n, nil
}
// Push pushes a notification to the configured sns topic.
func (s *SNSNotifier) Push(message interface{}, attributes map[string]string) error {
if !s.c.Enabled {
_ = level.Info(s.l).Log("PushNotification", "Disabled")
return nil
}
_ = level.Debug(s.l).Log("PushNotification", "Pushing")
content, err := json.Marshal(&message)
if err != nil {
return err
}
input := &sns.PublishInput{
Message: aws.String(string(content)),
TopicArn: aws.String(s.c.TopicArn),
MessageAttributes: prepareMessageAttributes(attributes),
}
output, err := s.sns.Publish(input)
if err != nil {
_ = level.Error(s.l).Log("ErrorPushNotification", err, "Message", aws.StringValue(input.Message))
return (err)
}
messageID := ""
if output != nil {
messageID = aws.StringValue(output.MessageId)
}
_ = level.Debug(s.l).Log(
"PushNotification", "OK",
"Message", aws.StringValue(input.Message),
"MessageID", messageID)
return nil
}
func prepareMessageAttributes(attributes map[string]string) map[string]*sns.MessageAttributeValue {
var attrs map[string]*sns.MessageAttributeValue
if attributes != nil {
attrs = make(map[string]*sns.MessageAttributeValue)
t := "String"
for n, v := range attributes {
// See: https://bryce.is/writing/code/jekyll/update/2015/11/01/3-go-gotchas.html
localV := v
attrs[n] = &sns.MessageAttributeValue{
DataType: &t,
StringValue: &localV,
}
}
}
return attrs
}