This repository has been archived by the owner on Apr 26, 2021. It is now read-only.
forked from coreos/pkg
-
Notifications
You must be signed in to change notification settings - Fork 2
/
iocopy.go
189 lines (166 loc) · 4.6 KB
/
iocopy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2016 CoreOS Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package progressutil
import (
"errors"
"fmt"
"io"
"sync"
"time"
)
var (
ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
)
type copyReader struct {
reader io.Reader
current int64
total int64
pb *ProgressBar
}
func (cr *copyReader) Read(p []byte) (int, error) {
n, err := cr.reader.Read(p)
cr.current += int64(n)
err1 := cr.updateProgressBar()
if err == nil {
err = err1
}
return n, err
}
func (cr *copyReader) updateProgressBar() error {
cr.pb.SetPrintAfter(cr.formattedProgress())
progress := float64(cr.current) / float64(cr.total)
if progress > 1 {
progress = 1
}
return cr.pb.SetCurrentProgress(progress)
}
// NewCopyProgressPrinter returns a new CopyProgressPrinter
func NewCopyProgressPrinter() *CopyProgressPrinter {
return &CopyProgressPrinter{
results: make(chan error),
cancel: make(chan struct{}),
pbp: &ProgressBarPrinter{PadToBeEven: true},
}
}
// CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
// continually printing the progress of each copy.
type CopyProgressPrinter struct {
results chan error
cancel chan struct{}
// `lock` mutex protects all fields below it in CopyProgressPrinter struct
lock sync.Mutex
readers []*copyReader
started bool
pbp *ProgressBarPrinter
}
// AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
// will be made to copy bytes from reader to dest, and name and size will be
// used to label the progress bar and display how much progress has been made.
// If size is 0, the total size of the reader is assumed to be unknown.
// AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
// will be returned.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
cpp.lock.Lock()
defer cpp.lock.Unlock()
if cpp.started {
return ErrAlreadyStarted
}
cr := ©Reader{
reader: reader,
current: 0,
total: size,
pb: cpp.pbp.AddProgressBar(),
}
cr.pb.SetPrintBefore(name)
cr.pb.SetPrintAfter(cr.formattedProgress())
cpp.readers = append(cpp.readers, cr)
go func() {
_, err := io.Copy(dest, cr)
select {
case <-cpp.cancel:
return
case cpp.results <- err:
return
}
}()
return nil
}
// PrintAndWait will print the progress for each copy operation added with
// AddCopy to printTo every printInterval. This will continue until every added
// copy is finished, or until cancel is written to.
// PrintAndWait may only be called once; any subsequent calls will immediately
// return ErrAlreadyStarted. After PrintAndWait has been called, no more
// copies may be added to the CopyProgressPrinter.
func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
cpp.lock.Lock()
if cpp.started {
cpp.lock.Unlock()
return ErrAlreadyStarted
}
cpp.started = true
cpp.lock.Unlock()
n := len(cpp.readers)
if n == 0 {
// Nothing to do.
return nil
}
defer close(cpp.cancel)
t := time.NewTicker(printInterval)
allDone := false
for i := 0; i < n; {
select {
case <-cancel:
return nil
case <-t.C:
_, err := cpp.pbp.Print(printTo)
if err != nil {
return err
}
case err := <-cpp.results:
i++
// Once completion is signaled, further on this just drains
// (unlikely) errors from the channel.
if err == nil && !allDone {
allDone, err = cpp.pbp.Print(printTo)
}
if err != nil {
return err
}
}
}
return nil
}
func (cr *copyReader) formattedProgress() string {
var totalStr string
if cr.total == 0 {
totalStr = "?"
} else {
totalStr = ByteUnitStr(cr.total)
}
return fmt.Sprintf("%s / %s", ByteUnitStr(cr.current), totalStr)
}
var byteUnits = []string{"B", "KB", "MB", "GB", "TB", "PB"}
// ByteUnitStr pretty prints a number of bytes.
func ByteUnitStr(n int64) string {
var unit string
size := float64(n)
for i := 1; i < len(byteUnits); i++ {
if size < 1000 {
unit = byteUnits[i-1]
break
}
size = size / 1000
}
return fmt.Sprintf("%.3g %s", size, unit)
}