-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
132 lines (93 loc) · 2.62 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package sns
import (
"context"
"encoding/json"
"github.com/americanas-go/errors"
"github.com/americanas-go/faas/util"
"github.com/americanas-go/log"
"github.com/aws/aws-sdk-go-v2/aws"
awssns "github.com/aws/aws-sdk-go-v2/service/sns"
v2 "github.com/cloudevents/sdk-go/v2"
"github.com/matryer/try"
"golang.org/x/sync/errgroup"
"github.com/americanas-go/ignite/aws/aws-sdk-go.v2/client/sns"
)
// Client publishes CloudEvents to AWS SNS through a wrapped sns.Client.
type Client struct {
// client is the underlying sns.Client whose Publish method is called by send.
client sns.Client
}
// NewClient returns a Client that publishes through the given sns.Client.
func NewClient(c sns.Client) *Client {
	wrapped := new(Client)
	wrapped.client = c
	return wrapped
}
// Publish sends every event in events to AWS SNS.
//
// An empty (or nil) slice is not treated as an error: a warning is logged and
// nil is returned. Otherwise the result of send is returned unchanged.
func (p *Client) Publish(ctx context.Context, events []*v2.Event) error {
	l := log.FromContext(ctx).WithTypeOf(*p)
	l.Info("publishing to awssns")

	// Nothing to publish: warn and report success.
	if len(events) == 0 {
		l.Warnf("no messages were reported for posting")
		return nil
	}

	return p.send(ctx, events)
}
// send publishes every event concurrently, one goroutine per event, and
// returns the first error reported by any of them (nil when all succeed).
//
// For each event the payload produced by rawMessage is wrapped in a Message
// envelope, marshaled to JSON and published with MessageStructure "json" to
// the topic ARN taken from the event's subject. A failing publish is attempted
// up to 5 times; retrying stops early once the group context is done, because
// further attempts with a cancelled context cannot succeed.
func (p *Client) send(parentCtx context.Context, events []*v2.Event) error {
	logger := log.FromContext(parentCtx).WithTypeOf(*p)

	// gctx is cancelled as soon as one goroutine returns an error or when
	// Wait returns, so no explicit cleanup of the context is required here.
	g, gctx := errgroup.WithContext(parentCtx)

	for _, e := range events {
		event := e // per-iteration copy captured by the closure (required before Go 1.22)

		g.Go(func() error {
			rawMessage, err := p.rawMessage(event)
			if err != nil {
				return errors.Wrap(err, errors.Internalf("error on marshal. %s", err.Error()))
			}

			// Build the JSON envelope expected when MessageStructure is "json".
			messageBytes, err := json.Marshal(Message{Default: string(rawMessage)})
			if err != nil {
				return errors.Wrap(err, errors.Internalf("error on marshal. %s", err.Error()))
			}

			input := &awssns.PublishInput{
				Message:          aws.String(string(messageBytes)),
				MessageStructure: aws.String("json"),
				TopicArn:         aws.String(event.Subject()),
			}

			logger.WithField("subject", event.Subject()).
				WithField("id", event.ID()).
				Info(string(rawMessage))

			return try.Do(func(attempt int) (bool, error) {
				if err := p.client.Publish(gctx, input); err != nil {
					// Retry only while attempts remain and the group has not
					// been cancelled by a sibling failure or by the parent.
					retry := attempt < 5 && gctx.Err() == nil
					return retry, errors.NewInternal(err, "could not be published in awssns")
				}
				return false, nil
			})
		})
	}

	return g.Wait()
}
// rawMessage serializes out into the JSON payload that will be published.
//
// When the event carries a "target" extension whose value is the string
// "data", only the event data is decoded and re-marshaled as JSON. In every
// other case (extension missing, holding a non-string value, or holding any
// other string) the whole event is serialized with util.JSONBytes.
//
// It returns the serialized bytes, or a non-nil error when decoding the event
// data or marshaling fails.
func (p *Client) rawMessage(out *v2.Event) (rawMessage []byte, err error) {
	// Comma-ok assertion: a missing key or a non-string extension value yields
	// ok == false instead of panicking, and falls through to the full event.
	if target, ok := out.Extensions()["target"].(string); ok && target == "data" {
		var data interface{}
		if err = out.DataAs(&data); err != nil {
			return nil, errors.Wrap(err, errors.Internalf("error on data as. %s", err.Error()))
		}
		return json.Marshal(data)
	}

	return util.JSONBytes(*out)
}
// Message is the JSON envelope that send marshals into the publish input's
// Message field when publishing with MessageStructure "json".
type Message struct {
// Default holds the serialized event payload; it is encoded under the "default" key.
Default string `json:"default"`
}