/
service.go
299 lines (266 loc) · 9.55 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tumble
import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"go.chromium.org/luci/appengine/gaemiddleware"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/router"
ds "go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/info"
)
// HTTP routing constants for the Tumble service.
const (
	// baseURL is the common path prefix shared by every Tumble endpoint.
	baseURL = "/internal/" + baseName

	// fireAllTasksURL is the endpoint (GET, cron-invoked) that probes all
	// namespaces for pending work.
	fireAllTasksURL = baseURL + "/fire_all_tasks"

	// processShardPattern is the router pattern for the endpoint that
	// processes one shard. ":shard_id" and ":timestamp" are route parameters;
	// processURL substitutes concrete values for them.
	processShardPattern = baseURL + "/process_shard/:shard_id/at/:timestamp"

	// transientHTTPHeader is added to a failed process-shard response when
	// the underlying error carries the transient tag.
	transientHTTPHeader = "X-LUCI-Tumble-Transient"
)
// Service is a Tumble service instance. InstallHandlers wires its HTTP
// endpoints into a router; those endpoints then service Tumble's cron and
// task-queue requests.
//
// The zero value is usable: with Namespaces unset, every datastore namespace
// is polled.
type Service struct {
	// Namespaces, when non-nil, supplies the datastore namespaces that Tumble
	// will poll.
	//
	// When nil, Tumble runs against every namespace registered in the
	// datastore.
	Namespaces func(context.Context) ([]string, error)
}
// InstallHandlers registers the Tumble HTTP endpoints on r.
//
// base is the middleware chain placed in front of every endpoint. It is
// usually gaemiddleware.BaseProd(), or a derivative of it when something else
// is needed in the context.
//
// Two routes are registered:
//   - GET fireAllTasksURL, guarded by gaemiddleware.RequireCron. GET is used
//     so that the endpoint can be invoked from cron.
//   - POST processShardPattern, guarded by
//     gaemiddleware.RequireTaskQueue(baseName). The shard is processed in a
//     loop unless the request has a non-empty "single" query parameter.
func (s *Service) InstallHandlers(r *router.Router, base router.MiddlewareChain) {
	cronOnly := base.Extend(gaemiddleware.RequireCron)
	taskQueueOnly := base.Extend(gaemiddleware.RequireTaskQueue(baseName))

	r.GET(fireAllTasksURL, cronOnly, s.FireAllTasksHandler)
	r.POST(processShardPattern, taskQueueOnly, func(rc *router.Context) {
		single := rc.Request.URL.Query().Get("single")
		s.ProcessShardHandler(rc, single == "")
	})
}
// FireAllTasksHandler is the HTTP handler for fireAllTasksURL. It expects the
// `logging` and `luci/gae` services to be installed into the request context.
//
// It runs FireAllTasks and replies "ok" on success. On failure it replies with
// HTTP 500 and the error text.
//
// The handler does not check where the request came from. When it is
// installed via InstallHandlers, the cron-only restriction is enforced by
// gaemiddleware.RequireCron in the middleware chain.
func (s *Service) FireAllTasksHandler(c *router.Context) {
	err := s.FireAllTasks(c.Context)
	if err == nil {
		c.Writer.Write([]byte("ok"))
		return
	}
	c.Writer.WriteHeader(http.StatusInternalServerError)
	fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err)
}
// FireAllTasks searches for work in all namespaces and fires a process task
// for any shard that has at least one Mutation present. This ensures that no
// work languishes forever. It may not be needed in a constantly-loaded system
// with good tumble key distribution.
//
// Namespaces are probed concurrently using a worker pool of
// cfg.NumGoroutines. Failures for individual namespaces are logged and
// counted rather than aborting the run. If any occurred, a summary error is
// returned once probing is complete.
func (s *Service) FireAllTasks(c context.Context) error {
	cfg := getConfig(c)

	namespaces, err := s.getNamespaces(c, cfg)
	if err != nil {
		return err
	}

	// Probe each namespace in parallel. Each probe function reports its own
	// errors through errCount, so the work pool never receives a non-nil error
	// and its return value can be discarded.
	var errCount, taskCount counter
	_ = parallel.WorkPool(cfg.NumGoroutines, func(ch chan<- func() error) {
		for _, ns := range namespaces {
			ns := ns // per-iteration copy for the closure (pre-Go 1.22 semantics)
			ch <- func() error {
				// Generate a list of all shards for this namespace. The count is
				// computed once rather than on every loop iteration.
				shardCount := cfg.TotalShardCount(ns)
				allShards := make([]taskShard, 0, shardCount)
				for i := uint64(0); i < shardCount; i++ {
					allShards = append(allShards, taskShard{i, minTS})
				}
				s.fireAllTasksForNamespace(c, cfg, ns, allShards, &errCount, &taskCount)
				return nil
			}
		}
	})

	// The counters are read only after WorkPool has returned.
	if errCount > 0 {
		logging.Errorf(c, "Encountered %d error(s).", errCount)
		return errors.New("errors were encountered while probing for tasks")
	}

	logging.Debugf(c, "Successfully probed %d namespace(s) and fired %d task(s).",
		len(namespaces), taskCount)
	// err is necessarily nil here; return nil explicitly instead of relying on it.
	return nil
}
// fireAllTasksForNamespace probes the shards in allShards within namespace ns.
// It fires a process task for each shard that currently has at least one
// Mutation.
//
// Errors are not returned. Each failure is logged and recorded by
// incrementing errCount. When tasks are fired successfully, taskCount is
// increased by the number of shards tasked. FireAllTasks shares both counters
// across concurrent invocations of this function.
func (s *Service) fireAllTasksForNamespace(c context.Context, cfg *Config, ns string, allShards []taskShard,
	errCount, taskCount *counter) {
	logging.Infof(c, "Firing all tasks for namespace %q", ns)

	// Enter the supplied namespace. The namespace list may come from the
	// user-supplied Service.Namespaces callback. An invalid name is therefore
	// recorded as an error for this namespace, rather than panicking inside a
	// worker-pool goroutine (as MustNamespace would).
	nc, err := info.Namespace(c, ns)
	if err != nil {
		logging.WithError(err).Errorf(c, "Failed to enter namespace %q", ns)
		errCount.inc()
		return
	}
	c = nc
	if ns != "" {
		c = logging.SetField(c, "namespace", ns)
	}

	// First, check if the namespace has *any* Mutations. A keys-only query
	// limited to one result lets empty namespaces skip the per-shard scan.
	q := ds.NewQuery("tumble.Mutation").KeysOnly(true).Limit(1)
	switch amt, err := ds.Count(c, q); {
	case err != nil:
		logging.WithError(err).Errorf(c, "Error querying for Mutations")
		errCount.inc()
		return
	case amt == 0:
		logging.Infof(c, "No Mutations registered for this namespace.")
		return
	}

	// We have at least one Mutation for this namespace. Scan every shard and
	// remember the ones that have work. Tasks are fired only after scanning is
	// complete.
	triggerShards := make(map[taskShard]struct{}, len(allShards))
	for _, shrd := range allShards {
		amt, err := ds.Count(c, processShardQuery(c, cfg, shrd.shard).Limit(1))
		if err != nil {
			logging.Fields{
				logging.ErrorKey: err,
				"shard":          shrd.shard,
			}.Errorf(c, "Error querying for shards")
			errCount.inc()
			// Stop scanning at the first query error. Shards already found to
			// have work are still tasked below.
			break
		}
		if amt > 0 {
			logging.Infof(c, "Found work in shard [%d]", shrd.shard)
			triggerShards[shrd] = struct{}{}
		}
	}

	// Fire tasks for shards with identified work.
	if len(triggerShards) == 0 {
		logging.Infof(c, "No tasked shards were found.")
		return
	}
	logging.Infof(c, "Firing tasks for %d tasked shard(s).", len(triggerShards))
	if !fireTasks(c, cfg, triggerShards, false) {
		logging.Errorf(c, "Failed to fire tasks.")
		errCount.inc()
		return
	}
	taskCount.add(len(triggerShards))
}
// getNamespaces returns the set of namespaces to process. It uses
// s.Namespaces when that is set, and otherwise every namespace registered in
// the datastore (via getDatastoreNamespaces). cfg is not consulted.
//
// If enumeration fails, the error is logged and returned with a nil slice.
func (s *Service) getNamespaces(c context.Context, cfg *Config) ([]string, error) {
	lookup := getDatastoreNamespaces
	if s.Namespaces != nil {
		lookup = s.Namespaces
	}

	found, err := lookup(c)
	if err != nil {
		logging.WithError(err).Errorf(c, "Failed to enumerate namespaces.")
		return nil, err
	}
	return found, nil
}
// ProcessShardHandler is the HTTP handler for processShardPattern. It expects
// the `logging` and `luci/gae` services to be installed into the request
// context.
//
// It reads two route parameters, each of which must be a well-formed decimal
// integer:
//   - timestamp: UNIX timestamp in seconds, interpreted as UTC.
//   - shard_id: the shard identifier.
//
// A malformed parameter is rejected with HTTP 404. Otherwise processShard is
// invoked under a 9 minute deadline, with loop passed through unchanged. On
// failure the handler replies with HTTP 500. If the error is tagged
// transient, it also sets the transientHTTPHeader response header.
//
// The handler does not itself check that it runs as a task-queue task. When
// it is installed via InstallHandlers, that is enforced by
// gaemiddleware.RequireTaskQueue. Mutations are processed in whatever
// namespace the request context already carries (see info.GetNamespace).
// NOTE(review): presumably that namespace is set from the scheduling task
// upstream; confirm where.
func (s *Service) ProcessShardHandler(ctx *router.Context, loop bool) {
	c, rw, p := ctx.Context, ctx.Writer, ctx.Params

	tstampStr := p.ByName("timestamp")
	sidStr := p.ByName("shard_id")

	tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
	if err != nil {
		logging.Errorf(c, "bad timestamp %q", tstampStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad timestamp")
		return
	}

	sid, err := strconv.ParseUint(sidStr, 10, 64)
	if err != nil {
		// Log the offending shard ID, not the (already validated) timestamp.
		logging.Errorf(c, "bad shardID %q", sidStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad shardID")
		return
	}

	cfg := getConfig(c)

	logging.Infof(c, "Processing tasks in namespace %q", info.GetNamespace(c))
	// AppEngine backend instances run for 10 minutes at most; set the overall
	// context deadline to 9 minutes to leave headroom.
	c, cancel := clock.WithDeadline(c, clock.Now(c).Add(9*time.Minute))
	defer cancel()

	err = processShard(c, cfg, time.Unix(tstamp, 0).UTC(), sid, loop)
	if err != nil {
		logging.Errorf(c, "failure! %s", err)

		if transient.Tag.In(err) {
			rw.Header().Add(transientHTTPHeader, "true")
		}
		rw.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(rw, "error: %s", err)
		return
	}
	rw.Write([]byte("ok"))
}
// getDatastoreNamespaces lists every namespace in the datastore.
//
// It runs a keys-only query over the special "__namespace__" kind. Each
// resulting key identifies one namespace:
//   - The default namespace has integer ID 1, so its StringID is "".
//   - Every other namespace has its name as a string ID.
//
// A query failure is logged and returned with a nil slice.
func getDatastoreNamespaces(c context.Context) ([]string, error) {
	var keys []*ds.Key
	if err := ds.GetAll(c, ds.NewQuery("__namespace__").KeysOnly(true), &keys); err != nil {
		logging.WithError(err).Errorf(c, "Failed to execute namespace query.")
		return nil, err
	}

	// StringID yields "" for the integer-keyed default namespace, which is
	// exactly how the default namespace is named elsewhere.
	names := make([]string, len(keys))
	for i, k := range keys {
		names[i] = k.StringID()
	}
	return names, nil
}
// processURL builds the task URL for processing shard `shard` at timestamp ts.
// It fills the route parameters of processShardPattern.
//
// Query parameters are appended only when needed:
//   - "ns" carries a non-default namespace. This is purely cosmetic and is
//     omitted for the default ("") namespace.
//   - "single=1" is added when loop is false, requesting a single processing
//     pass instead of a loop.
func processURL(ts timestamp, shard uint64, ns string, loop bool) string {
	path := strings.NewReplacer(
		":shard_id", strconv.FormatUint(shard, 10),
		":timestamp", strconv.FormatInt(int64(ts), 10),
	).Replace(processShardPattern)

	params := make(url.Values, 2)
	if ns != "" {
		params.Set("ns", ns)
	}
	if !loop {
		params.Set("single", "1")
	}
	if len(params) == 0 {
		return path
	}
	return path + "?" + params.Encode()
}