// rebalance.go
//
// Note: the upstream repository was archived by its owner on Oct 25, 2018
// and is now read-only.
package distributor
import (
"io"
"math/rand"
"time"
"github.com/alternative-storage/torus"
"github.com/alternative-storage/torus/models"
)
// ringWatcher is meant to run as a goroutine. It subscribes a channel to
// new-ring notifications through d.srv.MDS and, for every ring received whose
// version is greater than the one currently held, stores it in d.ring under
// d.mut. A ring with the same version is ignored; a ring with a lower version
// causes a panic. The function returns when closer fires (after unsubscribing
// and closing the channel) or when the channel is found closed.
func (d *Distributor) ringWatcher(closer chan struct{}) {
	updates := make(chan torus.Ring)
	d.srv.MDS.SubscribeNewRings(updates)
	for {
		select {
		case <-closer:
			// Shutdown requested: drop the subscription first, then close
			// the channel we created.
			d.srv.MDS.UnsubscribeNewRings(updates)
			close(updates)
			return
		case incoming, open := <-updates:
			if !open {
				// The channel was closed; no further rings will arrive.
				return
			}
			switch {
			case incoming.Version() == d.ring.Version():
				// No problem. We're seeing the same ring.
				continue
			case incoming.Version() < d.ring.Version():
				panic("replacing old ring with ring in the past!")
			}
			// Strictly newer ring: swap it in under the distributor lock.
			d.mut.Lock()
			d.ring = incoming
			d.mut.Unlock()
		}
	}
}
// rebalanceTicker drives the periodic rebalance/gc loop. Each cycle it fetches
// the volume list from d.srv.MDS, calls PrepVolume on the rebalancer for every
// volume, and then calls Tick repeatedly until Tick returns io.EOF, publishing
// a models.RebalanceInfo via d.srv.UpdateRebalanceInfo after every tick. After
// a cycle finishes it waits a random 0-2 seconds, resets the rebalancer and
// starts over.
//
// The function returns as soon as closer fires. Every wait in the loop (the
// startup jitter, the per-tick rate limit and the pause between cycles)
// selects on closer, so shutdown is not held up by a pending sleep.
func (d *Distributor) rebalanceTicker(closer chan struct{}) {
	n := 0     // blocks written by the previous tick; scales the next tick delay
	total := 0 // blocks written so far in the current cycle

	// Startup jitter of 250-499ms. Wait on closer too, so a shutdown requested
	// during the jitter does not start a cycle.
	select {
	case <-closer:
		return
	case <-time.After(time.Duration(250+rand.Intn(250)) * time.Millisecond):
	}

	for {
		clog.Tracef("starting rebalance/gc cycle")
		volset, _, err := d.srv.MDS.GetVolumes()
		if err != nil {
			// Logged only; the cycle still proceeds with whatever volset holds.
			clog.Error(err)
		}
		for _, vol := range volset {
			clog.Tracef("checking volume %v...", vol.Name)
			if err := d.rebalancer.PrepVolume(vol); err != nil {
				clog.Errorf("gc prep for %s failed: %s", vol.Name, err)
			}
		}

	ratelimit:
		for {
			// The delay before the next tick grows with the number of blocks
			// the previous tick reported as written.
			timeout := 2 * time.Duration(n+1) * time.Millisecond
			select {
			case <-closer:
				return
			case <-time.After(timeout):
				written, err := d.rebalancer.Tick()
				// NOTE(review): d.ring is read here without holding d.mut,
				// while ringWatcher assigns d.ring under d.mut. Confirm
				// whether these reads need the (read) lock.
				if d.ring.Version() != d.rebalancer.VersionStart() {
					// Something is changed -- we are now rebalancing
					d.rebalancing = true
				}
				info := &models.RebalanceInfo{
					Rebalancing: d.rebalancing,
				}
				total += written
				info.LastRebalanceBlocks = uint64(total)
				if err == io.EOF {
					// Tick signals the end of the cycle with io.EOF: record
					// the finish time and start counting from zero again.
					info.LastRebalanceFinish = time.Now().UnixNano()
					total = 0
					// Only clear the rebalancing flag if the ring did not
					// move on while this cycle was running.
					if d.rebalancer.VersionStart() == d.ring.Version() {
						d.rebalancing = false
						info.Rebalancing = false
					}
					d.srv.UpdateRebalanceInfo(info)
					clog.Tracef("finished rebalance/gc cycle. ring version is %v", d.ring.Version())
					break ratelimit
				}
				if err != nil {
					// This is usually really bad
					clog.Error(err)
				}
				n = written
				d.srv.UpdateRebalanceInfo(info)
			}
		}

		// Pause 0-2 seconds between cycles, but stop immediately on shutdown
		// instead of sleeping and then running another Reset and prep pass.
		select {
		case <-closer:
			return
		case <-time.After(time.Duration(rand.Intn(3)) * time.Second):
		}
		d.rebalancer.Reset()
	}
}