-
Notifications
You must be signed in to change notification settings - Fork 1
/
bytecounter_reader.go
49 lines (40 loc) · 987 Bytes
/
bytecounter_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package bytecounter
import (
"io"
"sync/atomic"
"time"
)
// ByteCounterReader wraps an io.ReadCloser and keeps a running total of the
// bytes read through it. An optional callback, registered with SetCallback,
// receives the running total from Read at a throttled rate.
type ByteCounterReader struct {
	// bytes is the running total of bytes read. It is set atomically because
	// it may be read by multiple threads.
	//
	// It is deliberately the first field: sync/atomic only guarantees 64-bit
	// alignment on 32-bit platforms for the first word of an allocated
	// struct. In any other position the alignment would depend on the sizes
	// of the fields that precede it.
	bytes int64

	// reader is the wrapped source; Read and Close delegate to it.
	reader io.ReadCloser

	// called & accessed synchronously during Read, no external access
	cb       func(full int64) // receives the running byte total
	cbEvery  time.Duration    // minimum time between two callback invocations
	lastCbAt time.Time        // time of the last invocation; zero until the first
}
// NewByteCounterReader returns a ByteCounterReader that reads from and closes
// the given reader. The byte total starts at zero and no callback is set.
func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader {
	counter := new(ByteCounterReader)
	counter.reader = reader
	return counter
}
// SetCallback registers cb to be invoked from Read with the running byte
// total. Read only invokes it when more than every has elapsed since the
// previous invocation. Passing a nil cb disables the callback.
//
// The fields are stored without synchronization and Read accesses them the
// same way.
// NOTE(review): presumably this must be called before reading starts — confirm
// against callers.
func (b *ByteCounterReader) SetCallback(every time.Duration, cb func(full int64)) {
	b.cb, b.cbEvery = cb, every
}
// Close closes the wrapped reader and returns whatever error it reports.
func (b *ByteCounterReader) Close() error {
	closeErr := b.reader.Close()
	return closeErr
}
// Read reads from the wrapped reader into p and adds the number of bytes read
// to the running total, including bytes returned together with an error.
//
// If a callback is registered and more than the configured interval has
// elapsed since it was last invoked, the callback is called synchronously with
// the new running total before Read returns. Until the callback has fired
// once, lastCbAt is the zero time, so the elapsed time is effectively
// unbounded. Because of the throttling, the final total is not guaranteed to
// be reported through the callback; use Bytes for that.
//
// n and err are exactly what the wrapped reader returned.
func (b *ByteCounterReader) Read(p []byte) (n int, err error) {
	n, err = b.reader.Read(p)
	total := atomic.AddInt64(&b.bytes, int64(n))

	// Without a callback there is nothing to throttle, so skip the clock read.
	if b.cb == nil {
		return n, err
	}

	// Take the timestamp before invoking the callback so that the time spent
	// inside it counts towards the next interval.
	now := time.Now()
	if now.Sub(b.lastCbAt) > b.cbEvery {
		b.cb(total)
		b.lastCbAt = now
	}
	return n, err
}
// Bytes returns the total number of bytes read so far. The value is loaded
// atomically.
func (b *ByteCounterReader) Bytes() int64 {
	total := atomic.LoadInt64(&b.bytes)
	return total
}