Skip to content

Commit

Permalink
feat(minato): support multiple drivers, fix #82 (#84)
Browse files Browse the repository at this point in the history
Co-authored-by: Hieuzest <girkirin@hotmail.com>
  • Loading branch information
shigma and Hieuzest committed Apr 16, 2024
1 parent 9fe00a0 commit 2f1f990
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 162 deletions.
137 changes: 88 additions & 49 deletions packages/core/src/database.ts
Expand Up @@ -44,8 +44,6 @@ export namespace Join2 {
export type Predicate<S, U extends Input<S>> = (args: Parameters<S, U>) => Eval.Expr<boolean>
}

const kTransaction = Symbol('transaction')

export namespace Database {
export interface Tables {}

Expand All @@ -57,16 +55,17 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
static [Service.immediate] = true
static readonly Tables = Symbol('minato.tables')
static readonly Types = Symbol('minato.types')
static readonly transact = Symbol('minato.transact')
static readonly migrate = Symbol('minato.migrate')

// { [K in Keys<S>]: Model<S[K]> }
public tables: any = Object.create(null)
public drivers: Record<keyof any, any> = Object.create(null)
public tables: Dict<Model> = Object.create(null)
public drivers: Driver<any, C>[] = []
public types: Dict<Field.Transform> = Object.create(null)
public migrating = false
private prepareTasks: Dict<Promise<void>> = Object.create(null)
private migrateTasks: Dict<Promise<void>> = Object.create(null)

private _driver: Driver<any, C> | undefined
private stashed = new Set<string>()
private prepareTasks: Dict<Promise<void>> = Object.create(null)
public migrateTasks: Dict<Promise<void>> = Object.create(null)

async connect<T = undefined>(driver: Driver.Constructor<T>, ...args: Spread<T>) {
this.ctx.plugin(driver, args[0] as any)
Expand All @@ -80,18 +79,15 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
}

async prepared() {
if (this[Database.migrate]) return
await Promise.all(Object.values(this.prepareTasks))
if (!this.migrating) {
await Promise.all(Object.values(this.migrateTasks))
}
}

private getDriver(table: any): Driver<any, C> {
// const model: Model = this.tables[name]
// if (model.driver) return this.drivers[model.driver]
const driver = Object.values(this.drivers)[0]
if (driver) driver.database = this
return driver
private getDriver(table: string | Selection): Driver<any, C> {
if (table instanceof Selection) return table.driver as any
const model: Model = this.tables[table]
if (!model) throw new Error(`cannot resolve table "${table}"`)
return model.ctx?.get('database')?._driver as any
}

private async prepare(name: string) {
Expand All @@ -113,14 +109,16 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
let model = this.tables[name]
if (!model) {
model = this.tables[name] = new Model(name)
// model.driver = config.driver
}
Object.entries(fields).forEach(([key, field]: [string, any]) => {
const transformer = []
this.parseField(field, transformer, undefined, value => field = fields[key] = value)
if (typeof field === 'object') field.transformers = transformer
})
model.extend(fields, config)
if (makeArray(model.primary).every(key => key in fields)) {
model.ctx = this[Context.origin]
}
this.prepareTasks[name] = this.prepare(name)
;(this.ctx as Context).emit('model', name)
}
Expand Down Expand Up @@ -249,24 +247,26 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
optional?: Dict<boolean, Keys<X>>,
): Selection<Join2.Output<S, X>>

join(tables: any, query?: any, optional?: any) {
if (Array.isArray(tables)) {
const sel = new Selection(this.getDriver(tables[0]), Object.fromEntries(tables.map((name) => [name, this.select(name)])))
if (typeof query === 'function') {
sel.args[0].having = Eval.and(query(...tables.map(name => sel.row[name])))
}
sel.args[0].optional = Object.fromEntries(tables.map((name, index) => [name, optional?.[index]]))
return this.select(sel)
join(tables: any, query = (...args: any[]) => Eval.and(), optional?: any) {
const oldTables = tables
if (Array.isArray(oldTables)) {
tables = Object.fromEntries(oldTables.map((name) => [name, this.select(name)]))
}
const sels = valueMap(tables, (t: TableLike<S>) => {
return typeof t === 'string' ? this.select(t) : t
})
if (Object.keys(sels).length === 0) throw new Error('no tables to join')
const drivers = new Set(Object.values(sels).map(sel => sel.driver))
if (drivers.size !== 1) throw new Error('cannot join tables from different drivers')
const sel = new Selection([...drivers][0], sels)
if (Array.isArray(oldTables)) {
sel.args[0].having = Eval.and(query(...oldTables.map(name => sel.row[name])))
sel.args[0].optional = Object.fromEntries(oldTables.map((name, index) => [name, optional?.[index]]))
} else {
const sel = new Selection(this.getDriver(Object.values(tables)[0]), valueMap(tables, (t: TableLike<S>) => {
return typeof t === 'string' ? this.select(t) : t
}))
if (typeof query === 'function') {
sel.args[0].having = Eval.and(query(sel.row))
}
sel.args[0].having = Eval.and(query(sel.row))
sel.args[0].optional = optional
return this.select(sel)
}
return this.select(sel)
}

async get<K extends Keys<S>, P extends FlatKeys<S[K]> = any>(
Expand Down Expand Up @@ -326,35 +326,74 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
return await sel._action('upsert', upsert, keys).execute()
}

async withTransaction(callback: (database: this) => Promise<void>): Promise<void>
async withTransaction<K extends Keys<S>>(table: K, callback: (database: this) => Promise<void>): Promise<void>
async withTransaction(arg: any, ...args: any[]) {
if (this[kTransaction]) throw new Error('nested transactions are not supported')
const [table, callback] = typeof arg === 'string' ? [arg, ...args] : [null, arg, ...args]
const driver = this.getDriver(table)
return await driver.withTransaction(async (session) => {
const database = new Proxy(this, {
get(target, p, receiver) {
if (p === kTransaction) return true
else if (p === 'getDriver') return () => session
else return Reflect.get(target, p, receiver)
makeProxy(marker: any, getDriver?: (driver: Driver<any, C>, database: this) => Driver<any, C>) {
const drivers = new Map<Driver<any, C>, Driver<any, C>>()
const database = new Proxy(this, {
get: (target, p, receiver) => {
if (p === marker) return true
if (p !== 'getDriver') return Reflect.get(target, p, receiver)
return (name: any) => {
const original = this.getDriver(name)
let driver = drivers.get(original)
if (!driver) {
driver = getDriver?.(original, database) ?? new Proxy(original, {
get: (target, p, receiver) => {
if (p === 'database') return database
return Reflect.get(target, p, receiver)
},
})
drivers.set(original, driver)
}
return driver
}
},
})
return database
}

withTransaction(callback: (database: this) => Promise<void>) {
return this.transact(callback)
}

async transact(callback: (database: this) => Promise<void>) {
if (this[Database.transact]) throw new Error('nested transactions are not supported')
const finalTasks: Promise<void>[] = []
const database = this.makeProxy(Database.transact, (driver) => {
let session: any
let _resolve: (value: any) => void
const sessionTask = new Promise((resolve) => _resolve = resolve)
driver = new Proxy(driver, {
get: (target, p, receiver) => {
if (p === 'database') return database
if (p === 'session') return session
if (p === '_ensureSession') return () => sessionTask
return Reflect.get(target, p, receiver)
},
})
await callback(database)
finalTasks.push(driver.withTransaction((_session) => {
_resolve(session = _session)
return initialTask
}))
return driver
})
const initialTask = (async () => {
await Promise.resolve()
await callback(database)
})()
await initialTask.finally(() => Promise.all(finalTasks))
}

async stopAll() {
const drivers = Object.values(this.drivers)
this.drivers = Object.create(null)
await Promise.all(drivers.map(driver => driver.stop()))
await Promise.all(this.drivers.splice(0, Infinity).map(driver => driver.stop()))
}

async drop<K extends Keys<S>>(table: K) {
if (this[Database.transact]) throw new Error('cannot drop table in transaction')
await this.getDriver(table).drop(table)
}

async dropAll() {
if (this[Database.transact]) throw new Error('cannot drop table in transaction')
await Promise.all(Object.values(this.drivers).map(driver => driver.dropAll()))
}

Expand Down
24 changes: 12 additions & 12 deletions packages/core/src/driver.ts
@@ -1,4 +1,4 @@
import { Awaitable, Dict, valueMap } from 'cosmokit'
import { Awaitable, Dict, remove, valueMap } from 'cosmokit'
import { Context, Logger } from 'cordis'
import { Eval, Update } from './eval.ts'
import { Direction, Modifier, Selection } from './selection.ts'
Expand Down Expand Up @@ -59,7 +59,7 @@ export abstract class Driver<T = any, C extends Context = Context> {
abstract remove(sel: Selection.Mutable): Promise<Driver.WriteResult>
abstract create(sel: Selection.Mutable, data: any): Promise<any>
abstract upsert(sel: Selection.Mutable, data: any[], keys: string[]): Promise<Driver.WriteResult>
abstract withTransaction(callback: (driver: this) => Promise<void>): Promise<void>
abstract withTransaction(callback: (session?: any) => Promise<void>): Promise<void>

public database: Database<any, any, C>
public logger: Logger
Expand All @@ -72,15 +72,15 @@ export abstract class Driver<T = any, C extends Context = Context> {
ctx.on('ready', async () => {
await Promise.resolve()
await this.start()
ctx.model.drivers.default = this
ctx.model.drivers.push(this)
ctx.model.refresh()
const database = Object.create(ctx.model)
ctx.database = database
database._driver = this
ctx.set('database', database)
})

ctx.on('dispose', async () => {
ctx.database = null as never
delete ctx.model.drivers.default
remove(ctx.model.drivers, this)
await this.stop()
})
}
Expand Down Expand Up @@ -115,12 +115,10 @@ export abstract class Driver<T = any, C extends Context = Context> {
return model
}

async migrate(name: string, hooks: MigrationHooks) {
const database = Object.create(this.database)
protected async migrate(name: string, hooks: MigrationHooks) {
const database = this.database.makeProxy(Database.migrate)
const model = this.model(name)
database.migrating = true
if (this.database.migrating) await database.migrateTasks[name]
database.migrateTasks[name] = Promise.resolve(database.migrateTasks[name]).then(() => {
await (database.migrateTasks[name] = Promise.resolve(database.migrateTasks[name]).then(() => {
return Promise.all([...model.migrations].map(async ([migrate, keys]) => {
try {
if (!hooks.before(keys)) return
Expand All @@ -130,12 +128,14 @@ export abstract class Driver<T = any, C extends Context = Context> {
hooks.error(reason)
}
}))
}).then(hooks.finalize).catch(hooks.error)
}).then(hooks.finalize).catch(hooks.error))
}

define<S, T>(converter: Driver.Transformer<S, T>) {
converter.types.forEach(type => this.types[type] = converter)
}

async _ensureSession() {}
}

export interface MigrationHooks {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/model.ts
@@ -1,4 +1,5 @@
import { Binary, clone, isNullable, makeArray, MaybeArray, valueMap } from 'cosmokit'
import { Context } from 'cordis'
import { Eval, isEvalExpr } from './eval.ts'
import { Flatten, Keys, unravel } from './utils.ts'
import { Type } from './type.ts'
Expand Down Expand Up @@ -150,6 +151,7 @@ export namespace Model {
export interface Model extends Model.Config {}

export class Model<S = any> {
ctx?: Context
fields: Field.Config<S> = {}
migrations = new Map<Model.Migration, string[]>()

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/selection.ts
Expand Up @@ -125,6 +125,7 @@ class Executable<S = any, T = any> {

async execute(): Promise<T> {
await this.driver.database.prepared()
await this.driver._ensureSession()
return this.driver[this.type as any](this, ...this.args)
}
}
Expand Down
24 changes: 12 additions & 12 deletions packages/memory/src/index.ts
Expand Up @@ -4,7 +4,7 @@ import { Driver, Eval, executeEval, executeQuery, executeSort, executeUpdate, Ru
export class MemoryDriver extends Driver<MemoryDriver.Config> {
static name = 'memory'

#store: Dict<any[]> = {
_store: Dict<any[]> = {
_fields: [],
}

Expand All @@ -24,7 +24,7 @@ export class MemoryDriver extends Driver<MemoryDriver.Config> {

table(sel: string | Selection.Immutable | Dict<string | Selection.Immutable>, env: any = {}): any[] {
if (typeof sel === 'string') {
return this.#store[sel] ||= []
return this._store[sel] ||= []
}

if (!(sel instanceof Selection)) {
Expand Down Expand Up @@ -91,11 +91,11 @@ export class MemoryDriver extends Driver<MemoryDriver.Config> {
}

async drop(table: string) {
delete this.#store[table]
delete this._store[table]
}

async dropAll() {
this.#store = { _fields: [] }
this._store = { _fields: [] }
}

async stats() {
Expand Down Expand Up @@ -126,9 +126,9 @@ export class MemoryDriver extends Driver<MemoryDriver.Config> {
async remove(sel: Selection.Mutable) {
const { ref, query, table } = sel
const data = this.table(table)
this.#store[table] = data.filter(row => !executeQuery(row, query, ref))
this._store[table] = data.filter(row => !executeQuery(row, query, ref))
this.$save(table)
const count = data.length - this.#store[table].length
const count = data.length - this._store[table].length
return { removed: count, matched: count }
}

Expand All @@ -137,10 +137,10 @@ export class MemoryDriver extends Driver<MemoryDriver.Config> {
const { primary, autoInc } = model
const store = this.table(table)
if (!Array.isArray(primary) && autoInc && !(primary in data)) {
let meta = this.#store._fields.find(row => row.table === table && row.field === primary)
let meta = this._store._fields.find(row => row.table === table && row.field === primary)
if (!meta) {
meta = { table, field: primary, autoInc: 0 }
this.#store._fields.push(meta)
this._store._fields.push(meta)
}
meta.autoInc += 1
data[primary] = meta.autoInc
Expand Down Expand Up @@ -185,10 +185,10 @@ export class MemoryDriver extends Driver<MemoryDriver.Config> {
return res
}

async withTransaction(callback: (session: this) => Promise<void>) {
const data = clone(this.#store)
await callback(this).then(undefined, (e) => {
this.#store = data
async withTransaction(callback: () => Promise<void>) {
const data = clone(this._store)
await callback().catch((e) => {
this._store = data
throw e
})
}
Expand Down
6 changes: 3 additions & 3 deletions packages/memory/tests/index.spec.ts
Expand Up @@ -23,12 +23,12 @@ describe('@minatojs/driver-memory', () => {
},
object: {
typeModel: false,
}
},
},
query: {
comparison: {
nullableComparator: false,
}
}
},
},
})
})

0 comments on commit 2f1f990

Please sign in to comment.