Skip to content
This repository has been archived by the owner on Jan 19, 2021. It is now read-only.

Commit

Permalink
refactor walk trie
Browse files Browse the repository at this point in the history
introduce WalkStrategy
fix checkpoint benchmark
  • Loading branch information
jochem-brouwer committed Nov 28, 2020
1 parent 3be5389 commit 480a072
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 11 deletions.
1 change: 1 addition & 0 deletions benchmarks/checkpointing.ts
Expand Up @@ -27,6 +27,7 @@ const iterTest = async (numOfIter: number): Promise<Array<number>> => {
}
await trie.commit()
}
return process.hrtime(hrstart)
}

const go = async () => {
Expand Down
21 changes: 12 additions & 9 deletions src/baseTrie.ts
Expand Up @@ -3,8 +3,8 @@ import { LevelUp } from 'levelup'
import { keccak, KECCAK256_RLP } from 'ethereumjs-util'
import { DB, BatchDBOp, PutBatch } from './db'
import { TrieReadStream as ReadStream } from './readStream'
import { PrioritizedTaskExecutor } from './prioritizedTaskExecutor'
import { bufferToNibbles, matchingNibbleLength, doKeysMatch } from './util/nibbles'
import WalkStrategy from './util/walkStrategy'
import {
TrieNode,
decodeNode,
Expand All @@ -26,11 +26,11 @@ interface Path {
stack: TrieNode[]
}

type FoundNodeFunction = (
export type FoundNodeFunction = (
nodeRef: Buffer,
node: TrieNode,
key: Nibbles,
walkController: any
walkController: WalkStrategy
) => void

/**
Expand Down Expand Up @@ -175,7 +175,8 @@ export class Trie {
resolve({ node: null, remaining: keyRemainder, stack })
} else {
// node found, continuing search
await walkController.only(branchIndex)
// this can be optimized as this calls getBranch again.
await walkController.onlyBranchIndex(node, keyProgress, branchIndex)
}
}
} else if (node instanceof LeafNode) {
Expand All @@ -193,7 +194,7 @@ export class Trie {
resolve({ node: null, remaining: keyRemainder, stack })
} else {
// keys match, continue search
await walkController.next()
await walkController.allChildren(node, keyProgress)
}
}
}
Expand All @@ -214,8 +215,10 @@ export class Trie {
* @returns Resolves when finished walking trie.
*/
async _walkTrie(root: Buffer, onFound: FoundNodeFunction): Promise<void> {
await WalkStrategy.newWalk(onFound, this, root)

// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve) => {
/*return new Promise(async (resolve) => {
const self = this
root = root || this.root
Expand Down Expand Up @@ -303,7 +306,7 @@ export class Trie {
} else {
resolve()
}
})
})*/
}

/**
Expand Down Expand Up @@ -760,7 +763,7 @@ export class Trie {
async _findDbNodes(onFound: FoundNodeFunction): Promise<void> {
const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => {
if (isRawNode(nodeRef)) {
await walkController.next()
await walkController.allChildren(node, key)
} else {
onFound(nodeRef, node, key, walkController)
}
Expand All @@ -786,7 +789,7 @@ export class Trie {
onFound(nodeRef, node, fullKey, walkController)
} else {
// keep looking for value nodes
await walkController.next()
await walkController.allChildren(node, key)
}
}
await this._walkTrie(this.root, outerOnFound)
Expand Down
2 changes: 1 addition & 1 deletion src/readStream.ts
Expand Up @@ -23,7 +23,7 @@ export class TrieReadStream extends Readable {
key: nibblesToBuffer(key),
value: node.value,
})
await walkController.next()
await walkController.allChildren(node, key)
})
this.push(null)
}
Expand Down
2 changes: 1 addition & 1 deletion src/scratchReadStream.ts
Expand Up @@ -25,7 +25,7 @@ export class ScratchReadStream extends Readable {
key: nodeRef,
value: node.serialize(),
})
await walkController.next()
await walkController.allChildren(node, key)
})
this.push(null)
}
Expand Down
123 changes: 123 additions & 0 deletions src/util/walkStrategy.ts
@@ -0,0 +1,123 @@
import { BaseTrie } from '..'
import { FoundNodeFunction } from '../baseTrie'
import { PrioritizedTaskExecutor } from '../prioritizedTaskExecutor'
import { BranchNode, ExtensionNode, LeafNode, Nibbles, TrieNode } from '../trieNode'

/**
 * Drives an asynchronous walk over the nodes of a trie.
 *
 * Every node that is looked up successfully is handed to the `onNode`
 * callback together with this strategy instance. The callback decides how the
 * walk continues by calling `allChildren`, `onlyBranchIndex` or `pushNode`.
 * The walk's promise is only settled from within this class: it resolves when
 * a lookup yields no node, or when `allChildren` is called on a node without
 * children while the task executor reports that it is finished, and it rejects
 * when a node lookup fails.
 */
export default class WalkStrategy {
  readonly onNode: FoundNodeFunction
  readonly taskExecutor: PrioritizedTaskExecutor
  readonly trie: BaseTrie
  // Settle functions of the promise created in `startWalk`. They are no-ops
  // until a walk has been started.
  private resolve: () => void
  private reject: (reason?: unknown) => void

  /**
   * Creates a new WalkStrategy
   * @param onNode - The `FoundNodeFunction` to call if a node is found
   * @param trie - The `Trie` to walk on
   * @param poolSize - Size passed to the `PrioritizedTaskExecutor` that runs the node lookups
   */
  private constructor(onNode: FoundNodeFunction, trie: BaseTrie, poolSize: number) {
    this.onNode = onNode
    this.taskExecutor = new PrioritizedTaskExecutor(poolSize)
    this.trie = trie
    this.resolve = () => {}
    this.reject = () => {}
  }

  /**
   * Async function to create and start a new walk over a trie.
   * @param onNode - The `FoundNodeFunction` to call if a node is found
   * @param trie - The trie to walk on
   * @param root - The root key to walk on
   * @param poolSize - Task execution pool size to prevent OOM errors. Defaults to 500.
   * @returns A promise that settles when the walk settles (see `startWalk`).
   */
  static async newWalk(
    onNode: FoundNodeFunction,
    trie: BaseTrie,
    root: Buffer,
    poolSize?: number
  ): Promise<void> {
    const strategy = new WalkStrategy(onNode, trie, poolSize ?? 500)
    await strategy.startWalk(root)
  }

  /**
   * Looks up the root node and starts processing from there.
   * @param root - Reference of the node to start the walk at
   * @returns A promise that resolves once the walk is finished and rejects if
   * looking up a node (or synchronously processing the root) throws.
   */
  startWalk(root: Buffer): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.resolve = () => resolve()
      this.reject = reject
      // The executor itself stays synchronous: an async executor would turn a
      // failed lookup into an unhandled rejection and leave this promise
      // pending forever.
      const begin = async (): Promise<void> => {
        const rootNode = await this.trie._lookupNode(root)
        this.processNode(root, rootNode as TrieNode, [])
      }
      begin().catch(reject)
    })
  }

  /**
   * Run all children of a node. Priority of these nodes are the key length of the children
   *
   * If the node has no children (a leaf, a branch without populated children
   * or an unknown node type) nothing is scheduled, and the walk is resolved if
   * the task executor reports that it is finished.
   * @param node - Node to get all children of and call onNode on
   * @param key - The current `key` which would yield the `node` when trying to get this node with a `get` operation.
   */
  allChildren(node: TrieNode, key: Nibbles = []) {
    // Each entry is a `[keyExtension, childRef]` pair.
    let children: unknown[][] = []
    if (node instanceof ExtensionNode) {
      children = [[node.key, node.value]]
    } else if (node instanceof BranchNode) {
      children = node.getChildren().map((b) => [[b[0]], b[1]])
    }

    if (children.length === 0) {
      // Nothing to schedule from here. Only finish the walk when no other
      // lookup is still running, so it never resolves while nodes are pending.
      this.resolveIfIdle()
      return
    }

    for (const child of children) {
      const keyExtension = child[0] as Nibbles
      const childRef = child[1] as Buffer
      const childKey = key.concat(keyExtension)
      this.pushNode(childRef, childKey, childKey.length)
    }
  }

  /**
   * Schedules the lookup of a node on the task executor. Once the node is
   * loaded it is handed to `onNode` (or the walk is resolved if the lookup
   * yields no node). A failing lookup rejects the walk.
   * @param nodeRef - Push a node reference to the event queue. This reference is a 32-byte keccak hash of the value corresponding to the `key`.
   * @param key - The current key.
   * @param priority - Optional priority, defaults to key length
   */
  pushNode(nodeRef: Buffer, key: Nibbles = [], priority?: number) {
    this.taskExecutor.execute(priority ?? key.length, async (taskCallback: () => void) => {
      let childNode
      try {
        childNode = await this.trie._lookupNode(nodeRef)
      } catch (error) {
        // NOTE(review): `taskCallback` is intentionally not invoked on failure;
        // presumably this keeps the executor from starting further queued
        // lookups after the walk has failed - confirm against
        // PrioritizedTaskExecutor.
        this.reject(error)
        return
      }
      // Signal the executor that this task is done before processing the node.
      taskCallback()
      this.processNode(nodeRef as Buffer, childNode as TrieNode, key)
    })
  }

  /**
   * Push a branch of a certain BranchNode to the event queue
   * @param node - The node to select a branch on. Should be a BranchNode
   * @param key - The current key which leads to the corresponding node
   * @param childIndex - The child index to add to the event queue
   * @param priority - Optional priority of the event, defaults to the total key length
   * @throws The returned promise rejects if `node` is not a `BranchNode` or
   * if the branch at `childIndex` is empty.
   */
  async onlyBranchIndex(node: TrieNode, key: Nibbles = [], childIndex: number, priority?: number) {
    if (!(node instanceof BranchNode)) {
      // TODO check if we can remove this and default to `BranchNode` in function sig
      throw new Error('Expected branch node')
    }
    const childRef = node.getBranch(childIndex)
    if (!childRef) {
      throw new Error('Could not get branch of childIndex')
    }
    // Build a fresh key so the caller's array is never mutated.
    const childKey = [...key, childIndex]
    this.pushNode(childRef as Buffer, childKey, priority ?? childKey.length)
  }

  /**
   * Hands a looked-up node to `onNode`, or resolves the walk when the lookup
   * did not yield a node.
   * @param nodeRef - Reference the node was looked up with
   * @param node - The looked-up node (may be falsy if nothing was found)
   * @param key - The key which leads to the node
   */
  private processNode(nodeRef: Buffer, node: TrieNode, key: Nibbles = []) {
    if (!node) {
      this.resolve()
      return
    }
    this.onNode(nodeRef, node, key, this)
  }

  /**
   * Resolves the walk if the task executor reports that it is finished.
   */
  private resolveIfIdle(): void {
    if (this.taskExecutor.finished()) {
      this.resolve()
    }
  }
}

0 comments on commit 480a072

Please sign in to comment.