forked from dgraph-io/dgraph
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tablet.go
291 lines (263 loc) · 8.15 KB
/
tablet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
/*
* Copyright (C) 2017 Dgraph Labs, Inc. and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package zero
import (
"sort"
"time"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"golang.org/x/net/context"
)
const (
predicateMoveTimeout = 20 * time.Minute
)
/*
Steps to move predicate p from g1 to g2.
Design change:
• If you’re not the leader, don’t talk to zero.
• Let the leader send you updates via proposals.
Move:
• Dgraph zero would decide that G1 should not serve P, G2 should serve it.
• Zero would propose that G1 is read-only for predicate P. This would propagate to the cluster.
• Zero would tell G1 to move P to G2 (Endpoint: Zero → G1)
This would trigger G1 to get latest state. Wait for it.
• G1 would propose this state to its followers.
• G1 after proposing would do a call to G2, and start streaming.
• Before G2 starts accepting, it should delete any current keys for P.
• It should tell Zero whether it succeeded or failed. (Endpoint: G1 → Zero)
• Zero would then propose that G2 is serving P (or G1 is, if the move failed above); P would be RW again.
• G1 gets this, G2 gets this.
• Both propagate this to their followers.
*/
// TODO: Have an event log for everything.
// rebalanceTablets loops forever, waking up every opts.rebalanceInterval to ask
// chooseTablet for a predicate to move and, when one is named, moving it with
// movePredicate. A failed move is only logged; the loop carries on with the next tick.
func (s *Server) rebalanceTablets() {
	ticker := time.NewTicker(opts.rebalanceInterval)
	for range ticker.C {
		pred, src, dst := s.chooseTablet()
		if pred == "" {
			// Nothing worth moving on this tick.
			continue
		}
		err := s.movePredicate(pred, src, dst)
		if err != nil {
			x.Println(err)
		}
	}
}
// movePredicate moves the tablet serving predicate from srcGroup to dstGroup. The move
// runs under a context bounded by predicateMoveTimeout and is cancelled early if this
// node stops being the leader while the move is in flight.
//
// It returns an error if the tablet is no longer being served or if moveTablet fails;
// on success it logs the completed move and returns nil.
func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
	tab := s.ServingTablet(predicate)
	if tab == nil {
		// The tablet was picked by chooseTablet under a read lock that has since been
		// released, so it may have gone away in between. Report that to the caller
		// rather than asserting.
		return x.Errorf("Tablet to be moved: [%v] should not be nil", predicate)
	}
	x.Printf("Going to move predicate: [%v], size: [%v] from group %d to %d\n", predicate,
		humanize.Bytes(uint64(tab.Space)), srcGroup, dstGroup)

	ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout)
	// Always release the context's timer. Without this, the watcher goroutine below
	// never calls cancel on the path where a leader change fires while we are still
	// the leader, and the context lingers until the timeout expires.
	defer cancel()

	// done is closed once the move has finished so the watcher goroutine can exit.
	done := make(chan struct{})
	go func() {
		select {
		case <-s.leaderChangeChannel():
			// Cancel predicate moves when you step down as leader.
			if !s.Node.AmLeader() {
				cancel()
				return
			}
			// We might have initiated predicate move on some other node, give it some
			// time to get cancelled. On cancellation the other node would set the predicate
			// to write mode again and we need to be sure that it doesn't happen after we
			// decide to move the predicate and set it to read mode.
			time.Sleep(time.Minute)
			// Check if any predicates were stuck in read mode. We don't need to do it
			// periodically because we revert back the predicate to write state in case
			// of any error unless a node crashes or is shutdown.
			s.runRecovery()
		case <-done:
			// Move finished; the deferred cancel in movePredicate cleans up the context.
		}
	}()

	err := s.moveTablet(ctx, predicate, srcGroup, dstGroup)
	close(done)
	if err != nil {
		return x.Errorf("Error while trying to move predicate %v from %d to %d: %v", predicate,
			srcGroup, dstGroup, err)
	}
	x.Printf("Predicate move done for: [%v] from group %d to %d\n", predicate, srcGroup, dstGroup)
	return nil
}
// runRecovery looks for tablets that are marked ReadOnly in the current state and, for
// each one, proposes the same tablet (same group, predicate and space) with Force set and
// ReadOnly cleared. All proposals are issued concurrently and waited for; failures are
// logged and otherwise ignored.
func (s *Server) runRecovery() {
	// Collect the proposals under the read lock, then release it before proposing.
	// proposeAndWait blocks until each proposal completes, and holding the lock for that
	// long would stall every writer of the server state.
	// NOTE(review): if applying a proposal takes the server's write lock, holding the
	// read lock across proposeAndWait would deadlock — confirm against the apply path.
	proposals := func() []*intern.ZeroProposal {
		s.RLock()
		defer s.RUnlock()
		if s.state == nil {
			return nil
		}
		var out []*intern.ZeroProposal
		for _, group := range s.state.Groups {
			for _, tab := range group.Tablets {
				if !tab.ReadOnly {
					continue
				}
				out = append(out, &intern.ZeroProposal{
					Tablet: &intern.Tablet{
						GroupId:   tab.GroupId,
						Predicate: tab.Predicate,
						Space:     tab.Space,
						Force:     true,
					},
				})
			}
		}
		return out
	}()
	if len(proposals) == 0 {
		return
	}

	errCh := make(chan error)
	for _, pr := range proposals {
		go func(pr *intern.ZeroProposal) {
			errCh <- s.Node.proposeAndWait(context.Background(), pr)
		}(pr)
	}
	// Drain exactly one result per proposal so that every goroutine above can finish.
	for range proposals {
		// We don't care about these errors.
		// Ideally shouldn't error out.
		if err := <-errCh; err != nil {
			x.Printf("Error while applying proposal in update stream %v\n", err)
		}
	}
}
// chooseTablet picks a predicate to move from a larger group to the smallest group.
//
// It returns an empty predicate when there is nothing to move: no state yet, this node is
// not the leader, there is at most one group, the smallest group has no leader, or no
// tablet fits. srcGroup and dstGroup may be set even when predicate is empty, so callers
// must check predicate first.
func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uint32) {
	s.RLock()
	defer s.RUnlock()

	if s.state == nil {
		return predicate, srcGroup, dstGroup
	}
	numGroups := len(s.state.Groups)
	if !(s.Node.AmLeader() && numGroups > 1) {
		return predicate, srcGroup, dstGroup
	}

	// Total up the space used by each group, then order the groups smallest first.
	type kv struct {
		gid  uint32
		size int64 // in bytes
	}
	groups := make([]kv, 0, numGroups)
	for gid, grp := range s.state.Groups {
		var total int64
		for _, tab := range grp.Tablets {
			total += tab.Space
		}
		groups = append(groups, kv{gid: gid, size: total})
	}
	sort.Slice(groups, func(a, b int) bool {
		return groups[a].size < groups[b].size
	})
	x.Printf("\n\nGroups sorted by size: %+v\n\n", groups)

	// The destination is always the smallest group; candidate sources are tried from
	// the largest group downwards.
	smallest := groups[0]
	for i := numGroups - 1; i > 0; i-- {
		largest := groups[i]
		srcGroup, dstGroup = largest.gid, smallest.gid
		sizeDiff := largest.size - smallest.size
		x.Printf("size_diff %v\n", sizeDiff)

		// Don't move anything until at least one update regarding tablet size has been
		// received. Tablet size would have come up with the leader update.
		if !s.hasLeader(dstGroup) {
			return predicate, srcGroup, dstGroup
		}

		// Only move a predicate if the size difference between the two groups is at
		// least 10% of the destination (smallest) group's size.
		if float64(sizeDiff) < 0.1*float64(smallest.size) {
			continue
		}

		// Look for the biggest tablet that is at most half the difference, so that after
		// the move dstGroup's size is still less than or equal to srcGroup's.
		var best int64
		for _, tab := range s.state.Groups[srcGroup].Tablets {
			if tab.Space > best && tab.Space <= sizeDiff/2 {
				predicate = tab.Predicate
				best = tab.Space
			}
		}
		if predicate != "" {
			return predicate, srcGroup, dstGroup
		}
	}
	return predicate, srcGroup, dstGroup
}
// moveTablet runs movePredicateHelper and cleans up after it if it fails.
//
// On success it returns nil. On failure it always returns the error from
// movePredicateHelper, after doing one of two things: if this node is no longer the
// leader it calls runRecovery; otherwise it proposes the tablet for srcGroup again with
// Force set and without the ReadOnly flag. A failure of that revert proposal is only
// logged.
func (s *Server) moveTablet(ctx context.Context, predicate string, srcGroup uint32,
	dstGroup uint32) error {
	moveErr := s.movePredicateHelper(ctx, predicate, srcGroup, dstGroup)
	switch {
	case moveErr == nil:
		return nil
	case !s.Node.AmLeader():
		s.runRecovery()
		return moveErr
	}

	tablet := s.ServingTablet(predicate)
	x.AssertTrue(tablet != nil)
	revert := &intern.ZeroProposal{
		Tablet: &intern.Tablet{
			GroupId:   srcGroup,
			Predicate: predicate,
			Space:     tablet.Space,
			Force:     true,
		},
	}
	// The revert uses a fresh background context rather than ctx.
	revertErr := s.Node.proposeAndWait(context.Background(), revert)
	if revertErr != nil {
		x.Printf("Error while reverting group %d to RW: %+v\n", srcGroup, revertErr)
	}
	return moveErr
}
// movePredicateHelper performs the three steps of a predicate move, stopping at and
// returning the first error:
//
//  1. Propose the tablet as ReadOnly under srcGroup.
//  2. Ask the leader of srcGroup, via the MovePredicate RPC, to move the predicate to
//     dstGroup, handing it the current membership state.
//  3. Propose the tablet under dstGroup without the ReadOnly flag.
//
// It does not undo step 1 on failure; that is left to the caller.
func (s *Server) movePredicateHelper(ctx context.Context, predicate string, srcGroup uint32,
	dstGroup uint32) error {
	node := s.Node
	tablet := s.ServingTablet(predicate)
	x.AssertTrue(tablet != nil)

	// Step 1: propose that the predicate is read only.
	proposal := &intern.ZeroProposal{
		Tablet: &intern.Tablet{
			GroupId:   srcGroup,
			Predicate: predicate,
			Space:     tablet.Space,
			ReadOnly:  true,
			Force:     true,
		},
	}
	err := node.proposeAndWait(ctx, proposal)
	if err != nil {
		return err
	}

	// Step 2: have the source group's leader carry out the move.
	leader := s.Leader(srcGroup)
	if leader == nil {
		return x.Errorf("No healthy connection found to leader of group %d", srcGroup)
	}
	client := intern.NewWorkerClient(leader.Get())
	payload := &intern.MovePredicatePayload{
		Predicate:     predicate,
		State:         s.membershipState(),
		SourceGroupId: srcGroup,
		DestGroupId:   dstGroup,
	}
	_, err = client.MovePredicate(ctx, payload)
	if err != nil {
		return err
	}

	// Step 3: propose that the predicate is served by dstGroup in RW. The same proposal
	// object is reused with a new tablet.
	proposal.Tablet = &intern.Tablet{
		GroupId:   dstGroup,
		Predicate: predicate,
		Space:     tablet.Space,
		Force:     true,
	}
	err = node.proposeAndWait(ctx, proposal)
	if err != nil {
		return err
	}
	// TODO: Probably make it R in dstGroup and send state to srcGroup and only after
	// it proposes make it RW in dstGroup. That way we won't have stale reads from srcGroup
	// for sure.
	return nil
}