/
service.go
412 lines (340 loc) · 12 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
package planprocessor
import (
"bufio"
"context"
"encoding/json"
"flag"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/objstore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/tools/blocksconvert"
)
// PlanProcessor processes entries of a single plan. Implementations are
// created via the factory passed to NewService, once per plan.
type PlanProcessor interface {
	// ProcessPlanEntries consumes plan entries from the channel until it is
	// closed or the context is canceled.
	// Returns "id" that is appended to "finished" status file.
	ProcessPlanEntries(ctx context.Context, entries chan blocksconvert.PlanEntry) (string, error)
}
// Config holds common options for plan-processing services. Fields are
// registered as CLI flags via RegisterFlags.
type Config struct {
	// Exported config options.
	Name              string        // name passed to scheduler, defaults to hostname
	HeartbeatPeriod   time.Duration // how often to update the plan progress file
	SchedulerEndpoint string        // scheduler endpoint to ask for more plans
	NextPlanInterval  time.Duration // how often to ask for the next plan (when idle)

	GrpcConfig grpcclient.Config // gRPC client options for the scheduler connection
}
// RegisterFlags registers all config options on the given FlagSet, each one
// prefixed with the supplied prefix.
func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) {
	cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".client", f)

	// Hostname is only a default; an empty value is acceptable if lookup fails.
	hostname, _ := os.Hostname()

	f.StringVar(&cfg.Name, prefix+".name", hostname, "Name passed to scheduler, defaults to hostname.")
	f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat", 5*time.Minute, "How often to update plan progress file.")
	f.StringVar(&cfg.SchedulerEndpoint, prefix+".scheduler-endpoint", "", "Scheduler endpoint to ask for more plans to work on.")
	f.DurationVar(&cfg.NextPlanInterval, prefix+".next-plan-interval", 1*time.Minute, "How often to ask for next plan (when idle)")
}
// NewService creates a new plan processor service.
//
// plansDirectory is used for storing downloaded plan files (created if missing).
// bucket is used for downloading plan files and uploading status files.
// cleanup is called on startup and after each plan; may be nil.
// factory creates a PlanProcessor for each new plan.
func NewService(cfg Config, plansDirectory string, bucket objstore.Bucket, cleanup func(logger log.Logger) error, factory func(planLog log.Logger, userID string, dayStart, dayEnd time.Time) PlanProcessor, l log.Logger, reg prometheus.Registerer) (*Service, error) {
	switch {
	case cfg.SchedulerEndpoint == "":
		return nil, errors.New("no scheduler endpoint")
	case bucket == nil || factory == nil:
		return nil, errors.New("invalid config")
	case plansDirectory == "":
		return nil, errors.New("no directory for plans")
	}

	if err := os.MkdirAll(plansDirectory, os.FileMode(0700)); err != nil {
		return nil, errors.Wrap(err, "failed to create plans directory")
	}

	s := &Service{
		cfg:            cfg,
		plansDirectory: plansDirectory,
		bucket:         bucket,
		cleanupFn:      cleanup,
		factory:        factory,
		log:            l,

		currentPlanStartTime: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_plan_start_time_seconds",
			Help: "Start time of current plan's time range (unix timestamp).",
		}),
		planFileReadPosition: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_plan_file_position",
			Help: "Read bytes from the plan file.",
		}),
		planFileSize: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_blocksconvert_plan_size",
			Help: "Total size of plan file.",
		}),
	}

	s.Service = services.NewBasicService(s.cleanup, s.running, nil)
	return s, nil
}
// This service implements common behaviour for plan-processing: 1) wait for next plan, 2) download plan,
// 3) process each plan entry, 4) delete local plan, 5) repeat. It gets plans from scheduler. During plan processing,
// this service maintains "progress" status file, and when plan processing finishes, it uploads "finished" plan.
type Service struct {
	services.Service

	cfg Config
	log log.Logger

	plansDirectory string          // local directory where plan files are downloaded
	bucket         objstore.Bucket // bucket holding plan and status files

	cleanupFn func(logger log.Logger) error // optional; called on startup and after each plan
	factory   func(planLog log.Logger, userID string, dayStart time.Time, dayEnd time.Time) PlanProcessor

	// Metrics describing the currently-processed plan. Reset to 0 between plans.
	planFileReadPosition prometheus.Gauge
	planFileSize         prometheus.Gauge
	currentPlanStartTime prometheus.Gauge
}
// cleanup removes all local (unfinished) plan files from the plans directory,
// then runs the optional cleanup callback supplied at construction time.
// Used both as the service's starting function and between plans.
func (s *Service) cleanup(_ context.Context) error {
	// os.ReadDir replaces the deprecated ioutil.ReadDir; it returns entries
	// sorted by filename, and we only need the names here.
	files, err := os.ReadDir(s.plansDirectory)
	if err != nil {
		return err
	}

	for _, f := range files {
		toRemove := filepath.Join(s.plansDirectory, f.Name())

		level.Info(s.log).Log("msg", "deleting unfinished local plan file", "file", toRemove)

		err = os.Remove(toRemove)
		if err != nil {
			return errors.Wrapf(err, "removing %s", toRemove)
		}
	}

	if s.cleanupFn != nil {
		return s.cleanupFn(s.log)
	}
	return nil
}
// running is the service's main loop: it periodically asks the scheduler for
// the next plan, processes it, and cleans the local working directory between
// plans. The scheduler connection is established lazily and re-established
// after RPC failures.
func (s *Service) running(ctx context.Context) error {
	ticker := time.NewTicker(s.cfg.NextPlanInterval)
	defer ticker.Stop()

	var schedulerClient blocksconvert.SchedulerClient
	var conn *grpc.ClientConn

	// Close the scheduler connection (if any) when the loop exits, so the
	// gRPC connection isn't leaked on shutdown.
	defer func() {
		if conn != nil {
			_ = conn.Close()
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil

		case <-ticker.C:
			// We may get "tick" even when we should stop.
			if ctx.Err() != nil {
				return nil
			}

			// (Re)connect to the scheduler if there is no healthy connection.
			if conn == nil {
				opts, err := s.cfg.GrpcConfig.DialOption(nil, nil)
				if err != nil {
					return err
				}

				conn, err = grpc.Dial(s.cfg.SchedulerEndpoint, opts...)
				if err != nil {
					level.Error(s.log).Log("msg", "failed to dial", "endpoint", s.cfg.SchedulerEndpoint, "err", err)
					continue
				}
				schedulerClient = blocksconvert.NewSchedulerClient(conn)
			}

			resp, err := schedulerClient.NextPlan(ctx, &blocksconvert.NextPlanRequest{Name: s.cfg.Name})
			if err != nil {
				level.Error(s.log).Log("msg", "failed to get next plan due to error, closing connection", "err", err)
				_ = conn.Close()
				conn = nil
				schedulerClient = nil
				continue
			}

			// No plan to work on, ignore.
			if resp.PlanFile == "" {
				continue
			}

			isPlanFile, planBaseName := blocksconvert.IsPlanFilename(resp.PlanFile)
			if !isPlanFile {
				level.Error(s.log).Log("msg", "got invalid plan file", "planFile", resp.PlanFile)
				continue
			}

			// Progress file must belong to the same plan.
			ok, base, _ := blocksconvert.IsProgressFilename(resp.ProgressFile)
			if !ok || base != planBaseName {
				level.Error(s.log).Log("msg", "got invalid progress file", "progressFile", resp.ProgressFile)
				continue
			}

			level.Info(s.log).Log("msg", "received plan file", "planFile", resp.PlanFile, "progressFile", resp.ProgressFile)

			err = s.downloadAndProcessPlanFile(ctx, resp.PlanFile, planBaseName, resp.ProgressFile)
			if err != nil {
				level.Error(s.log).Log("msg", "failed to process plan file", "planFile", resp.PlanFile, "err", err)

				// If context is canceled (blocksconvert is shutting down, or due to hearbeating failure), don't upload error.
				if !errors.Is(err, context.Canceled) {
					errorFile := blocksconvert.ErrorFilename(planBaseName)
					err = s.bucket.Upload(ctx, errorFile, strings.NewReader(err.Error()))
					if err != nil {
						level.Error(s.log).Log("msg", "failed to upload error file", "errorFile", errorFile, "err", err)
					}
				}
			}

			// Remove local plan files between plans, regardless of outcome.
			err = s.cleanup(ctx)
			if err != nil {
				level.Error(s.log).Log("msg", "failed to cleanup working directory", "err", err)
			}
		}
	}
}
// downloadAndProcessPlanFile downloads the given plan file into the local
// plans directory, streams its entries to a PlanProcessor created via
// s.factory, and uploads the "finished" status file on success. While the
// plan is being processed, a heartbeat service keeps the progress file in the
// bucket updated; if heartbeating fails, processing is aborted via context
// cancellation.
func (s *Service) downloadAndProcessPlanFile(ctx context.Context, planFile, planBaseName, lastProgressFile string) error {
	// Reset per-plan metrics when this plan is done, successfully or not.
	defer s.planFileSize.Set(0)
	defer s.planFileReadPosition.Set(0)
	defer s.currentPlanStartTime.Set(0)

	planLog := log.With(s.log, "plan", planFile)

	// Start heartbeating (updating of progress file). We setup new context used for the rest of the function.
	// If hearbeating fails, we cancel this new context to abort quickly.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	hb := newHeartbeat(planLog, s.bucket, s.cfg.HeartbeatPeriod, planBaseName, lastProgressFile)
	hb.AddListener(services.NewListener(nil, nil, nil, nil, func(from services.State, failure error) {
		level.Error(planLog).Log("msg", "heartbeating failed, aborting build", "failure", failure)
		cancel()
	}))
	if err := services.StartAndAwaitRunning(ctx, hb); err != nil {
		return errors.Wrap(err, "failed to start heartbeating")
	}

	localPlanFile := filepath.Join(s.plansDirectory, filepath.Base(planFile))
	planSize, err := downloadPlanFile(ctx, s.bucket, planFile, localPlanFile)
	if err != nil {
		return errors.Wrapf(err, "failed to download plan file %s to %s", planFile, localPlanFile)
	}
	level.Info(planLog).Log("msg", "downloaded plan file", "localPlanFile", localPlanFile, "size", planSize)
	s.planFileSize.Set(float64(planSize))

	f, err := os.Open(localPlanFile)
	if err != nil {
		return errors.Wrapf(err, "failed to read local plan file %s", localPlanFile)
	}
	defer func() {
		_ = f.Close()
	}()

	// Use a buffer for reading plan file. The readPositionReporter feeds the
	// plan-file-position gauge as bytes are consumed.
	r, err := blocksconvert.PreparePlanFileReader(planFile, bufio.NewReaderSize(&readPositionReporter{r: f, g: s.planFileReadPosition}, 1*1024*1024))
	if err != nil {
		return err
	}

	dec := json.NewDecoder(r)

	userID, dayStart, dayEnd, err := parsePlanHeader(dec)
	if err != nil {
		return err
	}

	s.currentPlanStartTime.Set(float64(dayStart.Unix()))

	level.Info(planLog).Log("msg", "processing plan file", "user", userID, "dayStart", dayStart, "dayEnd", dayEnd)

	processor := s.factory(planLog, userID, dayStart, dayEnd)

	planEntryCh := make(chan blocksconvert.PlanEntry)
	// Buffered (capacity 1) so the processor goroutine can always deliver its
	// id without blocking, even when this function returns early on error.
	idChan := make(chan string, 1)

	g, gctx := errgroup.WithContext(ctx)
	// Consumer: processes plan entries and produces the result id.
	g.Go(func() error {
		id, err := processor.ProcessPlanEntries(gctx, planEntryCh)
		idChan <- id
		return err
	})
	// Producer: decodes plan entries from the file and feeds the consumer.
	g.Go(func() error {
		return parsePlanEntries(gctx, dec, planEntryCh)
	})

	if err := g.Wait(); err != nil {
		return errors.Wrap(err, "failed to build block")
	}

	// Deletion failure is non-fatal: cleanup between plans removes leftovers.
	err = os.Remove(localPlanFile)
	if err != nil {
		level.Warn(planLog).Log("msg", "failed to delete local plan file", "err", err)
	}

	id := <-idChan

	// Upload finished status file
	finishedFile := blocksconvert.FinishedFilename(planBaseName, id)
	if err := s.bucket.Upload(ctx, finishedFile, strings.NewReader(id)); err != nil {
		return errors.Wrap(err, "failed to upload finished status file")
	}

	level.Info(planLog).Log("msg", "uploaded finished file", "file", finishedFile)

	// Stop heartbeating.
	if err := services.StopAndAwaitTerminated(ctx, hb); err != nil {
		// No need to report this error to caller to avoid generating error file.
		level.Warn(planLog).Log("msg", "hearbeating failed", "err", err)
	}

	// All OK
	return nil
}
// downloadPlanFile copies the plan object from the bucket to localPlanFile,
// returning the number of bytes written.
func downloadPlanFile(ctx context.Context, bucket objstore.Bucket, planFile string, localPlanFile string) (int64, error) {
	out, err := os.Create(localPlanFile)
	if err != nil {
		return 0, err
	}

	src, err := bucket.Get(ctx, planFile)
	if err != nil {
		_ = out.Close()
		return 0, err
	}
	// Copy will read `src` until EOF, or error is returned. Any possible error from Close is irrelevant.
	defer func() { _ = src.Close() }()

	written, err := io.Copy(out, src)
	if err != nil {
		_ = out.Close()
		return 0, err
	}

	// Surface the Close error: it may report a delayed write failure.
	return written, out.Close()
}
// parsePlanHeader decodes the first entry of the plan file, which must be a
// header carrying the user and day index. It returns the user ID plus the
// UTC start and end (start + 24h) of the day covered by the plan.
func parsePlanHeader(dec *json.Decoder) (userID string, startTime, endTime time.Time, err error) {
	var header blocksconvert.PlanEntry
	if err = dec.Decode(&header); err != nil {
		return
	}

	if header.User == "" || header.DayIndex == 0 {
		err = errors.New("failed to read plan file header: no user or day index found")
		return
	}

	// DayIndex counts whole days since the unix epoch.
	const secondsPerDay = int64(24 * time.Hour / time.Second)
	start := time.Unix(int64(header.DayIndex)*secondsPerDay, 0).UTC()

	return header.User, start, start.Add(24 * time.Hour), nil
}
// parsePlanEntries decodes plan entries from dec and sends them to
// planEntryCh, which is closed when this function returns. A "complete"
// footer entry must be present before EOF; entries after the footer are an
// error. Returns nil (not ctx.Err()) on context cancellation, since the
// consumer's own error is the one worth reporting.
func parsePlanEntries(ctx context.Context, dec *json.Decoder, planEntryCh chan blocksconvert.PlanEntry) error {
	defer close(planEntryCh)

	footerSeen := false
	entry := blocksconvert.PlanEntry{}

	for {
		err := dec.Decode(&entry)
		if err == io.EOF {
			if !footerSeen {
				return errors.New("plan is not complete")
			}
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "parsing plan entries")
		}

		if entry.Complete {
			footerSeen = true
			entry.Reset()
			continue
		}

		if footerSeen {
			return errors.New("plan entries found after plan footer")
		}

		// Only forward entries that actually carry data.
		if entry.SeriesID != "" && len(entry.Chunks) > 0 {
			select {
			case planEntryCh <- entry:
				// ok
			case <-ctx.Done():
				return nil
			}
		}

		entry.Reset()
	}
}
// readPositionReporter wraps an io.Reader and reports the cumulative number
// of bytes read through a prometheus gauge.
type readPositionReporter struct {
	r   io.Reader
	g   prometheus.Gauge
	pos int64 // total bytes read so far
}
// Read forwards to the wrapped reader and updates the gauge with the total
// number of bytes read so far. Errors are passed through unchanged.
func (r *readPositionReporter) Read(p []byte) (int, error) {
	n, err := r.r.Read(p)
	if n <= 0 {
		return n, err
	}

	r.pos += int64(n)
	r.g.Set(float64(r.pos))

	return n, err
}