Skip to content
This repository was archived by the owner on May 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
//
{
"extends": "@naturalcycles/dev-lib/scripts/tsconfig.json",
"exclude": ["**/__exclude"]
"exclude": ["**/__exclude"],
}
9 changes: 7 additions & 2 deletions src/adapter/cachedb/cache.db.model.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib'
import { CommonDB } from '../../common.db'
import { CommonDBCreateOptions, CommonDBSaveOptions, CommonDBStreamOptions } from '../../db.model'
import {
CommonDBCreateOptions,
CommonDBOptions,
CommonDBSaveOptions,
CommonDBStreamOptions,
} from '../../db.model'

export interface CacheDBCfg {
name: string
Expand Down Expand Up @@ -45,7 +50,7 @@ export interface CacheDBCfg {
logger?: CommonLogger
}

export interface CacheDBOptions {
export interface CacheDBOptions extends CommonDBOptions {
/**
* @default false
*/
Expand Down
15 changes: 7 additions & 8 deletions src/adapter/cachedb/cache.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import {
StringMap,
} from '@naturalcycles/js-lib'
import { BaseCommonDB } from '../../base.common.db'
import { CommonDB } from '../../common.db'
import { CommonDBOptions, DBPatch, RunQueryResult } from '../../db.model'
import { CommonDB, commonDBFullSupport, CommonDBSupport } from '../../common.db'
import { DBPatch, RunQueryResult } from '../../db.model'
import { DBQuery } from '../../query/dbQuery'
import { DBTransaction } from '../../transaction/dbTransaction'
import {
CacheDBCfg,
CacheDBCreateOptions,
Expand All @@ -26,6 +25,11 @@ import {
* Queries always hit downstream (unless `onlyCache` is passed)
*/
export class CacheDB extends BaseCommonDB implements CommonDB {
override support: CommonDBSupport = {
...commonDBFullSupport,
transactions: false,
}

constructor(cfg: CacheDBCfg) {
super()
this.cfg = {
Expand Down Expand Up @@ -284,9 +288,4 @@ export class CacheDB extends BaseCommonDB implements CommonDB {

return updated || 0
}

override async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise<void> {
await this.cfg.downstreamDB.commitTransaction(tx, opt)
await this.cfg.cacheDB.commitTransaction(tx, opt)
}
}
14 changes: 2 additions & 12 deletions src/adapter/file/file.db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@ const db = new FileDB({
plugin: new InMemoryPersistencePlugin(),
})

describe('runCommonDBTest', () =>
runCommonDBTest(db, {
bufferSupport: false, // todo: implement
insert: false,
update: false,
updateByQuery: false,
createTable: false,
}))
describe('runCommonDBTest', () => runCommonDBTest(db))

describe('runCommonDaoTest', () =>
runCommonDaoTest(db, {
createTable: false,
}))
describe('runCommonDaoTest', () => runCommonDaoTest(db))
190 changes: 121 additions & 69 deletions src/adapter/file/file.db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import {
Saved,
} from '@naturalcycles/js-lib'
import { readableCreate, ReadableTyped, dimGrey } from '@naturalcycles/nodejs-lib'
import { BaseCommonDB, DBSaveBatchOperation, queryInMemory } from '../..'
import {
BaseCommonDB,
commonDBFullSupport,
CommonDBSupport,
DBOperation,
DBSaveBatchOperation,
DBTransaction,
queryInMemory,
} from '../..'
import { CommonDB } from '../../common.db'
import {
CommonDBOptions,
Expand All @@ -27,7 +35,6 @@ import {
RunQueryResult,
} from '../../db.model'
import { DBQuery } from '../../query/dbQuery'
import { DBTransaction } from '../../transaction/dbTransaction'
import { FileDBCfg } from './file.db.model'

/**
Expand All @@ -41,6 +48,16 @@ import { FileDBCfg } from './file.db.model'
* Each save operation saves *whole* file to the persistence layer.
*/
export class FileDB extends BaseCommonDB implements CommonDB {
override support: CommonDBSupport = {
...commonDBFullSupport,
bufferValues: false, // todo: implement
insertSaveMethod: false,
updateSaveMethod: false,
updateByQuery: false,
createTable: false,
transactions: false,
}

constructor(cfg: FileDBCfg) {
super()
this.cfg = {
Expand Down Expand Up @@ -101,72 +118,6 @@ export class FileDB extends BaseCommonDB implements CommonDB {
}
}

/**
* Implementation is optimized for loading/saving _whole files_.
*/
override async commitTransaction(tx: DBTransaction, _opt?: CommonDBOptions): Promise<void> {
// data[table][id] => row
const data: StringMap<StringMap<ObjectWithId>> = {}

// 1. Load all tables data (concurrently)
const tables = _uniq(tx.ops.map(o => o.table))

await pMap(
tables,
async table => {
const rows = await this.loadFile(table)
data[table] = _by(rows, r => r.id)
},
{ concurrency: 16 },
)

const backup = _deepCopy(data)

// 2. Apply ops one by one (in order)
tx.ops.forEach(op => {
if (op.type === 'deleteByIds') {
op.ids.forEach(id => delete data[op.table]![id])
} else if (op.type === 'saveBatch') {
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id')
}
data[op.table]![r.id] = r
})
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
})

// 3. Sort, turn it into ops
// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(_stringMapValues(map)),
}
})

// 4. Save all files
try {
await this.saveFiles(ops)
} catch (err) {
const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(_stringMapValues(map)),
}
})

// Rollback, ignore rollback error (if any)
await this.saveFiles(ops).catch(_ => {})

throw err
}
}

override async runQuery<ROW extends ObjectWithId>(
q: DBQuery<ROW>,
_opt?: CommonDBOptions,
Expand Down Expand Up @@ -216,6 +167,27 @@ export class FileDB extends BaseCommonDB implements CommonDB {
return deleted
}

override async deleteByIds(
table: string,
ids: string[],
_opt?: CommonDBOptions,
): Promise<number> {
const byId = _by(await this.loadFile(table), r => r.id)

let deleted = 0
ids.forEach(id => {
if (!byId[id]) return
delete byId[id]
deleted++
})

if (deleted > 0) {
await this.saveFile(table, _stringMapValues(byId))
}

return deleted
}

override async getTableSchema<ROW extends ObjectWithId>(
table: string,
): Promise<JsonSchemaRootObject<ROW>> {
Expand Down Expand Up @@ -256,7 +228,11 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.logFinished(started, op)
}

private sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
override async createTransaction(): Promise<FileDBTransaction> {
return new FileDBTransaction(this)
}

sortRows<ROW extends ObjectWithId>(rows: ROW[]): ROW[] {
rows = rows.map(r => _filterUndefinedValues(r))

if (this.cfg.sortOnSave) {
Expand All @@ -283,3 +259,79 @@ export class FileDB extends BaseCommonDB implements CommonDB {
this.cfg.logger?.log(`<< ${op} ${dimGrey(`in ${_since(started)}`)}`)
}
}

export class FileDBTransaction implements DBTransaction {
constructor(private db: FileDB) {}

ops: DBOperation[] = []

/**
* Implementation is optimized for loading/saving _whole files_.
*/
async commit(): Promise<void> {
// data[table][id] => row
const data: StringMap<StringMap<ObjectWithId>> = {}

// 1. Load all tables data (concurrently)
const tables = _uniq(this.ops.map(o => o.table))

await pMap(
tables,
async table => {
const rows = await this.db.loadFile(table)
data[table] = _by(rows, r => r.id)
},
{ concurrency: 16 },
)

const backup = _deepCopy(data)

// 2. Apply ops one by one (in order)
this.ops.forEach(op => {
if (op.type === 'deleteByIds') {
op.ids.forEach(id => delete data[op.table]![id])
} else if (op.type === 'saveBatch') {
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id')
}
data[op.table]![r.id] = r
})
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
})

// 3. Sort, turn it into ops
// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.db.sortRows(_stringMapValues(map)),
}
})

// 4. Save all files
try {
await this.db.saveFiles(ops)
} catch (err) {
const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.db.sortRows(_stringMapValues(map)),
}
})

// Rollback, ignore rollback error (if any)
await this.db.saveFiles(ops).catch(_ => {})

throw err
}
}

async rollback(): Promise<void> {
this.ops = []
}
}
14 changes: 2 additions & 12 deletions src/adapter/file/localFile.persistence.plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ const db = new FileDB({
}),
})

describe('runCommonDBTest', () =>
runCommonDBTest(db, {
bufferSupport: false, // todo: use bufferReviver
insert: false,
update: false,
updateByQuery: false,
createTable: false,
}))
describe('runCommonDBTest', () => runCommonDBTest(db))

describe('runCommonDaoTest', () =>
runCommonDaoTest(db, {
createTable: false,
}))
describe('runCommonDaoTest', () => runCommonDaoTest(db))
Loading