Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of iterator end #56

Merged
merged 1 commit into from Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -721,7 +721,7 @@ A reference to the database that created this iterator.

#### `iterator.count`

Read-only getter that indicates how many keys have been yielded so far (by any method) excluding calls that errored or yielded `undefined`.
Read-only getter that indicates how many entries have been yielded so far (by any method) excluding calls that errored or yielded `undefined`.

#### `iterator.limit`

Expand Down
48 changes: 38 additions & 10 deletions abstract-iterator.js
Expand Up @@ -17,6 +17,7 @@ const kKeys = Symbol('keys')
const kValues = Symbol('values')
const kLimit = Symbol('limit')
const kCount = Symbol('count')
const kEnded = Symbol('ended')

// This class is an internal utility for common functionality between AbstractIterator,
// AbstractKeyIterator and AbstractValueIterator. It's not exported.
Expand All @@ -40,6 +41,10 @@ class CommonIterator {
this[kCount] = 0
this[kSignal] = options.signal != null ? options.signal : null

// Ending means reaching the natural end of the data and (unlike closing) that can
// be reset by seek(), unless the limit was reached.
this[kEnded] = false

this.db = db
this.db.attachResource(this)
}
Expand All @@ -56,21 +61,25 @@ class CommonIterator {
startWork(this)

try {
if (this[kCount] >= this[kLimit]) {
if (this[kEnded] || this[kCount] >= this[kLimit]) {
this[kEnded] = true
return undefined
}

let item = await this._next()

if (item === undefined) {
this[kEnded] = true
return undefined
}

try {
if (item !== undefined) {
item = this[kDecodeOne](item)
this[kCount]++
}
item = this[kDecodeOne](item)
} catch (err) {
throw new IteratorDecodeError(err)
}

this[kCount]++
return item
} finally {
endWork(this)
Expand All @@ -92,10 +101,18 @@ class CommonIterator {
startWork(this)

try {
if (size <= 0) return []
if (this[kEnded] || size <= 0) {
this[kEnded] = true
return []
}

const items = await this._nextv(size, options)

if (items.length === 0) {
this[kEnded] = true
return items
}

try {
this[kDecodeMany](items)
} catch (err) {
Expand All @@ -112,10 +129,16 @@ class CommonIterator {
async _nextv (size, options) {
const acc = []

let item
while (acc.length < size) {
const item = await this._next(options)

while (acc.length < size && (item = await this._next(options)) !== undefined) {
acc.push(item)
if (item !== undefined) {
acc.push(item)
} else {
// Must track this here because we're directly calling _next()
this[kEnded] = true
break
}
}

return acc
Expand All @@ -126,7 +149,7 @@ class CommonIterator {
startWork(this)

try {
if (this[kCount] >= this[kLimit]) {
if (this[kEnded] || this[kCount] >= this[kLimit]) {
return []
}

Expand All @@ -144,6 +167,8 @@ class CommonIterator {
endWork(this)
await destroy(this, err)
} finally {
this[kEnded] = true

if (this[kWorking]) {
endWork(this)
await this.close()
Expand Down Expand Up @@ -196,6 +221,9 @@ class CommonIterator {

const mapped = this.db.prefixKey(keyEncoding.encode(target), keyFormat, false)
this._seek(mapped, options)

// If _seek() was successfull, more data may be available.
this[kEnded] = false
}
}

Expand Down
38 changes: 38 additions & 0 deletions test/iterator-seek-test.js
Expand Up @@ -132,6 +132,44 @@ exports.seek = function (test, testCommon) {
return db.close()
})

test(`${mode}().seek() can be used to iterate twice`, async function (t) {
const db = testCommon.factory()
await db.batch(testData())
const it = db[mode]()

t.same(await it.nextv(10), [['one', '1'], ['three', '3'], ['two', '2']].map(mapEntry), 'match')
t.same(await it.nextv(10), [], 'end of iterator')

it.seek('one')

t.same(await it.nextv(10), [['one', '1'], ['three', '3'], ['two', '2']].map(mapEntry), 'match again')
t.same(await it.nextv(10), [], 'end of iterator again')

await it.close()
return db.close()
})

test(`${mode}().seek() can be used to iterate twice, within limit`, async function (t) {
const db = testCommon.factory()
await db.batch(testData())
const limit = 4
const it = db[mode]({ limit })

t.same(await it.nextv(10), [['one', '1'], ['three', '3'], ['two', '2']].map(mapEntry), 'match')
t.same(await it.nextv(10), [], 'end of iterator')

it.seek('one')

t.same(await it.nextv(10), [['one', '1']].map(mapEntry), 'limit reached')
t.same(await it.nextv(10), [], 'end of iterator')

it.seek('one')
t.same(await it.nextv(10), [], 'does not reset after limit has been reached')

await it.close()
return db.close()
})

if (testCommon.supports.snapshots) {
for (const reverse of [false, true]) {
for (const deferred of [false, true]) {
Expand Down
20 changes: 20 additions & 0 deletions test/iterator-test.js
Expand Up @@ -270,6 +270,16 @@ exports.iterator = function (test, testCommon) {
await it.close()
})

test(`${mode}().nextv() honors limit and size`, async function (t) {
const it = db[mode]({ limit: 2 })

t.same(await it.nextv(1), [['foobatch1', 'bar1']].map(mapEntry))
t.same(await it.nextv(10), [['foobatch2', 'bar2']].map(mapEntry))
t.same(await it.nextv(10), [])

await it.close()
})

test(`${mode}().nextv() honors limit in reverse`, async function (t) {
const it = db[mode]({ limit: 2, reverse: true })

Expand All @@ -279,6 +289,16 @@ exports.iterator = function (test, testCommon) {
await it.close()
})

test(`${mode}().nextv() honors limit and size in reverse`, async function (t) {
const it = db[mode]({ limit: 2, reverse: true })

t.same(await it.nextv(1), [['foobatch3', 'bar3']].map(mapEntry))
t.same(await it.nextv(10), [['foobatch2', 'bar2']].map(mapEntry))
t.same(await it.nextv(10), [])

await it.close()
})

test(`${mode}().all()`, async function (t) {
t.same(await db[mode]().all(), [
['foobatch1', 'bar1'],
Expand Down
7 changes: 5 additions & 2 deletions test/self/abstract-iterator-test.js
Expand Up @@ -11,6 +11,10 @@ const testCommon = require('../common')({
})

for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator]) {
// Note, these tests don't create fully functional iterators, because they're not
// created via db.iterator() and therefore lack the options necessary to decode data.
// Not relevant for these tests.

test(`test ${Ctor.name} extensibility`, function (t) {
const Test = class TestIterator extends Ctor {}
const db = testCommon.factory()
Expand Down Expand Up @@ -67,7 +71,7 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator
})

test(`${Ctor.name}.nextv() extensibility`, async function (t) {
t.plan(4 * 2)
t.plan(4)

class TestIterator extends Ctor {
async _nextv (size, options) {
Expand All @@ -83,7 +87,6 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator
await db.open()
const it = new TestIterator(db, {})
await it.nextv(100)
await it.nextv(100, {})
await db.close()
})

Expand Down
131 changes: 131 additions & 0 deletions test/self/iterator-test.js
Expand Up @@ -71,6 +71,41 @@ for (const deferred of [false, true]) {
if (deferred) await db.open()
})

test(`${mode}().next() skips _next() if it previously signaled end (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
class MockLevel extends AbstractLevel {
[privateMethod] (options) {
return new MockIterator(this, options)
}
}

let calls = 0

class MockIterator extends Ctor {
async _next () {
if (calls++) return undefined

if (mode === 'iterator' || def) {
return ['a', 'a']
} else {
return 'a'
}
}
}

const db = new MockLevel(utf8Manifest)
if (!deferred) await db.open()
const it = db[publicMethod]()

t.same(await it.next(), mode === 'iterator' ? ['a', 'a'] : 'a')
t.is(calls, 1, 'got one _next() call')

t.is(await it.next(), undefined)
t.is(calls, 2, 'got another _next() call')

t.is(await it.next(), undefined)
t.is(calls, 2, 'not called again')
})

for (const limit of [2, 0]) {
test(`${mode}().next() skips _next() when limit ${limit} is reached (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
class MockLevel extends AbstractLevel {
Expand Down Expand Up @@ -184,6 +219,41 @@ for (const deferred of [false, true]) {
})
}

test(`${mode}().nextv() skips _nextv() if it previously signaled end (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
class MockLevel extends AbstractLevel {
[privateMethod] (options) {
return new MockIterator(this, options)
}
}

let calls = 0

class MockIterator extends Ctor {
async _nextv () {
if (calls++) return []

if (mode === 'iterator' || def) {
return [['a', 'a']]
} else {
return ['a']
}
}
}

const db = new MockLevel(utf8Manifest)
if (!deferred) await db.open()
const it = db[publicMethod]()

t.same(await it.nextv(100), [mode === 'iterator' ? ['a', 'a'] : 'a'])
t.is(calls, 1, 'got one _nextv() call')

t.same(await it.nextv(100), [])
t.is(calls, 2, 'got another _nextv() call')

t.same(await it.nextv(100), [])
t.is(calls, 2, 'not called again')
})

test(`${mode}().nextv() reduces size for _nextv() when near limit (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
class MockLevel extends AbstractLevel {
[privateMethod] (options) {
Expand Down Expand Up @@ -615,6 +685,38 @@ for (const deferred of [false, true]) {
}
})

test(`${mode}() default nextv() stops when natural end is reached (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
let calls = 0

class MockLevel extends AbstractLevel {
[privateMethod] (options) {
return new MockIterator(this, options)
}
}

class MockIterator extends Ctor {
async _next () {
if (calls++) return undefined

if (mode === 'iterator' || def) {
return ['a', 'a']
} else {
return 'a'
}
}
}

const db = new MockLevel(utf8Manifest)
if (!deferred) await db.open()
const it = await db[publicMethod]()

t.same(await it.nextv(10), [mode === 'iterator' ? ['a', 'a'] : 'a'])
t.is(calls, 2)

t.same(await it.nextv(10), [], 'ended')
t.is(calls, 2, 'not called again')
})

test(`${mode}() has default all() (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
t.plan(8)

Expand Down Expand Up @@ -685,6 +787,35 @@ for (const deferred of [false, true]) {
}
})

test(`${mode}() default all() stops when limit is reached (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
t.plan(2)
let calls = 0

class MockLevel extends AbstractLevel {
[privateMethod] (options) {
return new MockIterator(this, options)
}
}

class MockIterator extends Ctor {
async _nextv (size, options) {
calls++
if (mode === 'iterator' || def) {
return [[String(calls), String(calls)]]
} else {
return [String(calls)]
}
}
}

const db = new MockLevel(utf8Manifest)
if (!deferred) await db.open()

const items = await db[publicMethod]({ limit: 2 }).all()
t.is(items.length, 2)
t.is(calls, 2)
})

test(`${mode}() custom all() (deferred: ${deferred}, default implementation: ${def})`, async function (t) {
t.plan(3)

Expand Down