/
pubsub.go
263 lines (238 loc) · 7.44 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"context"
"net/http"
"sort"
"strings"
"google.golang.org/api/googleapi"
"google.golang.org/api/pubsub/v1"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/server/auth"
)
// createPubSubService builds a pubsub.Service client.
//
// An empty pubSubURL selects the real mode: requests go through an
// authenticated transport obtained from auth.GetRPCTransport (acting as self,
// with the PubSub scope). A non-empty pubSubURL (e.g. in unit tests) selects
// http.DefaultTransport and is installed as the service's BasePath.
func createPubSubService(c context.Context, pubSubURL string) (*pubsub.Service, error) {
	customEndpoint := pubSubURL != ""

	// Start from the plain transport and upgrade to the authenticated one only
	// when talking to the real PubSub.
	transport := http.DefaultTransport
	if !customEndpoint {
		authed, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(pubsub.PubsubScope))
		if err != nil {
			return nil, err
		}
		transport = authed
	}

	service, err := pubsub.New(&http.Client{Transport: transport})
	if err != nil {
		return nil, err
	}
	if customEndpoint {
		service.BasePath = pubSubURL
	}
	return service, nil
}
// configureTopic ensures a PubSub topic and a subscription to it exist, and
// that the given publisher is allowed to publish to the topic.
//
// Both topic and sub are fully qualified PubSub resource IDs, e.g.
// "projects/<id>/topics/<id>". If pushURL is empty, the subscription is pull
// based. pubSubURL is passed through to createPubSubService.
//
// Errors from the PubSub calls are returned tagged as transient. An error from
// creating the service client is returned as is.
//
// Idempotent.
func configureTopic(c context.Context, topic, sub, pushURL, publisher, pubSubURL string) error {
	service, err := createPubSubService(c, pubSubURL)
	if err != nil {
		return err
	}

	// Topic first. HTTP 409 means it already exists, which is fine.
	logging.Infof(c, "Ensuring topic %q exists", topic)
	if _, err := service.Projects.Topics.Create(topic, &pubsub.Topic{}).Context(c).Do(); err != nil && !isHTTP409(err) {
		logging.Errorf(c, "Failed - %s", err)
		return transient.Tag.Apply(err)
	}

	// Then the subscription attached to that topic. HTTP 409 is fine here too.
	logging.Infof(c, "Ensuring subscription %q exists", sub)
	subscription := &pubsub.Subscription{
		Topic:              topic,
		AckDeadlineSeconds: 70, // GAE request timeout plus some spare time
		PushConfig: &pubsub.PushConfig{
			PushEndpoint: pushURL, // if "", the subscription will be pull based
		},
	}
	if _, err := service.Projects.Subscriptions.Create(sub, subscription).Context(c).Do(); err != nil && !isHTTP409(err) {
		logging.Errorf(c, "Failed - %s", err)
		return transient.Tag.Apply(err)
	}

	// Build the IAM member string: service accounts and regular users use
	// different prefixes.
	member := "user:" + publisher
	if strings.HasSuffix(publisher, ".gserviceaccount.com") {
		member = "serviceAccount:" + publisher
	}
	logging.Infof(c, "Ensuring %q can publish to the topic", member)

	grantPublisher := func(policy iamPolicy) error {
		policy.grantRole("roles/pubsub.publisher", member)
		return nil
	}

	// Two attempts account for a possible race between concurrent calls to
	// configureTopic: the second attempt reads the already correct IAM policy
	// and finishes right away.
	const maxAttempts = 2
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		if lastErr = modifyTopicIAMPolicy(c, service, topic, grantPublisher); lastErr == nil {
			return nil
		}
		logging.Errorf(c, "Failed - %s", lastErr)
	}
	return transient.Tag.Apply(lastErr)
}
// pullSubcription pulls at most one message from a PubSub subscription.
//
// Used on dev server only. The pull request asks PubSub to return immediately
// rather than wait for a message.
//
// Returns:
//   - (nil, nil, nil) if no message was received;
//   - the message and a callback that acknowledges it. The callback uses the
//     same context c and only logs acknowledgement failures;
//   - an error if the service client can't be created, the pull fails, or the
//     response unexpectedly carries more than one message.
func pullSubcription(c context.Context, subscription, pubSubURL string) (*pubsub.PubsubMessage, func(), error) {
	service, err := createPubSubService(c, pubSubURL)
	if err != nil {
		return nil, nil, err
	}
	resp, err := service.Projects.Subscriptions.Pull(subscription, &pubsub.PullRequest{
		ReturnImmediately: true,
		MaxMessages:       1,
	}).Context(c).Do()
	if err != nil {
		return nil, nil, err
	}

	switch len(resp.ReceivedMessages) {
	case 0:
		return nil, nil, nil
	case 1:
		received := resp.ReceivedMessages[0]
		ackID := received.AckId
		ackCb := func() {
			_, err := service.Projects.Subscriptions.Acknowledge(subscription, &pubsub.AcknowledgeRequest{
				AckIds: []string{ackID},
			}).Context(c).Do()
			if err != nil {
				logging.Errorf(c, "Failed to acknowledge the message - %s", err)
			}
		}
		return received.Message, ackCb, nil
	default:
		// The remote side violated MaxMessages. Report it to the caller as an
		// error instead of panicking: the response comes from an external
		// service and must not be able to crash the process.
		return nil, nil, errors.New("received more than one message from PubSub while asking only one")
	}
}
// isHTTP409 reports whether err is a *googleapi.Error carrying HTTP status
// 409 (Conflict).
//
// Only the error value itself is inspected (plain type assertion); a nil err
// yields false.
func isHTTP409(err error) bool {
	apiErr, ok := err.(*googleapi.Error)
	if !ok || apiErr == nil {
		return false
	}
	return apiErr.Code == http.StatusConflict
}
// modifyTopicIAMPolicy performs a read-modify-write of the topic's IAM policy.
//
// It fetches the current policy, hands a mutable copy of it to cb, and stores
// the result back only if cb actually changed something. The Etag of the
// fetched policy is sent along with the update. An error returned by cb aborts
// the operation and is returned as is.
func modifyTopicIAMPolicy(c context.Context, service *pubsub.Service, topic string, cb func(iamPolicy) error) error {
	current, err := service.Projects.Topics.GetIamPolicy(topic).Context(c).Do()
	if err != nil {
		return err
	}

	// Keep the original map untouched so it can be compared against the copy
	// the callback mutates.
	before := iamPolicyFromBindings(current.Bindings)
	after := before.clone()
	if cbErr := cb(after); cbErr != nil {
		return cbErr
	}

	// Nothing changed, nothing to store.
	if after.isEqual(before) {
		return nil
	}

	logging.Infof(c, "Updating IAM policy of %q", topic)
	updated := &pubsub.Policy{
		Bindings: after.toBindings(),
		Etag:     current.Etag,
	}
	_, err = service.Projects.Topics.SetIamPolicy(topic, &pubsub.SetIamPolicyRequest{Policy: updated}).Context(c).Do()
	return err
}
// iamPolicy is the IAM policy doc: map {role -> set of members}.
//
// Keys are role names (e.g. "roles/pubsub.publisher"), values are sets of
// member identifiers (e.g. "serviceAccount:<email>" or "user:<email>").
type iamPolicy map[string]stringset.Set
// iamPolicyFromBindings converts a list of PubSub IAM bindings into an
// iamPolicy map keyed by role.
//
// If several bindings name the same role, the last one wins.
func iamPolicyFromBindings(bindings []*pubsub.Binding) iamPolicy {
	policy := make(iamPolicy, len(bindings))
	for i := range bindings {
		binding := bindings[i]
		policy[binding.Role] = stringset.NewFromSlice(binding.Members...)
	}
	return policy
}
// toBindings converts the policy back into a list of PubSub IAM bindings.
//
// The output is deterministic: bindings are ordered by role name and the
// members within each binding are sorted too.
func (p iamPolicy) toBindings() []*pubsub.Binding {
	// Collect and order the role names first.
	roleNames := make([]string, 0, len(p))
	for name := range p {
		roleNames = append(roleNames, name)
	}
	sort.Strings(roleNames)

	// Emit one binding per role, with its members in sorted order.
	out := make([]*pubsub.Binding, len(roleNames))
	for i, name := range roleNames {
		sortedMembers := p[name].ToSlice()
		sort.Strings(sortedMembers)
		out[i] = &pubsub.Binding{
			Role:    name,
			Members: sortedMembers,
		}
	}
	return out
}
// clone returns a deep copy of the policy: every member set is duplicated, so
// mutating the copy does not affect the receiver.
func (p iamPolicy) clone() iamPolicy {
	dup := make(iamPolicy, len(p))
	for role, members := range p {
		dup[role] = members.Dup()
	}
	return dup
}
// isEqual reports whether both policies have exactly the same roles, each with
// exactly the same set of members.
func (p iamPolicy) isEqual(another iamPolicy) bool {
	if len(p) != len(another) {
		return false
	}
	for role, right := range another {
		// The maps have equal sizes but may still hold different keys. A role
		// missing from p must make the policies unequal; it must not be
		// compared through a zero-value set.
		left, ok := p[role]
		if !ok {
			return false
		}
		if left.Len() != right.Len() {
			return false
		}
		// Sizes match, so it is enough to verify that every member on the
		// left is also present on the right.
		equal := true
		left.Iter(func(item string) bool {
			equal = right.Has(item)
			return equal // stop iterating at the first mismatch
		})
		if !equal {
			return false
		}
	}
	return true
}
// grantRole adds the principal to the member set of the given role, creating
// the set if the role has none yet. It is a no-op if the principal already
// holds the role.
func (p iamPolicy) grantRole(role, principal string) {
	members := p[role]
	if members == nil {
		// First member of this role.
		p[role] = stringset.NewFromSlice(principal)
		return
	}
	if members.Has(principal) {
		return // already there
	}
	members.Add(principal)
}