Breaking: rename highWaterMark to highWaterMarkBytes
To remove a conflict with streams. Also adds documentation.

Ref Level/leveldown#468
Ref Level/community#70
vweevers committed Feb 8, 2022
1 parent ad80e40 commit 2d49754
Showing 9 changed files with 124 additions and 26 deletions.
35 changes: 33 additions & 2 deletions README.md
@@ -33,6 +33,7 @@
- [`db.batch(operations[, options][, callback])`](#dbbatchoperations-options-callback)
- [`db.batch()`](#dbbatch)
- [`iterator = db.iterator([options])`](#iterator--dbiteratoroptions)
- [About high water](#about-high-water)
- [`keyIterator = db.keys([options])`](#keyiterator--dbkeysoptions)
- [`valueIterator = db.values([options])`](#valueiterator--dbvaluesoptions)
- [`db.clear([options][, callback])`](#dbclearoptions-callback)
@@ -367,9 +368,39 @@ The `gte` and `lte` range options take precedence over `gt` and `lt` respectivel
- `values` (boolean, default: `true`): whether to return the value of each entry. If set to `false`, the iterator will yield values that are `undefined`. Prefer to use `db.values()` instead.
- `keyEncoding`: custom key encoding for this iterator, used to encode range options, to encode `seek()` targets and to decode keys.
- `valueEncoding`: custom value encoding for this iterator, used to decode values.
- `fillCache` (boolean, default: `false`): If set to `true`, LevelDB will fill its in-memory [LRU](http://en.wikipedia.org/wiki/Least_Recently_Used) cache with data that was read.
- `fillCache` (boolean, default: `false`): if set to `true`, LevelDB will fill its in-memory [LRU](http://en.wikipedia.org/wiki/Least_Recently_Used) cache with data that was read.
- `highWaterMarkBytes` (number, default: `16 * 1024`): limit the amount of data that the iterator will hold in memory. Explained below.

> :pushpin: To instead consume data using Node.js streams, see [`level-read-stream`](https://github.com/Level/read-stream).
#### About high water

While [`iterator.nextv(size)`](#iteratornextvsize-options-callback) is reading entries from LevelDB into memory, it sums up the byte length of those entries. Once that sum exceeds `highWaterMarkBytes`, reading stops. If `nextv(2)` would normally yield two entries but the first entry is too big, then only one entry is yielded and more `nextv(size)` calls must be made to get the remaining entries.

If memory usage is less of a concern, increasing `highWaterMarkBytes` can increase the throughput of `nextv(size)`. Setting it to `0` means that `nextv(size)` will never yield more than one entry, because `highWaterMarkBytes` is then exceeded on every call. It cannot be set to `Infinity`. On key- and value iterators, the limit applies to the byte length of keys or values respectively, rather than the combined byte length of keys _and_ values.
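
This accounting can be sketched in plain JavaScript. The following is an illustrative model only, not classic-level's actual code (which lives in the native binding); `takeNextv` and its arguments are hypothetical names:

```javascript
// Model of how nextv(size) fills its result array: stop once `size`
// entries are collected or the running byte total exceeds
// highWaterMarkBytes. At least one entry is always yielded.
function takeNextv (entries, size, highWaterMarkBytes) {
  const result = []
  let bytesRead = 0

  for (const [key, value] of entries) {
    result.push([key, value])
    bytesRead += Buffer.byteLength(key) + Buffer.byteLength(value)

    // The limit must be exceeded, not merely met
    if (bytesRead > highWaterMarkBytes || result.length >= size) break
  }

  return result
}

const entries = [['a', '0'], ['b', '1'], ['c', '2']]
takeNextv(entries, 1e3, 0) // one entry: the limit is exceeded immediately
takeNextv(entries, 1e3, 2) // two entries: exceeded only after the second
```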

Optimal performance can be achieved by setting `highWaterMarkBytes` to at least `size` multiplied by the expected byte length of an entry, ensuring that `size` is always met, i.e. that `nextv(size)` will not stop reading before `size` entries have been read into memory. If the iterator is wrapped in a [stream](https://github.com/Level/read-stream) then the `size` parameter is dictated by the stream's [`highWaterMark`](https://github.com/Level/read-stream#api) option. For example:

```js
const { EntryStream } = require('level-read-stream')

// If an entry is 50 bytes on average
const stream = new EntryStream(db, {
  highWaterMark: 1000,
  highWaterMarkBytes: 1000 * 50
})
```

Side note: the "watermark" analogy makes more sense in Node.js streams, where the internal `highWaterMark` can grow, indicating the highest that the "water" has been. In a `classic-level` iterator, however, `highWaterMarkBytes` is fixed once set; exceeding it does not change it.

The `highWaterMarkBytes` option also applies to an internal cache that `classic-level` employs for [`next()`](#iteratornextcallback) and [`for await...of`](#for-awaitof-iterator). When `next()` is called, that cache is populated with at most 1000 entries, or fewer if `highWaterMarkBytes` is exceeded by the summed byte length of the entries waiting in the cache. To avoid reading too eagerly, the cache is not populated on the first `next()` call, nor on the first `next()` call after a `seek()`; only on subsequent `next()` calls. This also applies to `for await...of`. In the following example the cache is never populated, so the `highWaterMarkBytes` option does not come into play.

```js
const lexint = require('lexicographic-integer-encoding')('hex')
const it = db.iterator({ keyEncoding: lexint })

for await (const [key, value] of it) {
  it.seek(key + 2)
}
```
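
The cache fill itself can be modelled the same way. This is an illustrative sketch of the rule described above (at most 1000 entries, bounded by `highWaterMarkBytes`); `fillCache` is a hypothetical helper, not part of classic-level's API:

```javascript
// Model of the internal next() cache fill: take at most 1000 entries,
// stopping early once highWaterMarkBytes is exceeded by the summed
// byte length. At least one entry is always taken.
function fillCache (entries, highWaterMarkBytes) {
  const cache = []
  let bytes = 0

  for (const [key, value] of entries) {
    if (cache.length >= 1000) break
    cache.push([key, value])
    bytes += Buffer.byteLength(key) + Buffer.byteLength(value)
    if (bytes > highWaterMarkBytes) break
  }

  return cache
}

const entries = [['a', '0'], ['b', '1'], ['c', '2'], ['d', '3']]
fillCache(entries, 0) // one entry: the limit is exceeded immediately
fillCache(entries, 2) // two entries: the limit must be exceeded, not met
```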

### `keyIterator = db.keys([options])`

13 changes: 6 additions & 7 deletions binding.cc
@@ -823,14 +823,14 @@ struct Iterator final : public BaseIterator {
const bool fillCache,
const bool keyAsBuffer,
const bool valueAsBuffer,
const uint32_t highWaterMark)
const uint32_t highWaterMarkBytes)
: BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache),
id_(id),
keys_(keys),
values_(values),
keyAsBuffer_(keyAsBuffer),
valueAsBuffer_(valueAsBuffer),
highWaterMark_(highWaterMark),
highWaterMarkBytes_(highWaterMarkBytes),
first_(true),
nexting_(false),
isClosing_(false),
@@ -877,7 +877,7 @@ struct Iterator final : public BaseIterator {
bytesRead += v.size();
}

if (bytesRead > highWaterMark_ || cache_.size() >= size) {
if (bytesRead > highWaterMarkBytes_ || cache_.size() >= size) {
return true;
}
}
@@ -890,7 +890,7 @@
const bool values_;
const bool keyAsBuffer_;
const bool valueAsBuffer_;
const uint32_t highWaterMark_;
const uint32_t highWaterMarkBytes_;
bool first_;
bool nexting_;
bool isClosing_;
@@ -1624,8 +1624,7 @@ NAPI_METHOD(iterator_init) {
const bool keyAsBuffer = EncodingIsBuffer(env, options, "keyEncoding");
const bool valueAsBuffer = EncodingIsBuffer(env, options, "valueEncoding");
const int limit = Int32Property(env, options, "limit", -1);
const uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark",
16 * 1024);
const uint32_t highWaterMarkBytes = Uint32Property(env, options, "highWaterMarkBytes", 16 * 1024);

std::string* lt = RangeOption(env, options, "lt");
std::string* lte = RangeOption(env, options, "lte");
@@ -1635,7 +1634,7 @@
const uint32_t id = database->currentIteratorId_++;
Iterator* iterator = new Iterator(database, id, reverse, keys,
values, limit, lt, lte, gt, gte, fillCache,
keyAsBuffer, valueAsBuffer, highWaterMark);
keyAsBuffer, valueAsBuffer, highWaterMarkBytes);
napi_value result;

NAPI_STATUS_THROWS(napi_create_external(env, iterator,
5 changes: 5 additions & 0 deletions index.d.ts
@@ -252,6 +252,11 @@ export interface AdditionalIteratorOptions {
* @defaultValue `false`
*/
fillCache?: boolean | undefined

/**
* Limit the amount of data that the iterator will hold in memory.
*/
highWaterMarkBytes?: number | undefined
}

/**
2 changes: 1 addition & 1 deletion package.json
@@ -26,7 +26,7 @@
"prebuild-win32-x64": "prebuildify -t 8.14.0 --napi --strip"
},
"dependencies": {
"abstract-level": "^1.0.0",
"abstract-level": "^1.0.1",
"catering": "^2.1.0",
"module-error": "^1.0.1",
"napi-macros": "~2.0.0",
18 changes: 9 additions & 9 deletions test/cleanup-hanging-iterators-test.js
@@ -32,10 +32,10 @@ makeTest('test likely-closed iterator', function (db, t, done) {
})

makeTest('test non-closed iterator', function (db, t, done) {
// Same as the test above but with a highWaterMark of 0 so that we don't
// Same as the test above but with a highWaterMarkBytes of 0 so that we don't
// preemptively fetch all records, to ensure that the iterator is still
// active when we (attempt to) close the database.
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function (err, key, value) {
t.ifError(err, 'no error from next()')
@@ -57,10 +57,10 @@ makeTest('test multiple likely-closed iterators', function (db, t, done) {
})

makeTest('test multiple non-closed iterators', function (db, t, done) {
// Same as the test above but with a highWaterMark of 0.
// Same as the test above but with a highWaterMarkBytes of 0.
for (let i = 0; i < repeats; i++) {
db.iterator({ highWaterMark: 0 })
db.iterator({ highWaterMark: 0 }).next(function () {})
db.iterator({ highWaterMarkBytes: 0 })
db.iterator({ highWaterMarkBytes: 0 }).next(function () {})
}

setTimeout(done, Math.floor(Math.random() * 50))
@@ -70,8 +70,8 @@ global.gc && makeTest('test multiple non-closed iterators with forced gc', funct
// Same as the test above but with forced GC, to test that the lifespan of an
// iterator is tied to *both* its JS object and whether the iterator was closed.
for (let i = 0; i < repeats; i++) {
db.iterator({ highWaterMark: 0 })
db.iterator({ highWaterMark: 0 }).next(function () {})
db.iterator({ highWaterMarkBytes: 0 })
db.iterator({ highWaterMarkBytes: 0 }).next(function () {})
}

setTimeout(function () {
@@ -95,7 +95,7 @@ makeTest('test closing iterators', function (db, t, done) {

makeTest('test recursive next', function (db, t, done) {
// Test that we're able to close when user keeps scheduling work
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function loop (err, key) {
if (err && err.code !== 'LEVEL_ITERATOR_NOT_OPEN') throw err
@@ -107,7 +107,7 @@ makeTest('test recursive next (random)', function (db, t, done) {

makeTest('test recursive next (random)', function (db, t, done) {
// Same as the test above but closing at a random time
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function loop (err, key) {
if (err && err.code !== 'LEVEL_ITERATOR_NOT_OPEN') throw err
6 changes: 3 additions & 3 deletions test/iterator-gc-test.js
@@ -26,8 +26,8 @@ test('db without ref does not get GCed while iterating', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

// Set highWaterMark to 0 so that we don't preemptively fetch.
const it = db.iterator({ highWaterMark: 0 })
// Set highWaterMarkBytes to 0 so that we don't preemptively fetch.
const it = db.iterator({ highWaterMarkBytes: 0 })

// Remove reference
db = null
@@ -39,7 +39,7 @@
iterate(it)
} else {
// But a timeout usually also allows GC to kick in. If not, the time
// between iterator ticks might. That's when "highWaterMark: 0" helps.
// between iterator ticks might. That's when "highWaterMarkBytes: 0" helps.
setTimeout(iterate.bind(null, it), 1000)
}
})
63 changes: 63 additions & 0 deletions test/iterator-hwm-test.js
@@ -0,0 +1,63 @@
'use strict'

const test = require('tape')
const testCommon = require('./common')

let db

test('highWaterMarkBytes setup', async function (t) {
  db = testCommon.factory()

  // Write 8 bytes
  return db.batch().put('a', '0').put('b', '1').put('c', '2').put('d', '3').write()
})

test('highWaterMarkBytes limits byte length of nextv() entries', async function (t) {
  const hwm = async (highWaterMarkBytes) => {
    const it = db.iterator({ highWaterMarkBytes })
    const entries = await it.nextv(1e3)
    await it.close()
    return entries
  }

  t.same(await hwm(0), [['a', '0']], 'accepts 0')
  t.same(await hwm(Infinity), [['a', '0']], 'Infinity is interpreted as 0 (by Node-API)')
  t.same(await hwm(1), [['a', '0']], 'is limited')
  t.same(await hwm(2), [['a', '0'], ['b', '1']], 'highWaterMarkBytes must be exceeded, not met')
})

test('highWaterMarkBytes limits byte length of internal next() cache', async function (t) {
  const hwm = async (highWaterMarkBytes) => {
    const it = db.iterator({ highWaterMarkBytes })

    // Because initial next() calls don't cache, make two calls
    await it.next()
    await it.next()

    const count = 1 + it.cached
    await it.close()

    // Return how many bytes were retrieved natively by the second call
    return count * 2
  }

  t.is(await hwm(0), 2, 'accepts 0')
  t.is(await hwm(Infinity), 2, 'Infinity is interpreted as 0 (by Node-API)')
  t.is(await hwm(1), 2, 'is limited')
  t.is(await hwm(2), 4, 'highWaterMarkBytes must be exceeded, not met')
  t.is(await hwm(9), 6, 'double-check that previous test did apply a limit')
})

test('highWaterMarkBytes does not affect byte length of all() entries', async function (t) {
  const hwm = async (highWaterMarkBytes) => {
    // Note: setting hwm does make all() slower, as it uses nextv() atm
    return db.iterator({ highWaterMarkBytes }).all()
  }

  t.same(await hwm(0), [['a', '0'], ['b', '1'], ['c', '2'], ['d', '3']])
  t.same(await hwm(1), [['a', '0'], ['b', '1'], ['c', '2'], ['d', '3']])
})

test('highWaterMarkBytes teardown', async function (t) {
  return db.close()
})
2 changes: 1 addition & 1 deletion test/iterator-recursion-test.js
@@ -64,7 +64,7 @@ test('setUp db', function (t) {

test('iterate over a large iterator with a large watermark', function (t) {
const iterator = db.iterator({
highWaterMark: 10000000
highWaterMarkBytes: 10000000
})
const read = function () {
iterator.next(function (err, key, value) {
6 changes: 3 additions & 3 deletions test/iterator-starvation-test.js
@@ -26,8 +26,8 @@ test('iterator does not starve event loop', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

// Set a high highWaterMark to fill up the cache entirely
const it = db.iterator({ highWaterMark: Math.pow(1024, 3) })
// Set a high highWaterMarkBytes to fill up the cache entirely
const it = db.iterator({ highWaterMarkBytes: Math.pow(1024, 3) })

let breaths = 0
let entries = 0
@@ -77,7 +77,7 @@ test('iterator with seeks does not starve event loop', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

const it = db.iterator({ highWaterMark: Math.pow(1024, 3), limit: sourceData.length })
const it = db.iterator({ highWaterMarkBytes: Math.pow(1024, 3), limit: sourceData.length })

let breaths = 0
let entries = 0
