Skip to content

Commit

Permalink
feat: CommonDao.streamSaveTransform
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Aug 24, 2023
1 parent 2d8340f commit d26d42d
Show file tree
Hide file tree
Showing 12 changed files with 646 additions and 719 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
},
"dependencies": {
"@naturalcycles/js-lib": "^14.116.0",
"@naturalcycles/nodejs-lib": "^12.0.0"
"@naturalcycles/nodejs-lib": "^13.1.1"
},
"devDependencies": {
"@naturalcycles/bench-lib": "^1.0.0",
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/file/localFile.persistence.plugin.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as fs from 'node:fs'
import fs from 'node:fs'
import { Readable } from 'node:stream'
import * as fsp from 'node:fs/promises'
import fsp from 'node:fs/promises'
import { createGzip, createUnzip } from 'node:zlib'
import { pMap, ObjectWithId } from '@naturalcycles/js-lib'
import {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/inmemory/inMemory.db.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as fs from 'node:fs'
import * as fsp from 'node:fs/promises'
import fs from 'node:fs'
import fsp from 'node:fs/promises'
import { Readable } from 'node:stream'
import { createGzip, createUnzip } from 'node:zlib'
import {
Expand Down
30 changes: 26 additions & 4 deletions src/commondao/common.dao.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,20 @@ export interface CommonDaoSaveOptions<DBM extends ObjectWithId>
ensureUniqueId?: boolean
}

export interface CommonDaoStreamDeleteOptions<DBM extends ObjectWithId>
extends CommonDaoStreamOptions<DBM> {}

export interface CommonDaoStreamSaveOptions<DBM extends ObjectWithId>
extends CommonDaoSaveOptions<DBM>,
CommonDaoStreamOptions<DBM> {}

export interface CommonDaoStreamForEachOptions<IN>
extends CommonDaoStreamOptions,
TransformMapOptions<IN, any>,
TransformLogProgressOptions<IN> {}
extends CommonDaoStreamOptions<IN>,
TransformMapOptions<IN, any> {}

export interface CommonDaoStreamOptions extends CommonDaoOptions {
export interface CommonDaoStreamOptions<IN>
extends CommonDaoOptions,
TransformLogProgressOptions<IN> {
/**
* @default true (for streams)
*/
Expand All @@ -337,6 +345,20 @@ export interface CommonDaoStreamOptions extends CommonDaoOptions {
* @default ErrorMode.SUPPRESS for .forEach() streams as well, but overridable
*/
errorMode?: ErrorMode

/**
* Applicable to some stream operations, e.g. deleteByQuery.
* If set - `deleteByQuery` won't execute it "all at once", but in batches.
*
* Defaults to undefined, so the operation is executed "all at once".
*/
batchSize?: number

/**
* When batchSize is set - this option controls how many batches to run concurrently.
* Defaults to 16, "the magic number of JavaScript concurrency".
*/
batchConcurrency?: number
}

export type CommonDaoCreateOptions = CommonDBCreateOptions
2 changes: 1 addition & 1 deletion src/commondao/common.dao.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ test('common', async () => {
expect(await dao.deleteById(undefined)).toBe(0)
expect(await dao.deleteById('123')).toBe(0)
expect(await dao.deleteByQuery(dao.query())).toBe(0)
expect(await dao.deleteByQuery(dao.query(), { stream: true })).toBe(0)
expect(await dao.deleteByQuery(dao.query(), { batchSize: 500 })).toBe(0)

expect(dao.anyToDBM(undefined)).toBeUndefined()
expect(dao.anyToDBM({}, { skipValidation: true })).toMatchObject({})
Expand Down
85 changes: 79 additions & 6 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Transform } from 'node:stream'
import {
_assert,
_filterNullishValues,
Expand Down Expand Up @@ -57,8 +58,10 @@ import {
CommonDaoLogLevel,
CommonDaoOptions,
CommonDaoSaveOptions,
CommonDaoStreamDeleteOptions,
CommonDaoStreamForEachOptions,
CommonDaoStreamOptions,
CommonDaoStreamSaveOptions,
} from './common.dao.model'

const isGAE = !!process.env['GAE_INSTANCE']
Expand Down Expand Up @@ -529,7 +532,7 @@ export class CommonDao<
/**
* Stream as Readable, to be able to .pipe() it further with support of backpressure.
*/
streamQueryAsDBM(q: DBQuery<DBM>, opt: CommonDaoStreamOptions = {}): ReadableTyped<DBM> {
streamQueryAsDBM(q: DBQuery<DBM>, opt: CommonDaoStreamOptions<DBM> = {}): ReadableTyped<DBM> {
q.table = opt.table || q.table
opt.skipValidation = opt.skipValidation !== false // default true
opt.skipConversion = opt.skipConversion !== false // default true
Expand Down Expand Up @@ -568,7 +571,10 @@ export class CommonDao<
*
* You can do `.pipe(transformNoOp)` to make it "valid again".
*/
streamQuery(q: DBQuery<DBM>, opt: CommonDaoStreamOptions = {}): ReadableTyped<Saved<BM>> {
streamQuery(
q: DBQuery<DBM>,
opt: CommonDaoStreamOptions<Saved<BM>> = {},
): ReadableTyped<Saved<BM>> {
q.table = opt.table || q.table
opt.skipValidation = opt.skipValidation !== false // default true
opt.skipConversion = opt.skipConversion !== false // default true
Expand Down Expand Up @@ -611,7 +617,7 @@ export class CommonDao<
return rows.map(r => r.id)
}

streamQueryIds(q: DBQuery<DBM>, opt: CommonDaoStreamOptions = {}): ReadableTyped<ID> {
streamQueryIds(q: DBQuery<DBM>, opt: CommonDaoStreamOptions<ID> = {}): ReadableTyped<ID> {
q.table = opt.table || q.table
opt.errorMode ||= ErrorMode.SUPPRESS

Expand Down Expand Up @@ -958,6 +964,72 @@ export class CommonDao<
return rows
}

/**
 * Returns a chain of stream Transforms that save incoming BM rows into the DB.
 *
 * "Streaming" is implemented by buffering incoming rows into **batches**
 * (of size opt.batchSize, which defaults to 500),
 * and then executing db.saveBatch(batch) with the concurrency
 * of opt.batchConcurrency (which defaults to 16).
 *
 * Rows for which the `beforeSave` hook returns null are excluded (SKIPped)
 * from saving.
 */
streamSaveTransform(opt: CommonDaoStreamSaveOptions<DBM> = {}): Transform[] {
  this.requireWriteAccess()

  const table = opt.table || this.cfg.table
  // Streams default to skipping validation/conversion, for performance
  opt.skipValidation ??= true
  opt.skipConversion ??= true
  opt.errorMode ||= ErrorMode.SUPPRESS

  // Immutable tables only allow 'insert', unless mutability is explicitly allowed
  if (this.cfg.immutable && !opt.allowMutability && !opt.saveMethod) {
    opt = { ...opt, saveMethod: 'insert' }
  }

  const excludeFromIndexes = opt.excludeFromIndexes || this.cfg.excludeFromIndexes
  const { beforeSave } = this.cfg.hooks!

  const { batchSize = 500, batchConcurrency = 16, errorMode } = opt

  return [
    // 1. Convert each incoming BM to DBM (assigning id/created/updated along the way)
    transformMap<BM, DBM>(
      async bm => {
        this.assignIdCreatedUpdated(bm, opt) // mutates

        let dbm = await this.bmToDBM(bm, opt)

        if (beforeSave) {
          dbm = (await beforeSave(dbm))!
          // beforeSave returning null means "exclude this row from saving",
          // regardless of any other options
          if (dbm === null) return SKIP
        }

        return dbm
      },
      {
        errorMode,
      },
    ),
    // 2. Buffer DBMs into batches of batchSize
    transformBuffer<DBM>({ batchSize }),
    // 3. Save each batch, up to batchConcurrency batches in parallel
    transformMap<DBM[], DBM[]>(
      async batch => {
        await this.cfg.db.saveBatch(table, batch, {
          ...opt,
          excludeFromIndexes,
        })
        return batch
      },
      {
        concurrency: batchConcurrency,
        errorMode,
        // emit individual rows downstream, so progress is counted per-row
        flattenArrayOutput: true,
      },
    ),
    transformLogProgress({
      metric: 'saved',
      ...opt,
    }),
    // just to satisfy and simplify typings
    // It's easier to return Transform[], rather than (Transform | Writable)[]
    writableVoid() as Transform,
  ]
}

// DELETE
/**
* @returns number of deleted items
Expand Down Expand Up @@ -995,7 +1067,7 @@ export class CommonDao<
*/
async deleteByQuery(
q: DBQuery<DBM>,
opt: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean } = {},
opt: CommonDaoStreamDeleteOptions<DBM> = {},
): Promise<number> {
this.requireWriteAccess()
this.requireObjectMutability(opt)
Expand All @@ -1004,8 +1076,8 @@ export class CommonDao<
const started = this.logStarted(op, q.table)
let deleted = 0

if (opt.stream) {
const batchSize = 500
if (opt.batchSize) {
const { batchSize, batchConcurrency = 16 } = opt

await _pipeline([
this.cfg.db.streamQuery<DBM>(q.select(['id']), opt),
Expand All @@ -1022,6 +1094,7 @@ export class CommonDao<
},
{
predicate: _passthroughPredicate,
concurrency: batchConcurrency,
},
),
// LogProgress should be AFTER the mapper, to be able to report correct stats
Expand Down
4 changes: 2 additions & 2 deletions src/pipeline/dbPipelineBackup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as fs from 'node:fs'
import * as fsp from 'node:fs/promises'
import fs from 'node:fs'
import fsp from 'node:fs/promises'
import { createGzip, ZlibOptions } from 'node:zlib'
import {
AppError,
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/dbPipelineRestore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as fs from 'node:fs'
import fs from 'node:fs'
import { createUnzip } from 'node:zlib'
import {
AsyncMapper,
Expand Down
11 changes: 5 additions & 6 deletions src/query/dbQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import {
CommonDaoOptions,
CommonDaoStreamDeleteOptions,
CommonDaoStreamForEachOptions,
CommonDaoStreamOptions,
DBPatch,
Expand Down Expand Up @@ -301,19 +302,19 @@ export class RunnableDBQuery<
await this.dao.streamQueryAsDBMForEach(this, mapper, opt)
}

streamQuery(opt?: CommonDaoStreamOptions): ReadableTyped<Saved<BM>> {
streamQuery(opt?: CommonDaoStreamOptions<Saved<BM>>): ReadableTyped<Saved<BM>> {
return this.dao.streamQuery(this, opt)
}

streamQueryAsDBM(opt?: CommonDaoStreamOptions): ReadableTyped<DBM> {
streamQueryAsDBM(opt?: CommonDaoStreamOptions<DBM>): ReadableTyped<DBM> {
return this.dao.streamQueryAsDBM(this, opt)
}

async queryIds(opt?: CommonDaoOptions): Promise<ID[]> {
return await this.dao.queryIds(this, opt)
}

streamQueryIds(opt?: CommonDaoStreamOptions): ReadableTyped<ID> {
streamQueryIds(opt?: CommonDaoStreamOptions<ID>): ReadableTyped<ID> {
return this.dao.streamQueryIds(this, opt)
}

Expand All @@ -324,9 +325,7 @@ export class RunnableDBQuery<
await this.dao.streamQueryIdsForEach(this, mapper, opt)
}

async deleteByQuery(
opt?: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean },
): Promise<number> {
async deleteByQuery(opt?: CommonDaoStreamDeleteOptions<DBM>): Promise<number> {
return await this.dao.deleteByQuery(this, opt)
}
}
2 changes: 1 addition & 1 deletion src/test/paths.cnst.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as path from 'node:path'
import path from 'node:path'

export const projectDir = path.join(`${__dirname}/../..`)
export const tmpDir = `${projectDir}/tmp`
16 changes: 15 additions & 1 deletion src/testing/daoTest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Readable } from 'node:stream'
import { pDelay, _deepCopy, _pick, _sortBy, _omit, localTime } from '@naturalcycles/js-lib'
import { readableToArray, transformNoOp } from '@naturalcycles/nodejs-lib'
import { _pipeline, readableToArray, transformNoOp } from '@naturalcycles/nodejs-lib'
import { CommonDaoLogLevel, DBQuery } from '..'
import { CommonDB } from '../common.db'
import { CommonDao } from '../commondao/common.dao'
Expand Down Expand Up @@ -252,6 +253,19 @@ export function runCommonDaoTest(
quirks,
)
})

test('streamSaveTransform', async () => {
  // Use distinct '_str' ids, to not collide with rows from other tests
  const itemsToSave = createTestItemsBM(2).map(item => ({ ...item, id: `${item.id}_str` }))
  const savedIds = itemsToSave.map(item => item.id)

  // Pipe the items through the save-transform chain
  await _pipeline([Readable.from(itemsToSave), ...dao.streamSaveTransform()])

  // Verify the items were actually persisted
  const loadedItems = await dao.getByIds(savedIds)
  expectMatch(itemsToSave, loadedItems, quirks)

  // cleanup
  await dao.query().filterIn('id', savedIds).deleteByQuery()
})
}

// DELETE BY
Expand Down
Loading

0 comments on commit d26d42d

Please sign in to comment.