Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MerkleDB.commitChanges #2921

Merged
merged 16 commits into from
Apr 8, 2024
84 changes: 51 additions & 33 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,10 @@ func (db *merkleDB) commitBatch(ops []database.BatchOp) error {
return view.commitToDB(context.Background())
}

// commitChanges commits the changes in [trieToCommit] to [db].
// commitView commits the changes in [trieToCommit] to [db].
// Assumes [trieToCommit]'s node IDs have been calculated.
// Assumes [db.commitLock] is held.
func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error {
func (db *merkleDB) commitView(ctx context.Context, trieToCommit *view) error {
db.lock.Lock()
defer db.lock.Unlock()

Expand All @@ -949,7 +949,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
}

changes := trieToCommit.changes
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitChanges", oteltrace.WithAttributes(
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitView", oteltrace.WithAttributes(
attribute.Int("nodesChanged", len(changes.nodes)),
attribute.Int("valuesChanged", len(changes.values)),
))
Expand All @@ -965,8 +965,46 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
return nil
}

currentValueNodeBatch := db.valueNodeDB.NewBatch()
_, nodesSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.writeNodes")
valueNodeBatch := db.valueNodeDB.NewBatch()
if err := db.applyChanges(ctx, valueNodeBatch, changes); err != nil {
return err
}

if err := db.commitValueChanges(ctx, valueNodeBatch); err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB removes any child views from the trieToCommit and moves
// them to the db.
//
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
trieToCommit.validityTrackingLock.Lock()
defer trieToCommit.validityTrackingLock.Unlock()

for _, childView := range trieToCommit.childViews {
childView.updateParent(db)
db.childViews = append(db.childViews, childView)
}
trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
}

// applyChanges takes the [changes] and applies them to [db.intermediateNodeDB]
// and [valueNodeBatch].
//
// assumes [db.lock] is held
func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch *valueNodeBatch, changes *changeSummary) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.applyChanges")
defer span.End()

for key, nodeChange := range changes.nodes {
shouldAddIntermediate := nodeChange.after != nil && !nodeChange.after.hasValue()
shouldDeleteIntermediate := !shouldAddIntermediate && nodeChange.before != nil && !nodeChange.before.hasValue()
Expand All @@ -976,50 +1014,30 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error

if shouldAddIntermediate {
if err := db.intermediateNodeDB.Put(key, nodeChange.after); err != nil {
nodesSpan.End()
return err
}
} else if shouldDeleteIntermediate {
if err := db.intermediateNodeDB.Delete(key); err != nil {
nodesSpan.End()
return err
}
}

if shouldAddValue {
currentValueNodeBatch.Put(key, nodeChange.after)
valueNodeBatch.Put(key, nodeChange.after)
} else if shouldDeleteValue {
currentValueNodeBatch.Delete(key)
valueNodeBatch.Delete(key)
}
}
nodesSpan.End()

_, commitSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.valueNodeDBCommit")
err := currentValueNodeBatch.Write()
commitSpan.End()
if err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB removes any child views from the trieToCommit and moves them to the db
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
trieToCommit.validityTrackingLock.Lock()
defer trieToCommit.validityTrackingLock.Unlock()
// commitValueChanges is a thin wrapper around [valueNodeBatch.Write()] to
// provide tracing.
func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch *valueNodeBatch) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitValueChanges")
defer span.End()

for _, childView := range trieToCommit.childViews {
childView.updateParent(db)
db.childViews = append(db.childViews, childView)
}
trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
return valueNodeBatch.Write()
}

// CommitToDB is a no-op for db since it is already in sync with itself.
Expand Down
6 changes: 3 additions & 3 deletions x/merkledb/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,13 @@ func (v *view) commitToDB(ctx context.Context) error {
))
defer span.End()

// Call this here instead of in [v.db.commitChanges]
// because doing so there would be a deadlock.
// Call this here instead of in [v.db.commitView] because doing so there
// would be a deadlock.
if err := v.applyValueChanges(ctx); err != nil {
return err
}

if err := v.db.commitChanges(ctx, v); err != nil {
if err := v.db.commitView(ctx, v); err != nil {
return err
}

Expand Down