Skip to content

Commit

Permalink
fix: stream iterator functions
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 6, 2024
1 parent 0910865 commit 939a185
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {

const stream = this.cfg.db.streamQuery<DBM>(q, opt)
const partialQuery = !!q._selectedFieldNames
if (partialQuery) return stream
if (partialQuery) return stream as any

// This almost works, but hard to implement `errorMode: THROW_AGGREGATED` in this case
// return stream.flatMap(async (dbm: DBM) => {
Expand Down Expand Up @@ -551,7 +551,7 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
// Experimental: using `.map()`
const stream: ReadableTyped<string> = this.cfg.db
.streamQuery<DBM>(q.select(['id']), opt)
.on('error', err => stream.emit('error', err))
// .on('error', err => stream.emit('error', err))
.map((r: ObjectWithId) => r.id)

// const stream: ReadableTyped<string> = this.cfg.db
Expand Down
18 changes: 10 additions & 8 deletions src/kv/commonKeyValueDao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ export class CommonKeyValueDao<T> {
const { mapBufferToValue } = this.cfg.hooks

if (!mapBufferToValue) {
return this.cfg.db.streamValues(this.cfg.table, limit)
return this.cfg.db.streamValues(this.cfg.table, limit) as ReadableTyped<T>
}

const stream: ReadableTyped<T> = this.cfg.db
.streamValues(this.cfg.table, limit)
.on('error', err => stream.emit('error', err))
.flatMap(async (buf: Buffer) => {
// .on('error', err => stream.emit('error', err))
.flatMap(async buf => {
try {
return [await mapBufferToValue(buf)] satisfies T[]
return [await mapBufferToValue(buf)]
} catch (err) {
this.cfg.logger.error(err)
return [] // SKIP
Expand All @@ -223,15 +223,17 @@ export class CommonKeyValueDao<T> {
const { mapBufferToValue } = this.cfg.hooks

if (!mapBufferToValue) {
return this.cfg.db.streamEntries(this.cfg.table, limit)
return this.cfg.db.streamEntries(this.cfg.table, limit) as ReadableTyped<
KeyValueTuple<string, T>
>
}

const stream: ReadableTyped<KeyValueTuple<string, T>> = this.cfg.db
.streamEntries(this.cfg.table, limit)
.on('error', err => stream.emit('error', err))
.flatMap(async ([id, buf]: KeyValueTuple<string, Buffer>) => {
// .on('error', err => stream.emit('error', err))
.flatMap(async ([id, buf]) => {
try {
return [[id, await mapBufferToValue(buf)]] satisfies KeyValueTuple<string, T>[]
return [[id, await mapBufferToValue(buf)]]
} catch (err) {
this.cfg.logger.error(err)
return [] // SKIP
Expand Down
4 changes: 2 additions & 2 deletions src/testing/daoTest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Readable } from 'node:stream'
import { _deepCopy, _pick, _sortBy, _omit, localTimeNow } from '@naturalcycles/js-lib'
import { _pipeline, readableToArray } from '@naturalcycles/nodejs-lib'
import { _pipeline } from '@naturalcycles/nodejs-lib'
import { CommonDaoLogLevel, DBQuery } from '..'
import { CommonDB } from '../common.db'
import { CommonDao } from '../commondao/common.dao'
Expand Down Expand Up @@ -212,7 +212,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui
})

test('streamQueryIds all', async () => {
let ids = await readableToArray(dao.query().streamQueryIds())
let ids = await dao.query().streamQueryIds().toArray()
ids = ids.sort()
expectMatch(
expectedItems.map(i => i.id),
Expand Down
3 changes: 1 addition & 2 deletions src/testing/dbTest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { _filterObject, _pick, _sortBy, pMap } from '@naturalcycles/js-lib'
import { readableToArray } from '@naturalcycles/nodejs-lib'
import { CommonDB, CommonDBType } from '../common.db'
import { DBIncrement, DBPatch } from '../db.model'
import { DBQuery } from '../query/dbQuery'
Expand Down Expand Up @@ -220,7 +219,7 @@ export function runCommonDBTest(db: CommonDB, quirks: CommonDBImplementationQuir
// STREAM
if (support.streaming) {
test('streamQuery all', async () => {
let rows = await readableToArray(db.streamQuery(queryAll()))
let rows = await db.streamQuery(queryAll()).toArray()

rows = _sortBy(rows, r => r.id) // cause order is not specified in DBQuery
expectMatch(items, rows, quirks)
Expand Down
13 changes: 6 additions & 7 deletions src/testing/keyValueDBTest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { _range, _sortBy } from '@naturalcycles/js-lib'
import { readableToArray } from '@naturalcycles/nodejs-lib'
import { CommonKeyValueDB, KeyValueDBTuple } from '../kv/commonKeyValueDB'
import { TEST_TABLE } from './test.model'

Expand Down Expand Up @@ -42,40 +41,40 @@ export function runCommonKeyValueDBTest(db: CommonKeyValueDB): void {
})

test('streamIds', async () => {
const ids = await readableToArray(db.streamIds(TEST_TABLE))
const ids = await db.streamIds(TEST_TABLE).toArray()
ids.sort()
expect(ids).toEqual(testIds)
})

test('streamIds limited', async () => {
const idsLimited = await readableToArray(db.streamIds(TEST_TABLE, 2))
const idsLimited = await db.streamIds(TEST_TABLE, 2).toArray()
// Order is non-deterministic, so, cannot compare values
// idsLimited.sort()
// expect(idsLimited).toEqual(testIds.slice(0, 2))
expect(idsLimited.length).toBe(2)
})

test('streamValues', async () => {
const values = await readableToArray(db.streamValues(TEST_TABLE))
const values = await db.streamValues(TEST_TABLE).toArray()
values.sort()
expect(values).toEqual(testEntries.map(e => e[1]))
})

test('streamValues limited', async () => {
const valuesLimited = await readableToArray(db.streamValues(TEST_TABLE, 2))
const valuesLimited = await db.streamValues(TEST_TABLE, 2).toArray()
// valuesLimited.sort()
// expect(valuesLimited).toEqual(testEntries.map(e => e[1]).slice(0, 2))
expect(valuesLimited.length).toBe(2)
})

test('streamEntries', async () => {
const entries = await readableToArray(db.streamEntries(TEST_TABLE))
const entries = await db.streamEntries(TEST_TABLE).toArray()
entries.sort()
expect(entries).toEqual(testEntries)
})

test('streamEntries limited', async () => {
const entriesLimited = await readableToArray(db.streamEntries(TEST_TABLE, 2))
const entriesLimited = await db.streamEntries(TEST_TABLE, 2).toArray()
// entriesLimited.sort()
// expect(entriesLimited).toEqual(testEntries.slice(0, 2))
expect(entriesLimited.length).toBe(2)
Expand Down
13 changes: 6 additions & 7 deletions src/testing/keyValueDaoTest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { _range, _sortBy } from '@naturalcycles/js-lib'
import { readableToArray } from '@naturalcycles/nodejs-lib'
import { KeyValueDBTuple } from '../kv/commonKeyValueDB'
import { CommonKeyValueDao } from '../kv/commonKeyValueDao'

Expand Down Expand Up @@ -33,40 +32,40 @@ export function runCommonKeyValueDaoTest(dao: CommonKeyValueDao<Buffer>): void {
})

test('streamIds', async () => {
const ids = await readableToArray(dao.streamIds())
const ids = await dao.streamIds().toArray()
ids.sort()
expect(ids).toEqual(testIds)
})

test('streamIds limited', async () => {
const idsLimited = await readableToArray(dao.streamIds(2))
const idsLimited = await dao.streamIds(2).toArray()
// Order is non-deterministic, so, cannot compare values
// idsLimited.sort()
// expect(idsLimited).toEqual(testIds.slice(0, 2))
expect(idsLimited.length).toBe(2)
})

test('streamValues', async () => {
const values = await readableToArray(dao.streamValues())
const values = await dao.streamValues().toArray()
values.sort()
expect(values).toEqual(testEntries.map(e => e[1]))
})

test('streamValues limited', async () => {
const valuesLimited = await readableToArray(dao.streamValues(2))
const valuesLimited = await dao.streamValues(2).toArray()
// valuesLimited.sort()
// expect(valuesLimited).toEqual(testEntries.map(e => e[1]).slice(0, 2))
expect(valuesLimited.length).toBe(2)
})

test('streamEntries', async () => {
const entries = await readableToArray(dao.streamEntries())
const entries = await dao.streamEntries().toArray()
entries.sort()
expect(entries).toEqual(testEntries)
})

test('streamEntries limited', async () => {
const entriesLimited = await readableToArray(dao.streamEntries(2))
const entriesLimited = await dao.streamEntries(2).toArray()
// entriesLimited.sort()
// expect(entriesLimited).toEqual(testEntries.slice(0, 2))
expect(entriesLimited.length).toBe(2)
Expand Down
6 changes: 3 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -879,9 +879,9 @@
zod "^3.20.2"

"@naturalcycles/nodejs-lib@^13.0.1", "@naturalcycles/nodejs-lib@^13.0.2", "@naturalcycles/nodejs-lib@^13.1.1":
version "13.10.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.10.0.tgz#0914dea4fced163a9642f09def35f242b9daf565"
integrity sha512-KKxpAb6oK250Lk7t2nGTrc+T3YD6R0Ba0c19wbuJNXDZToZ0FRooQc1vgcrjksMpHDebWBPYz0g9IIIbA+gQzA==
version "13.11.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.11.0.tgz#b6835d5eb053029cee4645fd8f4c1e0c84ad4590"
integrity sha512-cjkuW9exFUlBxPRhOR8SCweJmn1UP8HmgB98Fsph4vjKaYOuRZUivuPTFIrHGxpvLrOedMXhMh4Ft8SHAnA+aw==
dependencies:
"@naturalcycles/js-lib" "^14.0.0"
"@types/js-yaml" "^4.0.9"
Expand Down

0 comments on commit 939a185

Please sign in to comment.