// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/LICENSE

package sqlccl

import (
	"time"

	"golang.org/x/net/context"

	"github.com/cockroachdb/cockroach/pkg/sql/jobs"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// For both backups and restores, we compute progress as the number of
// completed export or import requests, respectively, divided by the total
// number of requests. To avoid hammering the system.jobs table, when a
// response comes back, we issue a progress update only if a) at least
// progressTimeThreshold has elapsed since the last update, or b) the
// difference between the last logged fractionCompleted and the current
// fractionCompleted is more than progressFractionThreshold.
const (
	progressTimeThreshold     = time.Second
	progressFractionThreshold = 0.05
)
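
// To make the thresholds concrete (illustrative numbers, not from the original
// file): a job split into 1,000 chunks advances fractionCompleted by 0.001 per
// chunk, so the fraction threshold alone permits an update only about once per
// 50 completed chunks; if chunks instead finish more than a second apart, the
// time threshold lets each completion through.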

type jobProgressLogger struct {
	// These fields must be externally initialized.
	job           *jobs.Job
	startFraction float32
	totalChunks   int
	progressedFn  jobs.ProgressedFn

	// The remaining fields are for internal use only.
	completedChunks      int
	lastReportedAt       time.Time
	lastReportedFraction float32
}

// chunkFinished marks one chunk of the job as completed. If either the time or
// fraction threshold has been reached, the progress update will be persisted to
// system.jobs.
//
// NB: chunkFinished is not threadsafe. A previous implementation that was
// threadsafe occasionally led to massive contention. One 2TB restore on a
// 15-node cluster, for example, had 60 goroutines attempting to update the
// progress at once, causing massive contention on the row in system.jobs. This
// inadvertently applied backpressure on the restore's import requests and
// slowed the job to a crawl. If multiple threads need to update progress, use
// a channel and a dedicated goroutine that calls loop, as in the sketch after
// loop below.
func (jpl *jobProgressLogger) chunkFinished(ctx context.Context) error {
	jpl.completedChunks++
	fraction := float32(jpl.completedChunks) / float32(jpl.totalChunks)
	fraction = fraction*(1-jpl.startFraction) + jpl.startFraction
	shouldLogProgress := fraction-jpl.lastReportedFraction > progressFractionThreshold ||
		jpl.lastReportedAt.Add(progressTimeThreshold).Before(timeutil.Now())
	if !shouldLogProgress {
		return nil
	}
	jpl.lastReportedAt = timeutil.Now()
	jpl.lastReportedFraction = fraction
	return jpl.job.Progressed(ctx, fraction, jpl.progressedFn)
}

// loop calls chunkFinished for every message received over chunkCh. It exits
// when chunkCh is closed, when totalChunks messages have been received, or when
// the context is canceled.
func (jpl *jobProgressLogger) loop(ctx context.Context, chunkCh <-chan struct{}) error {
	for {
		select {
		case _, ok := <-chunkCh:
			if !ok {
				return nil
			}
			if err := jpl.chunkFinished(ctx); err != nil {
				return err
			}
			if jpl.completedChunks == jpl.totalChunks {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
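
// runChunksWithProgress is an illustrative sketch, not part of the original
// file. It shows the channel-and-dedicated-goroutine pattern that the NB on
// chunkFinished recommends: worker goroutines signal each completed chunk over
// a channel, and a single goroutine running loop performs every progress
// update. The function name and the work callback are hypothetical; only
// jobProgressLogger, loop, and the jobs package types come from this file.
func runChunksWithProgress(
	ctx context.Context,
	job *jobs.Job,
	numChunks int,
	progressedFn jobs.ProgressedFn,
	work func(ctx context.Context, chunk int) error,
) error {
	// Cancel the context on early return so the progress goroutine unblocks.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	jpl := jobProgressLogger{job: job, totalChunks: numChunks, progressedFn: progressedFn}

	// The dedicated goroutine is the only one that touches jpl (and therefore
	// the row in system.jobs). loop returns after numChunks messages; the
	// buffer on loopErrCh keeps the goroutine from leaking on early return.
	chunkCh := make(chan struct{})
	loopErrCh := make(chan error, 1)
	go func() {
		loopErrCh <- jpl.loop(ctx, chunkCh)
	}()

	// Workers never call chunkFinished directly; they only send on chunkCh.
	workErrCh := make(chan error, numChunks)
	for i := 0; i < numChunks; i++ {
		go func(i int) {
			if err := work(ctx, i); err != nil {
				workErrCh <- err
				return
			}
			select {
			case chunkCh <- struct{}{}:
				workErrCh <- nil
			case <-ctx.Done():
				workErrCh <- ctx.Err()
			}
		}(i)
	}
	for i := 0; i < numChunks; i++ {
		if err := <-workErrCh; err != nil {
			return err
		}
	}

	// All sends have been received at this point; closing chunkCh also lets
	// loop exit in the degenerate numChunks == 0 case.
	close(chunkCh)
	return <-loopErrCh
}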