Skip to content

Commit b718993

Browse files
authored
Optimize XidToUid map used by live and bulk loader (#2998)
I've spent the last few days looking at how to optimize the live mutation path in Dgraph server. While trying many things in the server (past commits included), I realized my optimizations in the server are not improving things much, with the throughput saturating at 20-30K NQuads/sec. It turns out that the live loader was causing the saturation. The XID to UID assigner was the bottleneck causing the throughput to stagnate, despite the server being underutilized. This PR fixes that by optimizing the assigner. In particular, I've removed the slow LRU cache. Added a buffer to the `newRanges` channel to ensure we always have a range handy when we run out. Made passing the badger DB instance optional, so we can avoid doing disk writes if not required. And made other optimizations around how we lock, etc. I also added benchmarks for the assigner, which show each allocation (tested via parallel benchmark) takes 350 ns/op on my desktop. With these changes, the live loader throughput jumps to 100K-120K NQuads/sec on my desktop. In particular, pre-assigning UIDs to the RDF/JSON file yields maximum throughput. I can load 140M friend graph RDFs in 25 mins. Helps with #2975 . Changes: * Work on optimizing XidToUid map. * Add the test and benchmark for xid to uid map * Working code with decreased memory usage. Includes a new BumpUp API. * Working live loader, which can optionally just keep all the mapping in memory. * Adding shards back to XidMap speeds up operations by a huge factor. Benchmark shows each allocation is 300ns. * Make BumpTo much faster by calling Zero directly, instead of looping through the newRanges channel. * Improve how BumpTo() happens by using a maxSeenUid variable.
1 parent fc25e4c commit b718993

File tree

8 files changed

+287
-166
lines changed

8 files changed

+287
-166
lines changed

dgraph/cmd/bulk/loader.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func readSchema(filename string) []*pb.SchemaUpdate {
147147
func (ld *loader) mapStage() {
148148
ld.prog.setPhase(mapPhase)
149149

150+
// TODO: Consider if we need to always store the XIDs in Badger. Things slow down if we do.
150151
xidDir := filepath.Join(ld.opt.TmpDir, "xids")
151152
x.Check(os.Mkdir(xidDir, 0755))
152153
opt := badger.DefaultOptions
@@ -157,10 +158,7 @@ func (ld *loader) mapStage() {
157158
var err error
158159
ld.xidDB, err = badger.Open(opt)
159160
x.Check(err)
160-
ld.xids = xidmap.New(ld.xidDB, ld.zero, xidmap.Options{
161-
NumShards: 1 << 10,
162-
LRUSize: 1 << 19,
163-
})
161+
ld.xids = xidmap.New(ld.zero, ld.xidDB)
164162

165163
var dir, ext string
166164
var loaderType int
@@ -226,7 +224,7 @@ func (ld *loader) mapStage() {
226224
for i := range ld.mappers {
227225
ld.mappers[i] = nil
228226
}
229-
ld.xids.EvictAll()
227+
x.Check(ld.xids.Flush())
230228
x.Check(ld.xidDB.Close())
231229
ld.xids = nil
232230
runtime.GC()

dgraph/cmd/bulk/mapper.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
217217
}
218218

219219
func (m *mapper) lookupUid(xid string) uint64 {
220-
uid, isNew := m.xids.AssignUid(xid)
221-
if !isNew || !m.opt.StoreXids {
220+
uid := m.xids.AssignUid(xid)
221+
if !m.opt.StoreXids {
222222
return uid
223223
}
224224
if strings.HasPrefix(xid, "_:") {

dgraph/cmd/live/batch.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type loader struct {
6363
dc *dgo.Dgraph
6464
alloc *xidmap.XidMap
6565
ticker *time.Ticker
66-
kv *badger.DB
66+
db *badger.DB
6767
requestsWg sync.WaitGroup
6868
// If we retry a request, we add one to retryRequestsWg.
6969
retryRequestsWg sync.WaitGroup
@@ -188,15 +188,18 @@ func (l *loader) makeRequests() {
188188
}
189189

190190
func (l *loader) printCounters() {
191-
l.ticker = time.NewTicker(2 * time.Second)
191+
period := 5 * time.Second
192+
l.ticker = time.NewTicker(period)
192193
start := time.Now()
193194

195+
var last Counter
194196
for range l.ticker.C {
195197
counter := l.Counter()
196-
rate := float64(counter.Nquads) / counter.Elapsed.Seconds()
198+
rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
197199
elapsed := time.Since(start).Round(time.Second)
198-
fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n",
200+
fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
199201
elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
202+
last = counter
200203
}
201204
}
202205

dgraph/cmd/live/run.go

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,12 @@ func (l *loader) uid(val string) string {
158158
// to be an existing node in the graph. There is limited protection against
159159
// a user selecting an unassigned UID in this way - it may be assigned
160160
// later to another node. It is up to the user to avoid this.
161-
if strings.HasPrefix(val, "0x") {
162-
if _, err := strconv.ParseUint(val[2:], 16, 64); err == nil {
163-
return val
164-
}
161+
if uid, err := strconv.ParseUint(val, 0, 64); err == nil {
162+
l.alloc.BumpTo(uid)
163+
return fmt.Sprintf("%#x", uid)
165164
}
166165

167-
uid, _ := l.alloc.AssignUid(val)
166+
uid := l.alloc.AssignUid(val)
168167
return fmt.Sprintf("%#x", uint64(uid))
169168
}
170169

@@ -241,36 +240,32 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
241240
}
242241

243242
func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {
244-
x.Check(os.MkdirAll(opt.clientDir, 0700))
245-
o := badger.DefaultOptions
246-
o.SyncWrites = true // So that checkpoints are persisted immediately.
247-
o.TableLoadingMode = bopt.MemoryMap
248-
o.Dir = opt.clientDir
249-
o.ValueDir = opt.clientDir
243+
var db *badger.DB
244+
if len(opt.clientDir) > 0 {
245+
x.Check(os.MkdirAll(opt.clientDir, 0700))
246+
o := badger.DefaultOptions
247+
o.Dir = opt.clientDir
248+
o.ValueDir = opt.clientDir
249+
o.TableLoadingMode = bopt.MemoryMap
250+
o.SyncWrites = false
250251

251-
kv, err := badger.Open(o)
252-
x.Checkf(err, "Error while creating badger KV posting store")
252+
var err error
253+
db, err = badger.Open(o)
254+
x.Checkf(err, "Error while creating badger KV posting store")
255+
}
253256

254257
// compression with zero server actually makes things worse
255258
connzero, err := x.SetupConnection(opt.zero, &tlsConf, false)
256259
x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero)
257260

258-
alloc := xidmap.New(
259-
kv,
260-
connzero,
261-
xidmap.Options{
262-
NumShards: 100,
263-
LRUSize: 1e5,
264-
},
265-
)
266-
261+
alloc := xidmap.New(connzero, db)
267262
l := &loader{
268263
opts: opts,
269264
dc: dc,
270265
start: time.Now(),
271266
reqs: make(chan api.Mutation, opts.Pending*2),
272267
alloc: alloc,
273-
kv: kv,
268+
db: db,
274269
zeroconn: connzero,
275270
}
276271

@@ -322,17 +317,8 @@ func run() error {
322317
}
323318
dgraphClient := dgo.NewDgraphClient(clients...)
324319

325-
if len(opt.clientDir) == 0 {
326-
var err error
327-
opt.clientDir, err = ioutil.TempDir("", "x")
328-
x.Checkf(err, "Error while trying to create temporary client directory.")
329-
fmt.Printf("Creating temp client directory at %s\n", opt.clientDir)
330-
defer os.RemoveAll(opt.clientDir)
331-
}
332320
l := setup(bmOpts, dgraphClient)
333321
defer l.zeroconn.Close()
334-
defer l.kv.Close()
335-
defer l.alloc.EvictAll()
336322

337323
if len(opt.schemaFile) > 0 {
338324
if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil {
@@ -397,5 +383,9 @@ func run() error {
397383
fmt.Printf("Time spent : %v\n", c.Elapsed)
398384
fmt.Printf("N-Quads processed per second : %d\n", rate)
399385

386+
if l.db != nil {
387+
l.alloc.Flush()
388+
l.db.Close()
389+
}
400390
return nil
401391
}

worker/mutation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error {
290290
return x.Errorf("Input for predicate %s of type uid is scalar", edge.Attr)
291291

292292
case schemaType.IsScalar() && !storageType.IsScalar():
293-
return x.Errorf("Input for predicate %s of type scalar is uid", edge.Attr)
293+
return x.Errorf("Input for predicate %s of type scalar is uid. Edge: %v", edge.Attr, edge)
294294

295295
// The suggested storage type matches the schema, OK!
296296
case storageType == schemaType && schemaType != types.DefaultID:

x/x.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func SetupConnection(host string, tlsConf *TLSHelperConfig, useGz bool) (*grpc.C
433433
grpc.WithBlock(),
434434
grpc.WithTimeout(10*time.Second))
435435

436-
if tlsConf.CertRequired {
436+
if tlsConf != nil && tlsConf.CertRequired {
437437
tlsConf.ConfigType = TLSClientConfig
438438
tlsCfg, _, err := GenerateTLSConfig(*tlsConf)
439439
if err != nil {

0 commit comments

Comments (0)