/
progress.go
155 lines (139 loc) · 3.53 KB
/
progress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package utils
import (
"context"
"encoding/json"
"io"
"sync/atomic"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)
// logFunc is the signature of a structured logging function (log.Info is the
// default used by this file) through which progress records are emitted.
type logFunc func(msg string, fields ...zap.Field)
// ProgressPrinter prints a progress bar.
//
// Depending on redirectLog the progress is rendered either with a textual bar
// template or as structured log entries; see goPrintProgress.
type ProgressPrinter struct {
// name labels the bar ("barName" in the textual template) and is logged as
// the "step" field when output is redirected to the log.
name string
// total is the number of units that corresponds to 100%; the displayed
// progress is clamped to it.
total int64
// redirectLog selects structured log output instead of the textual bar.
redirectLog bool
// progress counts finished units. It is updated by Inc and read by the
// printing goroutine, both through sync/atomic.
progress int64
// cancel stops the printing goroutine. NewProgressPrinter initializes it to
// a placeholder that only logs a warning; goPrintProgress replaces it with
// the real cancel function.
cancel context.CancelFunc
}
// NewProgressPrinter builds a ProgressPrinter for the task called name that is
// considered complete after total units. When redirectLog is true the progress
// is reported through the log rather than as a textual bar.
//
// The returned printer is not started yet: calling Close on it only logs a
// warning until goPrintProgress has installed the real cancel function.
func NewProgressPrinter(
	name string,
	total int64,
	redirectLog bool,
) *ProgressPrinter {
	// Placeholder used until the printer is actually started.
	warnNotStarted := func() {
		log.Warn("canceling non-started progress printer")
	}

	printer := new(ProgressPrinter)
	printer.name = name
	printer.total = total
	printer.redirectLog = redirectLog
	printer.cancel = warnNotStarted
	return printer
}
// Inc records one more finished unit of work. The counter is updated
// atomically; the printing goroutine picks the new value up on its next tick.
func (pp *ProgressPrinter) Inc() {
	const step int64 = 1
	atomic.AddInt64(&pp.progress, step)
}
// Close closes the current progress bar.
//
// It invokes the printer's cancel function, which signals the printing
// goroutine to stop; that goroutine pushes the bar to 100% unless the outer
// context was already canceled. Close only signals — it does not wait for the
// goroutine to finish. On a printer created by NewProgressPrinter but never
// started, the placeholder cancel function merely logs a warning.
//
// Close is a no-op on a nil receiver or on a zero-value ProgressPrinter (one
// not created through NewProgressPrinter), instead of panicking on the nil
// cancel function.
func (pp *ProgressPrinter) Close() {
	if pp == nil || pp.cancel == nil {
		// Nothing was ever started, so there is nothing to stop.
		return
	}
	pp.cancel()
}
// goPrintProgress configures a progress bar and starts a goroutine that
// refreshes it once per second until the printer is closed or ctx is canceled.
//
// When pp.redirectLog is set, or a testWriter is supplied, every rendering of
// the bar is a JSON record handed to logFuncImpl (log.Info when logFuncImpl is
// nil). Otherwise a textual bar template labelled with pp.name is used. A
// non-nil testWriter replaces the output destination and shortens the refresh
// rate; it exists only for tests.
func (pp *ProgressPrinter) goPrintProgress(
	ctx context.Context,
	logFuncImpl logFunc,
	testWriter io.Writer, // Only for tests
) {
	const (
		logTemplate  = `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}`
		textTemplate = `{{string . "barName" | green}} {{ bar . "<" "-" (cycle . "-" "\\" "|" "/" ) "." ">"}} {{percent .}}`
	)

	innerCtx, stop := context.WithCancel(ctx)
	pp.cancel = stop

	bar := pb.New64(pp.total)
	if !pp.redirectLog && testWriter == nil {
		// Human-readable bar.
		bar.SetTemplateString(textTemplate)
		bar.Set("barName", pp.name)
	} else {
		// Machine-readable records that wrappedWriter turns into log entries.
		bar.SetTemplateString(logTemplate)
		bar.SetRefreshRate(2 * time.Minute)
		bar.Set(pb.Static, false)       // Do not update automatically
		bar.Set(pb.ReturnSymbol, false) // Do not append '\r'
		bar.Set(pb.Terminal, false)     // Do not use terminal width
		// Hack! set Color to avoid separate progress string
		bar.Set(pb.Color, true)

		sink := logFuncImpl
		if sink == nil {
			sink = log.Info
		}
		bar.SetWriter(&wrappedWriter{name: pp.name, log: sink})
	}
	if testWriter != nil {
		// Tests capture the raw output and want it more frequently.
		bar.SetWriter(testWriter)
		bar.SetRefreshRate(2 * time.Second)
	}
	bar.Start()

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		defer bar.Finish()

		for {
			select {
			case <-innerCtx.Done():
				// Kept for compatibility with the old behavior: a cancellation
				// coming from the outer context leaves the progress untouched,
				// while a cancellation through Close completes the bar.
				if ctx.Err() == nil {
					bar.SetCurrent(pp.total)
				}
				return
			case <-ticker.C:
				// Never display more than 100%.
				done := atomic.LoadInt64(&pp.progress)
				if done > pp.total {
					done = pp.total
				}
				bar.SetCurrent(done)
			}
		}
	}()
}
// wrappedWriter receives the JSON records rendered by the progress bar and
// forwards their contents to a log function; see its Write method.
type wrappedWriter struct {
// name is logged as the "step" field of every record.
name string
// log is the function each decoded record is passed to.
log logFunc
}
// Write decodes p as one JSON progress record (keys P, C, E, R and S) and
// emits it through the wrapped log function as a "progress" entry.
//
// It returns len(p) on success. If p is not valid JSON nothing is logged and
// the traced decoding error is returned with a byte count of zero.
func (ww *wrappedWriter) Write(p []byte) (int, error) {
	// record mirrors the keys of the JSON template used for log output.
	type record struct {
		P, C, E, R, S string
	}

	var rec record
	err := json.Unmarshal(p, &rec)
	if err != nil {
		return 0, errors.Trace(err)
	}

	fields := []zap.Field{
		zap.String("step", ww.name),
		zap.String("progress", rec.P),
		zap.String("count", rec.C),
		zap.String("speed", rec.S),
		zap.String("elapsed", rec.E),
		zap.String("remaining", rec.R),
	}
	ww.log("progress", fields...)
	return len(p), nil
}
// StartProgress creates a ProgressPrinter for the task called name, expected
// to finish after total units, and immediately starts printing its progress in
// a background goroutine.
//
// redirectLog and log are passed on to the printer: with redirectLog set,
// progress is reported through log (log.Info when log is nil). Call Close on
// the returned printer to stop it; canceling ctx stops it as well.
func StartProgress(
	ctx context.Context,
	name string,
	total int64,
	redirectLog bool,
	log logFunc,
) *ProgressPrinter {
	printer := NewProgressPrinter(name, total, redirectLog)
	// No test writer outside of tests.
	printer.goPrintProgress(ctx, log, nil)
	return printer
}