Skip to content

Commit f7d0371

Browse files
authored
Add partition key based iterator to the bulk loader (#4841)
1 parent 317e02e commit f7d0371

File tree

9 files changed

+871
-486
lines changed

9 files changed

+871
-486
lines changed

dgraph/cmd/bulk/key.go

Lines changed: 78 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dgraph/cmd/bulk/loader.go

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -230,7 +230,10 @@ func (ld *loader) mapStage() {
230230
func (ld *loader) reduceStage() {
231231
ld.prog.setPhase(reducePhase)
232232

233-
r := reducer{state: ld.state}
233+
r := reducer{
234+
state: ld.state,
235+
streamIds: make(map[string]uint32),
236+
}
234237
x.Check(r.run())
235238
}
236239

dgraph/cmd/bulk/mapper.go

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ import (
4444
farm "github.com/dgryski/go-farm"
4545
)
4646

47+
const partitionKeyShard = 10
48+
4749
type mapper struct {
4850
*state
4951
shards []shardState // shard is based on predicate
@@ -120,6 +122,29 @@ func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint6
120122
x.Check(gzWriter.Close())
121123
}()
122124

125+
// Create partition keys for the map file.
126+
header := &pb.MapHeader{
127+
PartitionKeys: [][]byte{},
128+
}
129+
shardPartitionNo := len(entries) / partitionKeyShard
130+
for i := range entries {
131+
if shardPartitionNo == 0 {
132+
// we have very few entries so no need for partition keys.
133+
break
134+
}
135+
if (i+1)%shardPartitionNo == 0 {
136+
header.PartitionKeys = append(header.PartitionKeys, entries[i].GetKey())
137+
}
138+
}
139+
// Write the header to the map file.
140+
headerBuf, err := header.Marshal()
141+
x.Check(err)
142+
lenBuf := make([]byte, 4)
143+
binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
144+
x.Check2(w.Write(lenBuf))
145+
x.Check2(w.Write(headerBuf))
146+
x.Check(err)
147+
123148
sizeBuf := make([]byte, binary.MaxVarintLen64)
124149
for _, me := range entries {
125150
n := binary.PutUvarint(sizeBuf, uint64(me.Size()))

dgraph/cmd/bulk/progress.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ type progress struct {
3838
mapEdgeCount int64
3939
reduceEdgeCount int64
4040
reduceKeyCount int64
41+
numEncoding int32
4142

4243
start time.Time
4344
startReduce time.Time
@@ -107,14 +108,15 @@ func (p *progress) reportOnce() {
107108
pct = fmt.Sprintf("%.2f%% ", 100*float64(reduceEdgeCount)/float64(mapEdgeCount))
108109
}
109110
fmt.Printf("[%s] REDUCE %s %sedge_count:%s edge_speed:%s/sec "+
110-
"plist_count:%s plist_speed:%s/sec\n",
111+
"plist_count:%s plist_speed:%s/sec. Num Encoding: %d\n",
111112
timestamp,
112113
x.FixedDuration(now.Sub(p.start)),
113114
pct,
114115
niceFloat(float64(reduceEdgeCount)),
115116
niceFloat(float64(reduceEdgeCount)/elapsed.Seconds()),
116117
niceFloat(float64(reduceKeyCount)),
117118
niceFloat(float64(reduceKeyCount)/elapsed.Seconds()),
119+
atomic.LoadInt32(&p.numEncoding),
118120
)
119121
default:
120122
x.AssertTruef(false, "invalid phase")

0 commit comments

Comments
 (0)