// Forked from dgraph-io/dgraph
// index.go
package worker
import (
"time"
"golang.org/x/net/context"
"github.com/dgraph-io/dgraph/group"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/task"
"github.com/dgraph-io/dgraph/x"
)
// rebuildIndex handles a Raft proposal to rebuild the index for a single
// attribute. It is invoked by node.Run when a proposalReindex entry is
// being applied; the payload after the type byte is a marshalled
// task.Proposal carrying the RebuildIndex request.
func (n *node) rebuildIndex(ctx context.Context, proposalData []byte) error {
	x.AssertTrue(proposalData[0] == proposalReindex)

	var proposal task.Proposal
	x.Check(proposal.Unmarshal(proposalData[1:]))
	ri := proposal.RebuildIndex
	x.AssertTrue(ri != nil)
	x.AssertTrue(n.gid == ri.GroupId)
	x.Trace(ctx, "Processing proposal to rebuild index: %v", ri)

	// Ensure everything committed so far has been applied and synced
	// before touching the index.
	if err := n.syncAllMarks(ctx); err != nil {
		return err
	}

	// The attribute must belong to the group this node serves.
	x.AssertTrue(group.BelongsTo(ri.Attr) == n.gid)
	return posting.RebuildIndex(ctx, ri.Attr)
}
// syncAllMarks blocks until every Raft entry up to the last committed
// index has been applied to the data store, then forces an eviction and
// waits for the resulting posting-list writes to be synced.
func (n *node) syncAllMarks(ctx context.Context) error {
	// Index of the last committed Raft entry; both watermarks below must
	// reach at least this index before we return.
	lastIndex, err := n.store.LastIndex()
	if err != nil {
		return err
	}

	// Phase 1: wait until applied watermark catches up with lastIndex.
	for n.applied.WaitingFor() {
		applied := n.applied.DoneUntil()
		x.Trace(ctx, "syncAllMarks waiting, appliedUntil:%d lastIndex: %d",
			applied, lastIndex)
		if applied >= lastIndex {
			break // Caught up; check before sleeping.
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Force an aggressive evict so dirty posting lists get flushed.
	posting.CommitLists(10)

	// Phase 2: wait until the sync watermark catches up with lastIndex.
	syncMark := posting.SyncMarkFor(n.gid)
	for syncMark.WaitingFor() {
		synced := syncMark.DoneUntil()
		x.Trace(ctx, "syncAllMarks waiting, syncedUntil:%d lastIndex:%d",
			synced, lastIndex)
		if synced >= lastIndex {
			break // Caught up; check before sleeping.
		}
		time.Sleep(100 * time.Millisecond)
	}
	return nil
}
// RebuildIndex is the gRPC handler that triggers rebuilding of the index
// for the requested attribute. The returned Payload carries no data.
func (w *grpcWorker) RebuildIndex(ctx context.Context, req *task.RebuildIndex) (*Payload, error) {
	reply := &Payload{}
	// Bail out early if the caller's context is already cancelled.
	if err := ctx.Err(); err != nil {
		return reply, err
	}
	return reply, proposeRebuildIndex(ctx, req)
}
// proposeRebuildIndex submits a rebuild-index proposal to the Raft node
// serving the request's group and waits for it to be applied.
func proposeRebuildIndex(ctx context.Context, ri *task.RebuildIndex) error {
	node := groups().Node(ri.GroupId)
	return node.ProposeAndWait(ctx, &task.Proposal{RebuildIndex: ri})
}
// RebuildIndexOverNetwork rebuilds the index for attr. If this instance
// serves the attribute's group, the rebuild is proposed locally;
// otherwise the request is forwarded to a server that serves the group.
func RebuildIndexOverNetwork(ctx context.Context, attr string) error {
	if !schema.State().IsIndexed(attr) {
		// BUG FIX: the original message said the attribute "is indexed",
		// which is the opposite of the condition that failed.
		return x.Errorf("Attribute %s is not indexed", attr)
	}

	gid := group.BelongsTo(attr)
	x.Trace(ctx, "RebuildIndex attr: %v groupId: %v", attr, gid)

	if groups().ServesGroup(gid) {
		// No need for a network call, as this should be run from within this instance.
		return proposeRebuildIndex(ctx, &task.RebuildIndex{GroupId: gid, Attr: attr})
	}

	// Send this over the network to any server that serves gid.
	addr := groups().AnyServer(gid)
	pl := pools().get(addr)
	conn, err := pl.Get()
	if err != nil {
		return x.Wrapf(err, "RebuildIndexOverNetwork: while retrieving connection.")
	}
	defer pl.Put(conn)

	x.Trace(ctx, "Sending request to %v", addr)
	c := NewWorkerClient(conn)
	if _, err = c.RebuildIndex(ctx, &task.RebuildIndex{Attr: attr, GroupId: gid}); err != nil {
		x.TraceError(ctx, x.Wrapf(err, "Error while calling Worker.RebuildIndex"))
		return err
	}
	x.Trace(ctx, "RebuildIndex reply from server. Addr: %v Attr: %v", addr, attr)
	return nil
}