This repository has been archived by the owner on Sep 24, 2022. It is now read-only.
forked from carousell/Orion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
message_queue.go
122 lines (109 loc) · 3.85 KB
/
message_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package message_queue
import (
"log"
"strconv"
goPubSub "cloud.google.com/go/pubsub"
cache "github.com/patrickmn/go-cache"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
// MessageQueue is the interface to the wrappers around the Google Pub/Sub
// library calls. PubSubQueue is the implementation defined in this file.
type MessageQueue interface {
	// Init stores the given key and project and sets up the connection to
	// Pub/Sub.
	Init(pubSubKey string, gProject string) error
	// Close stops the topics the queue has published to.
	Close() error
	// Publish sends the given data to the topic named by the first argument
	// and returns the library's publish result handle.
	Publish(string, *PubSubData) *goPubSub.PublishResult
	// GetResult resolves a handle returned by Publish by calling its Get
	// method with ctx.
	GetResult(ctx context.Context, result *goPubSub.PublishResult) (string, error)
	// SubscribeMessages receives messages from the subscription identified by
	// subscriptionId and hands each one to subscribeFunction.
	SubscribeMessages(ctx context.Context, subscriptionId string, subscribeFunction SubscribeFunction) error
}
// PubSubData represents the message format used for writing messages to
// Pub/Sub.
type PubSubData struct {
	// Id is sent as the "id" attribute of the published message.
	Id string
	// Timestamp is sent as the "timestamp" attribute, formatted as a base-10
	// string. NOTE(review): the unit (seconds, milliseconds, ...) is not
	// established in this file; confirm against callers.
	Timestamp int64
	// Data is the payload of the published message.
	Data []byte
}
// PubSubQueue holds the configuration and state required for interacting with
// Pub/Sub. PubsubClient, ctx and topics are only populated by Init; on a value
// that has not been initialised they are nil.
type PubSubQueue struct {
	// pubSubKey is the key passed to Init; configurePubsub parses it as JSON
	// credentials.
	pubSubKey string
	// gProject is the project passed to Init and used when creating the client.
	gProject string
	// PubsubClient is the Pub/Sub client created by Init.
	PubsubClient *goPubSub.Client
	// ctx is the context created in configurePubsub; Publish passes it to the
	// library.
	ctx context.Context
	// topics caches topic handles by topic name, with no expiration.
	topics *cache.Cache
}
// NewMessageQueue creates a PubSubQueue and returns it as a MessageQueue.
//
// When enabled is true the queue is initialised with serviceAccountKey and
// project before being returned. When enabled is false the queue is returned
// without Init having been called, so it has no client and no topic cache.
func NewMessageQueue(enabled bool, serviceAccountKey string, project string) MessageQueue {
	queue := &PubSubQueue{}
	if !enabled {
		return queue
	}
	// The error returned by Init is intentionally not inspected here; Init
	// logs connection failures itself.
	_ = queue.Init(serviceAccountKey, project)
	return queue
}
// Init initiates the connection to Google Pub/Sub.
//
// It records pubSubKey and gProject on the queue, builds the client through
// configurePubsub and creates an empty, non-expiring topic cache. If the
// client cannot be configured the failure is logged with log.Fatalln, which
// terminates the process.
func (pubsubqueue *PubSubQueue) Init(pubSubKey string, gProject string) error {
	pubsubqueue.pubSubKey, pubsubqueue.gProject = pubSubKey, gProject

	ctx, client, err := pubsubqueue.configurePubsub()
	pubsubqueue.ctx, pubsubqueue.PubsubClient = ctx, client
	if err != nil {
		log.Fatalln("Error in client connections to PubSub", err)
		return err
	}

	pubsubqueue.topics = cache.New(cache.NoExpiration, cache.NoExpiration)
	return nil
}
// Close stops every topic held in the queue's topic cache.
//
// It is safe to call on a queue that was never initialised (for example one
// returned by NewMessageQueue with enabled set to false): in that case there
// is no topic cache and Close does nothing. Close always returns nil.
func (pubsubqueue *PubSubQueue) Close() error {
	// topics is only created by Init; calling Items on a nil cache would
	// dereference a nil pointer.
	if pubsubqueue.topics == nil {
		return nil
	}
	for _, item := range pubsubqueue.topics.Items() {
		// Skip anything in the cache that is not a topic handle.
		if topic, ok := item.Object.(*goPubSub.Topic); ok {
			topic.Stop()
		}
	}
	return nil
}
// configurePubsub builds a Pub/Sub client for the queue's project, using the
// queue's key as JSON credentials with the Pub/Sub OAuth scope.
//
// On success it returns the background context used to create the client
// together with the client itself. If the key cannot be parsed or the client
// cannot be created, it returns a nil context, a nil client and the error, and
// leaves reporting to the caller.
func (pubsubqueue *PubSubQueue) configurePubsub() (context.Context, *goPubSub.Client, error) {
	key := []byte(pubsubqueue.pubSubKey)
	conf, err := google.JWTConfigFromJSON(key, "https://www.googleapis.com/auth/pubsub")
	if err != nil {
		// Without a valid config there is no token source to build a client from.
		return nil, nil, err
	}

	ctx := context.Background()
	ts := conf.TokenSource(ctx)
	ps, err := goPubSub.NewClient(ctx, pubsubqueue.gProject, option.WithTokenSource(ts))
	if err != nil {
		return nil, nil, err
	}
	return ctx, ps, nil
}
// Publish publishes the given message to the named topic.
//
// The topic handle is taken from the queue's cache when one is stored under
// topicName; otherwise a new handle is obtained from the client and cached.
// The message carries pubSubData.Data as its payload, and pubSubData.Id and
// pubSubData.Timestamp (as a base-10 string) as the "id" and "timestamp"
// attributes. The library's publish result is returned unchanged.
func (pubsubqueue *PubSubQueue) Publish(topicName string, pubSubData *PubSubData) *goPubSub.PublishResult {
	// A cache miss or an entry of another type leaves topic nil.
	cached, _ := pubsubqueue.topics.Get(topicName)
	topic, _ := cached.(*goPubSub.Topic)
	if topic == nil {
		topic = pubsubqueue.PubsubClient.Topic(topicName)
		pubsubqueue.topics.SetDefault(topicName, topic)
	}

	msg := &goPubSub.Message{
		Data: pubSubData.Data,
		Attributes: map[string]string{
			"id":        pubSubData.Id,
			"timestamp": strconv.FormatInt(pubSubData.Timestamp, 10),
		},
	}
	return topic.Publish(pubsubqueue.ctx, msg)
}
// GetResult returns the outcome of a publish call by calling result.Get with
// ctx and passing its string and error through unchanged. Calling it right
// after Publish lets a caller treat publishing as a synchronous operation.
func (pubsubqueue *PubSubQueue) GetResult(ctx context.Context, result *goPubSub.PublishResult) (string, error) {
	id, err := result.Get(ctx)
	return id, err
}
// SubscribeFunction is the callback type that receives messages from a
// subscription. SubscribeMessages passes a value of this type to the
// subscription's Receive call.
type SubscribeFunction func(ctx context.Context, msg *goPubSub.Message)
// SubscribeMessages initiates a subscriber call on the subscription identified
// by subscriptionId and hands received messages to subscribeFunction. It
// returns whatever error the subscription's Receive call returns.
func (pubsubqueue *PubSubQueue) SubscribeMessages(ctx context.Context, subscriptionId string, subscribeFunction SubscribeFunction) error {
	return pubsubqueue.PubsubClient.Subscription(subscriptionId).Receive(ctx, subscribeFunction)
}