Skip to content

Commit

Permalink
feat(migrations): improve migration system (#299)
Browse files Browse the repository at this point in the history
* client tokens

* fix

* fix security flaw

* fix tests

* use client tokens

* fixes

* fix mig test

* fix

* fix tests

* migration start

* down migrate

* remove useless sync stuff

* fix

* fix

* create tables in transaction

* fix status table

* init migration

* fix tests

* fix test

* remove registerTable tests

* fix mig test

* fix

* fix

* legacy token verification

* fix

* fix test

* pr comments

* fix
  • Loading branch information
samuelmasse committed Jan 13, 2022
1 parent 709f120 commit c8ae63e
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 141 deletions.
12 changes: 9 additions & 3 deletions packages/engine/src/database/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Logger } from '../logger/types'

export class DatabaseService extends Service {
public knex!: Knex
private tables: Table[] = []
private url!: string
private pool!: Knex.PoolConfig
private isLite!: boolean
Expand Down Expand Up @@ -83,10 +84,15 @@ export class DatabaseService extends Service {
}

async registerTable(table: Table) {
if (!(await this.knex.schema.hasTable(table.id))) {
this.logger.debug(`Created table '${table.id}'`)
this.tables.push(table)
}

await this.knex.schema.createTable(table.id, table.create)
async createTables(trx: Knex.Transaction) {
for (const table of this.tables) {
if (!(await trx.schema.hasTable(table.id))) {
await trx.schema.createTable(table.id, table.create)
this.logger.debug(`Created table '${table.id}'`)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions packages/engine/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,11 @@ export class Engine {
await this.barriers.setup()
await this.kvs.setup()
}

async postSetup() {
const trx = await this.database.knex.transaction()
await this.database.createTables(trx)
await this.meta.update(this.meta.app(), trx)
await trx.commit()
}
}
45 changes: 29 additions & 16 deletions packages/engine/src/meta/service.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,50 @@
import { Knex } from 'knex'
import semver from 'semver'
import { Service } from '../base/service'
import { DatabaseService } from '../database/service'
import { MetaTable } from './table'
import { ServerMetadata, ServerMetadataSchema, ServerMetaEntry } from './types'

export class MetaService extends Service {
private pkg: any
private pkg!: ServerMetadata
private table: MetaTable
private current!: ServerMetaEntry
private current?: ServerMetaEntry

constructor(private db: DatabaseService) {
super()
this.table = new MetaTable()
}

setupPkg(pkg: any) {
setPkg(pkg: ServerMetadata) {
this.pkg = pkg
}

async setup() {
await this.db.registerTable(this.table)

const stored = await this.fetch()
if (stored) {
this.current = stored
} else {
const meta: ServerMetadata = {
version: this.pkg.version
}
await this.update(meta)
}
await this.refresh()
}

app() {
return { version: this.pkg.version }
}

get() {
return this.current.data
return this.current?.data
}

async update(data: ServerMetadata) {
async refresh() {
this.current = await this.fetch()
}

async update(data: ServerMetadata, trx?: Knex.Transaction) {
if (!(await (trx || this.db.knex).schema.hasTable(this.table.id))) {
return
}

if (this.get() && semver.eq(this.get()!.version, data.version)) {
return
}

await ServerMetadataSchema.validateAsync(data)

const entry = {
Expand All @@ -48,10 +53,18 @@ export class MetaService extends Service {
}
this.current = entry

return this.query().insert(this.serialize(entry))
if (trx) {
await trx(this.table.id).insert(this.serialize(entry))
} else {
await this.query().insert(this.serialize(entry))
}
}

async fetch(): Promise<ServerMetaEntry | undefined> {
if (!(await this.db.knex.schema.hasTable(this.table.id))) {
return undefined
}

const rows = await this.query().orderBy('time', 'desc').limit(1)

if (rows?.length) {
Expand Down
25 changes: 17 additions & 8 deletions packages/engine/src/migration/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,19 @@ export class MigrationService extends Service {
super()
}

setupMigrations(migs: { new (): Migration }[]) {
setMigrations(migs: { new (): Migration }[]) {
this.migs = migs
}

async setup() {
this.srcVersion = process.env.TESTMIG_DB_VERSION || this.meta.get().version
this.srcVersion = process.env.TESTMIG_DB_VERSION || this.meta.get()?.version || '0.0.0'
this.dstVersion = process.env.MIGRATE_TARGET || this.meta.app().version
this.autoMigrate = !!yn(process.env.AUTO_MIGRATE) || !!process.env.MIGRATE_CMD?.length
this.isDown = process.env.MIGRATE_CMD === 'down'
this.isDry = !!yn(process.env.MIGRATE_DRYRUN)
this.loggerDry = this.logger.prefix(this.isDry ? '[DRY] ' : '')

await this.migrate()
await this.updateDbVersion()

if (process.env.MIGRATE_CMD) {
throw new ShutDownSignal()
Expand All @@ -47,9 +46,18 @@ export class MigrationService extends Service {
private async migrate() {
this.validateSrcAndDst()

if (!this.meta.get() && semver.eq(this.dstVersion, this.meta.app().version) && !process.env.MIGRATE_CMD?.length) {
// if there is no meta entry in the database, this means that the db hasn't been created yet
// in that case we don't run any migrations at all and just let the server create the db if
// the target version is the same version as the server and we aren't explicitly migrating
return
}

const migrations = this.listMigrationsToRun()
if (!migrations.length && !this.isDry && !process.env.MIGRATE_CMD?.length) {
return
// if there's no migration to run, and we didn't explicitly specify migration commands
// then there's no need to show all the fanfare of migration plans and we just update the db version silently
return this.updateDbVersion()
}

this.showMigrationsRequiredWindow(migrations.length)
Expand Down Expand Up @@ -108,6 +116,8 @@ export class MigrationService extends Service {
await this.runMigrationsForVersion(version, migrations, trx)
}

await this.updateDbVersion(trx)

if (this.isDry) {
await trx.rollback()
} else {
Expand All @@ -121,6 +131,7 @@ export class MigrationService extends Service {
throw new ShutDownSignal(1)
} finally {
await this.enableSqliteForeignKeys()
await this.meta.refresh()
}
}

Expand Down Expand Up @@ -168,10 +179,8 @@ export class MigrationService extends Service {
})
}

private async updateDbVersion() {
if (!semver.eq(this.meta.get().version, this.dstVersion)) {
await this.meta.update({ version: this.dstVersion })
}
private async updateDbVersion(trx?: Knex.Transaction) {
await this.meta.update({ version: this.dstVersion }, trx)
}

private showMigrationsRequiredWindow(migrationCount: number) {
Expand Down
64 changes: 0 additions & 64 deletions packages/engine/test/unit/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,38 +146,6 @@ describe('DatabaseService', () => {
})
})

describe('registerTable', () => {
test('Should create a new table if it does not exists', async () => {
const db = new DatabaseService()
await db.setup()

mocked(db.knex.schema.hasTable).mockReturnValueOnce(Promise.resolve(false))
mocked(db.knex.schema.createTable as any).mockReturnValueOnce(Promise.resolve({}))

await db.registerTable(table)

expect(db.knex.schema.hasTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.hasTable).toHaveBeenCalledWith(table.id)
expect(db['logger'].debug).toHaveBeenCalledTimes(1)
expect(db.knex.schema.createTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.createTable).toHaveBeenCalledWith(table.id, table.create)
})

test('Should not create a new table if it already exists', async () => {
const db = new DatabaseService()
await db.setup()

mocked(db.knex.schema.hasTable).mockReturnValueOnce(Promise.resolve(true))

await db.registerTable(table)

expect(db.knex.schema.hasTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.hasTable).toHaveBeenCalledWith(table.id)
expect(db['logger'].debug).not.toHaveBeenCalled()
expect(db.knex.schema.createTable).not.toHaveBeenCalled()
})
})

describe('getJson', () => {
test('Should return the value as is', async () => {
const db = new DatabaseService()
Expand Down Expand Up @@ -362,38 +330,6 @@ describe('DatabaseService', () => {
})
})

describe('registerTable', () => {
test('Should create a new table if it does not exists', async () => {
const db = new DatabaseService()
await db.setup()

mocked(db.knex.schema.hasTable).mockReturnValueOnce(Promise.resolve(false))
mocked(db.knex.schema.createTable as any).mockReturnValueOnce(Promise.resolve({}))

await db.registerTable(table)

expect(db.knex.schema.hasTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.hasTable).toHaveBeenCalledWith(table.id)
expect(db['logger'].debug).toHaveBeenCalledTimes(1)
expect(db.knex.schema.createTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.createTable).toHaveBeenCalledWith(table.id, table.create)
})

test('Should not create a new table if it already exists', async () => {
const db = new DatabaseService()
await db.setup()

mocked(db.knex.schema.hasTable).mockReturnValueOnce(Promise.resolve(true))

await db.registerTable(table)

expect(db.knex.schema.hasTable).toHaveBeenCalledTimes(1)
expect(db.knex.schema.hasTable).toHaveBeenCalledWith(table.id)
expect(db['logger'].debug).not.toHaveBeenCalled()
expect(db.knex.schema.createTable).not.toHaveBeenCalled()
})
})

describe('getJson', () => {
test('Should return the value parsed into an object', async () => {
const db = new DatabaseService()
Expand Down
11 changes: 8 additions & 3 deletions packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ export class App extends Engine {

constructor() {
super()
this.meta.setPkg(require('../package.json'))
this.migration.setMigrations(Migrations)

this.channels = new ChannelService(this.database)
this.providers = new ProviderService(this.database, this.caching)
this.clients = new ClientService(this.database, this.caching, this.providers)
Expand Down Expand Up @@ -96,10 +99,7 @@ export class App extends Engine {
}

async setup() {
this.meta.setupPkg(require('../package.json'))
this.migration.setupMigrations(Migrations)
await super.setup()

await this.channels.setup()
await this.providers.setup()
await this.clients.setup()
Expand All @@ -118,6 +118,11 @@ export class App extends Engine {
await this.sockets.setup()
}

async postSetup() {
await super.postSetup()
await this.channels.postSetup()
}

async monitor() {
await this.syncs.setup()
await this.instances.monitor()
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/channels/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ export class ChannelService extends Service {

async setup() {
await this.db.registerTable(this.table)
}

async postSetup() {
for (const channel of this.channels) {
if (!(await this.getInDb(channel.meta.name))) {
await this.createInDb(channel)
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class Launcher {
try {
this.printLogo()
await this.app.setup()
await this.app.postSetup()
this.printChannels()

await this.api.setup()
Expand Down

0 comments on commit c8ae63e

Please sign in to comment.