-
Notifications
You must be signed in to change notification settings - Fork 6
/
reader.go
104 lines (95 loc) · 2.57 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package rateio
import (
"fmt"
"io"
"time"
"github.com/Cloud-Foundations/Dominator/lib/format"
)
const (
// DEFAULT_SPEED_PERCENT is the speed percentage that newReaderContext
// substitutes when it is called with a speedPercent below 1.
DEFAULT_SPEED_PERCENT = 2
)
// newReaderContext builds a ReaderContext for the given maximum I/O rate
// (per second), speed percentage and measurer.
//
// A speedPercent below 1 is replaced with DEFAULT_SPEED_PERCENT. The chunk
// length is set to maxIOPerSecond*speedPercent/10000, i.e. one hundredth of
// the per-second target that read computes when throttling. The pause
// timestamp is initialised to the current time and the measurer is reset
// before the context is returned.
func newReaderContext(maxIOPerSecond uint64, speedPercent uint64,
	measurer ReadIOMeasurer) *ReaderContext {
	if speedPercent < 1 {
		speedPercent = DEFAULT_SPEED_PERCENT
	}
	readerCtx := &ReaderContext{
		maxIOPerSecond: maxIOPerSecond,
		speedPercent:   speedPercent,
		chunklen:       maxIOPerSecond * speedPercent / 10000,
		measurer:       measurer,
	}
	readerCtx.timeOfLastPause = time.Now()
	measurer.Reset()
	return readerCtx
}
// initialiseMaximumSpeed records maxSpeed as the context's maximum I/O rate
// and recomputes the chunk length from it and the current speed percentage.
//
// If a non-zero maximum was already present, a notice is printed to standard
// output; the new value is applied regardless.
func (ctx *ReaderContext) initialiseMaximumSpeed(maxSpeed uint64) {
	if alreadySet := ctx.maxIOPerSecond > 0; alreadySet {
		fmt.Println("Maximum speed already set")
	}
	ctx.maxIOPerSecond = maxSpeed
	ctx.chunklen = maxSpeed * ctx.speedPercent / 10000
}
// setSpeedPercent changes the speed percentage, capping it at 100, and
// recomputes the chunk length to match.
//
// It also restarts the accounting window: the pause timestamp is set to the
// current time and the measurer is reset.
func (ctx *ReaderContext) setSpeedPercent(percent uint) {
	newPercent := uint64(percent)
	if newPercent > 100 {
		newPercent = 100
	}
	ctx.speedPercent = newPercent
	ctx.chunklen = ctx.maxIOPerSecond * newPercent / 10000
	ctx.timeOfLastPause = time.Now()
	ctx.measurer.Reset()
}
// newReader returns a Reader that wraps rd and shares this context's
// rate-limiting state.
func (ctx *ReaderContext) newReader(rd io.Reader) *Reader {
	return &Reader{
		ctx: ctx,
		rd:  rd,
	}
}
// format returns a one-line summary of the context: the maximum speed, the
// speed percentage, and the resulting limit (maximum speed scaled by the
// percentage). Both byte quantities are rendered with format.FormatBytes.
func (ctx *ReaderContext) format() string {
	maxSpeed := format.FormatBytes(ctx.maxIOPerSecond)
	limitSpeed := format.FormatBytes(
		ctx.maxIOPerSecond * ctx.speedPercent / 100)
	return fmt.Sprintf("max speed=%s/s limit=%d%% %s/s",
		maxSpeed, ctx.speedPercent, limitSpeed)
}
// read reads from the wrapped reader into b, first pausing if enough bytes
// have been read since the previous pause to exceed the context's chunk
// length.
//
// No throttling is applied when the context has no maximum speed
// (maxIOPerSecond < 1) or when the speed percentage is 100 or more; the call
// is then passed straight through to the wrapped reader.
//
// When throttling, the measurer is asked how much I/O was performed for the
// bytes read since the last pause, and that figure is converted into the time
// the work should have taken at the desired rate. If less time than that has
// elapsed since the last pause, read sleeps for the remainder (recording the
// sleep in sleepTimeDistribution when one is set).
//
// It returns the byte count and error from the wrapped reader, or (0, err) if
// the measurer fails. Only successful reads of at least one byte are counted
// towards the next pause.
func (rd *Reader) read(b []byte) (n int, err error) {
	ctx := rd.ctx
	if ctx.maxIOPerSecond < 1 {
		// Unspecified capacity: go at maximum speed.
		return rd.rd.Read(b)
	}
	if ctx.speedPercent >= 100 {
		// Operate at maximum speed: get out of the way.
		return rd.rd.Read(b)
	}
	if ctx.bytesSinceLastPause >= ctx.chunklen {
		// Need to slow down.
		desiredPerSecond := ctx.maxIOPerSecond * ctx.speedPercent / 100
		if desiredPerSecond < 1 {
			// The percentage rounded down to nothing: fall back to a
			// thousandth of the maximum speed.
			desiredPerSecond = ctx.maxIOPerSecond / 1000
		}
		if desiredPerSecond < 1 {
			// Never divide by zero below.
			desiredPerSecond = 1
		}
		readSinceLastPause, err := ctx.measurer.MeasureReadIO(
			ctx.bytesSinceLastPause)
		if err != nil {
			return 0, err
		}
		desiredDuration := time.Duration(uint64(time.Second) *
			readSinceLastPause / desiredPerSecond)
		targetTime := ctx.timeOfLastPause.Add(desiredDuration)
		if duration := time.Until(targetTime); duration > 0 {
			if ctx.sleepTimeDistribution != nil {
				ctx.sleepTimeDistribution.Add(duration)
			}
			time.Sleep(duration)
		}
		// Start the next accounting window only once the pause is over.
		// Stamping the time before sleeping would count the sleep itself as
		// elapsed time in the next window, so every other pause would be
		// skipped and throughput would run at about twice the limit.
		ctx.timeOfLastPause = time.Now()
		ctx.bytesSinceLastPause = 0
	}
	n, err = rd.rd.Read(b)
	if n < 1 || err != nil {
		return n, err
	}
	ctx.bytesSinceLastPause += uint64(n)
	return n, nil
}