Skip to content

Commit

Permalink
feat: concurrency safety (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
eseliger committed Jun 12, 2018
1 parent f9d042d commit a1f3f65
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 80 deletions.
52 changes: 51 additions & 1 deletion src/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import chalk from 'chalk'
import { parse } from 'url'
import { Logger } from '.'
import { Commit } from './git'
import { Migration, Task, TaskType } from './migration'

Expand All @@ -14,6 +16,16 @@ export class UnsupportedDialectError extends Error {
}
}

export class PendingMigrationTimedOutError extends Error {
/* istanbul ignore next */
public readonly name = 'PendingMigrationTimedOutError'
}

export class PendingMigrationFoundError extends Error {
/* istanbul ignore next */
public readonly name = 'PendingMigrationFoundError'
}

export interface TableRow {
id: number
name: string
Expand All @@ -26,9 +38,47 @@ export interface TableRow {
export abstract class DbAdapter {
public abstract init(): Promise<void>
public abstract getLastMigrationTask(): Promise<Task | null>
public abstract logMigrationTask(task: Task): Promise<void>
public abstract beginMigrationTask(task: Task): Promise<void>
public abstract finishMigrationTask(task: Task): Promise<void>
public abstract checkIfTaskCanExecute(task: Task): Promise<void>
public abstract close(): Promise<void>
protected abstract hasPendingMigration(): Promise<boolean>

public async waitForPending(logger: Logger): Promise<boolean> {
let wasPending = false
let shouldRetry = true
await Promise.race([
new Promise<never>((_, reject) =>
setTimeout(() => reject(new PendingMigrationTimedOutError()), 1000 * 60 * 10)
),
(async () => {
// fail after 10 min
let interval: NodeJS.Timer | undefined
while (shouldRetry) {
// if there are rows, a migration is already running
if (!(await this.hasPendingMigration())) {
if (wasPending) {
logger.log('\n\n')
}
break
}
if (!wasPending) {
logger.log(`${chalk.yellow('Waiting for pending migrations')} ...`)
// we had to wait for at least 1 pending migration
wasPending = true
interval = setInterval(() => logger.log('.'), 300)
}
// wait for 1000ms before retrying
await new Promise<void>(resolve => setTimeout(resolve, 1000))
}
if (interval) {
clearInterval(interval)
}
})(),
])
shouldRetry = false
return wasPending
}

protected rowToTask(row: TableRow): Task {
const task = new Task({
Expand Down
76 changes: 60 additions & 16 deletions src/adapters/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as pg from 'pg'
import { SQL } from 'sql-template-strings'
import { DbAdapter } from '../adapter'
import { DbAdapter, PendingMigrationFoundError } from '../adapter'
import { FirstDownMigrationError, MigrationRunTwiceError, Task } from '../migration'

export class PostgresAdapter extends DbAdapter {
Expand Down Expand Up @@ -37,13 +37,15 @@ export class PostgresAdapter extends DbAdapter {
"type" merkel_migration_type,
"commit" TEXT,
"head" TEXT NOT NULL,
"applied_at" TIMESTAMP WITH TIME ZONE NOT NULL
"applied_at" TIMESTAMP WITH TIME ZONE
);
`)
// migrate schema from merkel <= 0.19
await this.client.query(`ALTER TABLE "merkel_meta" ALTER COLUMN "applied_at" DROP NOT NULL`)
}

public close(): Promise<void> {
return new Promise<void>((resolve, reject) => {
return new Promise<void>(resolve => {
this.client.on('end', resolve)
// tslint:disable-next-line:no-floating-promises
this.client.end()
Expand All @@ -58,28 +60,59 @@ export class PostgresAdapter extends DbAdapter {
const { rows } = await this.client.query(`
SELECT "id", "name", "applied_at", "type", "commit", "head"
FROM "merkel_meta"
WHERE "applied_at" IS NOT NULL
ORDER BY "id" DESC
LIMIT 1
`)
return rows.length === 0 ? null : this.rowToTask(rows[0])
}

/**
* Logs an executed task to the database. Sets the task ID
* Logs a task to the database. Sets the task ID
*/
public async logMigrationTask(task: Task): Promise<void> {
const { rows } = await this.client.query(SQL`
INSERT INTO merkel_meta ("name", "type", "commit", "head", "applied_at")
VALUES (
${task.migration.name},
${task.type},
${task.commit ? task.commit.sha1 : null},
${task.head ? task.head.sha1 : null},
${task.appliedAt}
)
RETURNING id
public async beginMigrationTask(task: Task): Promise<void> {
/* istanbul ignore if */
if (!task.head) {
throw new Error('Task has no HEAD')
}
await this.client.query(`BEGIN TRANSACTION`)
try {
await this.client.query(`LOCK TABLE "merkel_meta"`)
if (await this.hasPendingMigration()) {
/* istanbul ignore next */
throw new PendingMigrationFoundError()
}
const { rows } = await this.client.query(SQL`
INSERT INTO merkel_meta ("name", "type", "commit", "head")
VALUES (
${task.migration.name},
${task.type},
${task.commit ? task.commit.sha1 : null},
${task.head.sha1}
)
RETURNING id
`)
await this.client.query(`COMMIT`)
task.id = rows[0].id
} finally {
await this.client.query(`ROLLBACK`)
}
}

/**
* Marks the task as finished
*/
public async finishMigrationTask(task: Task): Promise<void> {
const head = task.head ? task.head.sha1 : null
const commit = task.commit ? task.commit.sha1 : null
await this.client.query(SQL`
UPDATE merkel_meta
SET
"applied_at" = ${task.appliedAt},
"head" = ${head},
"commit" = ${commit}
WHERE "id" = ${task.id}
`)
task.id = rows[0].id
}

/**
Expand All @@ -91,6 +124,7 @@ export class PostgresAdapter extends DbAdapter {
SELECT "type"
FROM "merkel_meta"
WHERE "name" = ${task.migration.name}
AND "applied_at" IS NOT NULL
ORDER BY "id" DESC
LIMIT 1
`)
Expand All @@ -106,4 +140,14 @@ export class PostgresAdapter extends DbAdapter {
}
}
}

protected async hasPendingMigration(): Promise<boolean> {
const { rows } = await this.client.query(SQL`
SELECT "type"
FROM "merkel_meta"
WHERE "applied_at" IS NULL
LIMIT 1
`)
return rows.length !== 0
}
}
113 changes: 81 additions & 32 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import { createAdapterFromUrl } from './adapter'
import { getHead } from './git'
import { addGitHook, HookAlreadyFoundError } from './git'
import {
CLI_LOGGER,
createConfig,
createMigrationDir,
generate,
getConfigurationForCommit,
getStatus,
isMerkelRepository,
PendingMigrationFoundError,
prepareCommitMsg,
} from './index'
import { Migration, Task, TaskType } from './migration'
Expand Down Expand Up @@ -133,12 +135,12 @@ yargs.command(
when: () => !!argv.db,
},
])
if (await createMigrationDir(migrationDir as string)) {
process.stdout.write(`Created ${chalk.cyan(migrationDir as string)}\n`)
if (await createMigrationDir(migrationDir)) {
process.stdout.write(`Created ${chalk.cyan(migrationDir)}\n`)
}
await createConfig({
migrationDir: migrationDir as string,
migrationOutDir: (migrationOutDir as string) || './migrations',
migrationDir,
migrationOutDir: migrationOutDir || './migrations',
})
process.stdout.write(`Created ${chalk.cyan(path.join('.', '.merkelrc.json'))}\n`)
if (initMetaNow) {
Expand Down Expand Up @@ -249,6 +251,8 @@ yargs.command(
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
const head = await getHead()
// wait for current migration to finish
await adapter.waitForPending(CLI_LOGGER)
const status = await getStatus(adapter, head)
process.stdout.write('\n' + status.toString())
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
Expand Down Expand Up @@ -283,33 +287,62 @@ yargs.command(
try {
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
const head = await getHead()
const status = await getStatus(adapter, head)
process.stdout.write(status.toString())
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
if (argv.confirm) {
const answer = await inquirer.prompt<{ continue: boolean }>({
type: 'confirm',
name: 'continue',
message: 'Continue?',
})
if (!answer.continue) {
process.exit(0)
while (true) {
const head = await getHead()
const status = await getStatus(adapter, head)
process.stdout.write(status.toString())
const tasks = status.newCommits.reduce<Task[]>((prev, next) => prev.concat(next.tasks), [])
if (tasks.length > 0) {
if (argv.confirm) {
const answer = await inquirer.prompt<{ continue: boolean }>({
type: 'confirm',
name: 'continue',
message: 'Continue?',
})
if (!answer.continue) {
process.exit(0)
}
process.stdout.write('\n')
}
process.stdout.write('\n')
}
process.stdout.write('Starting migration\n\n')
for (const commit of status.newCommits) {
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
process.stdout.write(task.toString() + ' ...')
const interval = setInterval(() => process.stdout.write('.'), 100)
await task.execute(argv.migrationOutDir!, adapter, head, commit)
clearInterval(interval)
process.stdout.write(' Success\n')

process.stdout.write('Starting migration\n\n')

const hasChanged = await adapter.waitForPending(CLI_LOGGER)

if (hasChanged) {
process.stdout.write('The migrations have changed, reloading..\n\n')
continue
}
// create pending tasks
for (const task of tasks) {
try {
task.head = head
await adapter.beginMigrationTask(task)
} catch (error) {
if (error instanceof PendingMigrationFoundError) {
continue
} else {
throw error
}
}
}

for (const commit of status.newCommits) {
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
process.stdout.write(task.toString() + ' ...')
const interval = setInterval(() => process.stdout.write('.'), 100)
try {
await task.execute(argv.migrationOutDir!, adapter, head, commit)
} finally {
clearInterval(interval)
}
process.stdout.write(' Success\n')
}
}
process.stdout.write(chalk.green('\nAll migrations successful\n'))
}
process.stdout.write(chalk.green('\nAll migrations successful\n'))
break
}
process.exit(0)
} catch (err) {
Expand All @@ -328,13 +361,29 @@ const migrationCommand = (type: TaskType) => async (argv: MigrationCommandArgv)
try {
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
const head = await getHead()
for (const name of argv.migrations!) {
const task = new Task({ type, migration: new Migration(name) })
const tasks = argv.migrations!.map(name => new Task({ type, migration: new Migration(name) }))
while (true) {
await adapter.waitForPending(CLI_LOGGER)
const head = await getHead()
for (const task of tasks) {
try {
task.head = head
await adapter.beginMigrationTask(task)
} catch (error) {
if (error instanceof PendingMigrationFoundError) {
continue
} else {
throw error
}
}
}
break
}
for (const task of tasks) {
process.stdout.write(`${task.toString()} ...`)
const interval = setInterval(() => process.stdout.write('.'), 100)
try {
await task.execute(argv.migrationOutDir!, adapter, head)
await task.execute(argv.migrationOutDir!, adapter)
} finally {
clearInterval(interval)
}
Expand Down
2 changes: 1 addition & 1 deletion src/git.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import chalk from 'chalk'
import { ChildProcess, execFile, spawn } from 'mz/child_process'
import { execFile, spawn } from 'mz/child_process'
import * as fs from 'mz/fs'
import * as path from 'path'
import { basename, resolve } from 'path'
Expand Down
Loading

0 comments on commit a1f3f65

Please sign in to comment.