forked from pivotal-cf/cred-alert
/
pub_sub_enqueuer.go
66 lines (54 loc) · 1.28 KB
/
pub_sub_enqueuer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package queue
import (
"cred-alert/crypto"
"encoding/base64"
"cloud.google.com/go/pubsub"
"code.cloudfoundry.org/lager"
"golang.org/x/net/context"
)
//go:generate counterfeiter . Topic
type Topic interface {
Publish(context.Context, *pubsub.Message) *pubsub.PublishResult
}
type pubSubEnqueuer struct {
logger lager.Logger
topic Topic
signer crypto.Signer
}
func NewPubSubEnqueuer(logger lager.Logger, topic Topic, signer crypto.Signer) Enqueuer {
return &pubSubEnqueuer{
logger: logger,
topic: topic,
signer: signer,
}
}
func (p *pubSubEnqueuer) Enqueue(task Task) error {
payload := []byte(task.Payload())
signature, err := p.signer.Sign(payload)
if err != nil {
p.logger.Error("failed-to-sign", err)
return err
}
endcodedSignature := base64.StdEncoding.EncodeToString(signature)
message := &pubsub.Message{
Attributes: map[string]string{
"id": task.ID(),
"type": task.Type(),
"signature": endcodedSignature,
},
Data: payload,
}
ctx := context.TODO()
res := p.topic.Publish(ctx, message)
id, err := res.Get(ctx)
if err != nil {
p.logger.Error("failed-to-publish", err)
return err
}
p.logger.Info("successfully-published", lager.Data{
"id": task.ID(),
"pubsub-id": id,
"type": task.Type(),
})
return nil
}