-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgoop.go
196 lines (167 loc) · 4.79 KB
/
goop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package goop
import (
"fmt"
"log"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"google.golang.org/api/option"
)
// Goop - Wrapper for Google Cloud Pub/Sub.
//
// The methods on *Goop read these fields directly: Context, Project and Opts
// are consumed by CreateClient, and Client is used by every other method.
type Goop struct {
// Context is handed to each Pub/Sub call made by the methods in this file.
Context context.Context
// Project is passed to pubsub.NewClient as the project identifier.
Project string
// Opts are forwarded unchanged to pubsub.NewClient.
Opts []option.ClientOption
// Client is the Pub/Sub client; CreateClient assigns it on success.
Client *pubsub.Client
}
// CreateClient - Build a Pub/Sub client from the Context, Project and Opts
// fields and store it in the Client field.
//
// Return:
//     error - The error reported by pubsub.NewClient, or nil on success.
//     When an error is returned the Client field is left untouched.
func (g *Goop) CreateClient() error {
	client, err := pubsub.NewClient(g.Context, g.Project, g.Opts...)
	if err == nil {
		// Only a successfully constructed client is kept.
		g.Client = client
	}
	return err
}
// CreateTopic - Return a handle to the named Pub/Sub topic, creating the
// topic first when the existence check reports that it is missing. Progress
// messages are printed to standard output.
//
// Params:
//     topicName string - The name of the topic to look up or create.
//
// Return:
//     *pubsub.Topic - Handle to the existing or newly created topic; nil
//     when an error is returned.
//     error - The error from the existence check or from topic creation.
func (g *Goop) CreateTopic(topicName string) (*pubsub.Topic, error) {
	fmt.Println("Creating Pub/Sub topic.")

	// Look the topic up first so an existing one is reused as-is.
	existing := g.Client.Topic(topicName)
	found, err := existing.Exists(g.Context)
	switch {
	case err != nil:
		return nil, err
	case found:
		fmt.Printf("Pub/Sub topic (%s) already exists.\n", topicName)
		return existing, nil
	}

	// The topic was not found, so create it now.
	created, err := g.Client.CreateTopic(g.Context, topicName)
	if err != nil {
		return nil, err
	}

	fmt.Printf("Created Pub/Sub topic (%s).\n", topicName)
	return created, nil
}
// CreateSubscription - Return a handle to the named Pub/Sub subscription,
// creating it against the given topic (with a 20 second ack deadline) when
// the existence check reports that it is missing. Progress messages are
// printed to standard output.
//
// Params:
//     topic *pubsub.Topic - The topic a newly created subscription is
//     attached to. It is not consulted when the subscription already exists.
//     subName string - The name of the subscription to look up or create.
//
// Return:
//     *pubsub.Subscription - Handle to the existing or newly created
//     subscription; nil when an error is returned.
//     error - The error from the existence check or from creation.
func (g *Goop) CreateSubscription(topic *pubsub.Topic, subName string) (*pubsub.Subscription, error) {
	fmt.Println("Creating Pub/Sub subscription.")

	// Reuse the subscription if it is already there.
	existing := g.Client.Subscription(subName)
	found, err := existing.Exists(g.Context)
	if err != nil {
		return nil, err
	}
	if found {
		fmt.Printf("Pub/Sub subscription (%s) already exists.\n", subName)
		return existing, nil
	}

	// Otherwise create it, bound to the supplied topic.
	cfg := pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 20 * time.Second,
	}
	created, err := g.Client.CreateSubscription(g.Context, subName, cfg)
	if err != nil {
		return nil, err
	}

	fmt.Printf("Created Pub/Sub subscription (%s).\n", subName)
	return created, nil
}
// PullMessages - Receive messages from a Pub/Sub subscription and hand each
// one to a callback. The call returns whatever Subscription.Receive returns.
//
// Params:
//     subName string - The name of the subscription to receive from.
//     messageCallback func(msg *pubsub.Message, g *Goop) error - Invoked for
//     every received message, together with this Goop instance. A returned
//     error is logged and otherwise ignored.
//
// Return:
//     error - The error returned by Subscription.Receive, if any.
func (g *Goop) PullMessages(subName string, messageCallback func(msg *pubsub.Message, g *Goop) error) error {
	// Per-message handler: run the caller's callback, then acknowledge.
	handle := func(_ context.Context, m *pubsub.Message) {
		if cbErr := messageCallback(m, g); cbErr != nil {
			log.Printf("Could not process message: %s", cbErr)
		}

		// The message is acknowledged even when the callback failed: a
		// message that could not be processed is deliberately not put back
		// on the queue for re-delivery.
		m.Ack()
	}

	return g.Client.Subscription(subName).Receive(g.Context, handle)
}
// Publish - Publish a message to a Pub/Sub topic and wait for the result.
//
// A fresh topic handle is obtained on every call and stopped before
// returning, so the publishing resources tied to that handle do not
// accumulate across calls.
//
// Params:
//     topicName string - The name of the topic to publish to.
//     msg string - The message body to publish.
//
// Return:
//     error - The error reported by the publish result, or nil on success.
func (g *Goop) Publish(topicName, msg string) error {
	// Get the topic.
	t := g.Client.Topic(topicName)
	// Release the handle's publishing goroutines once this call is done;
	// without this every call would leave them running.
	defer t.Stop()

	// Publish the message.
	result := t.Publish(g.Context, &pubsub.Message{
		Data: []byte(msg),
	})

	// Wait for the outcome; the returned server ID is not needed here.
	_, err := result.Get(g.Context)
	return err
}
// PublishWithAttributes - Publish a message with attributes to a Pub/Sub
// topic and wait for the result.
//
// A fresh topic handle is obtained on every call and stopped before
// returning, so the publishing resources tied to that handle do not
// accumulate across calls.
//
// Params:
//     topicName string - The name of the topic to publish to.
//     msg string - The message body to publish.
//     attributes map[string]string - Map of attributes (key/value) to publish
//     along with the message body. The map is passed through as given.
//
// Return:
//     error - The error reported by the publish result, or nil on success.
func (g *Goop) PublishWithAttributes(topicName, msg string, attributes map[string]string) error {
	// Get the topic.
	t := g.Client.Topic(topicName)
	// Release the handle's publishing goroutines once this call is done;
	// without this every call would leave them running.
	defer t.Stop()

	// Publish the message together with its attributes.
	result := t.Publish(g.Context, &pubsub.Message{
		Data:       []byte(msg),
		Attributes: attributes,
	})

	// Wait for the outcome; the returned server ID is not needed here.
	_, err := result.Get(g.Context)
	return err
}