Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/validate-migration-checksums.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@danceroutine/tango-migrations": patch
---

Validate stored migration checksums before treating applied migrations as unchanged.
92 changes: 70 additions & 22 deletions packages/migrations/src/runner/MigrationRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { isError } from '@danceroutine/tango-core';
import { readdir } from 'node:fs/promises';
import { resolve } from 'node:path';
import { loadDefaultExport } from '../runtime/loadModule';
import { webcrypto } from 'node:crypto';

const JOURNAL = '_tango_migrations';

Expand All @@ -21,6 +22,16 @@ interface DBClient {
close(): Promise<void>;
}

interface AppliedMigrationRecord {
id: string;
checksum: string;
}

interface MigrationChecksums {
current: string;
legacyOperations: string;
}

/**
* Manages the lifecycle of database migrations: applying, planning, and tracking status.
*
Expand Down Expand Up @@ -84,7 +95,9 @@ export class MigrationRunner {
if (toId && migration.id > toId) {
break;
}
if (applied.has(migration.id)) {
const storedChecksum = applied.get(migration.id);
if (storedChecksum !== undefined) {
await this.assertAppliedChecksum(migration, storedChecksum);
continue;
}

Expand Down Expand Up @@ -126,10 +139,21 @@ export class MigrationRunner {
const applied = await this.listApplied();
const migrations = await this.loadMigrations();

return migrations.map((m) => ({
id: m.id,
applied: applied.has(m.id),
}));
const statuses: { id: string; applied: boolean }[] = [];

for (const migration of migrations) {
const storedChecksum = applied.get(migration.id);
if (storedChecksum !== undefined) {
await this.assertAppliedChecksum(migration, storedChecksum);
}

statuses.push({
id: migration.id,
applied: storedChecksum !== undefined,
});
}

return statuses;
}

private async ensureJournal(): Promise<void> {
Expand All @@ -149,10 +173,10 @@ export class MigrationRunner {
await this.client.query(sql);
}

private async listApplied(): Promise<Set<string>> {
private async listApplied(): Promise<Map<string, string>> {
const table = this.dialect === InternalDialect.POSTGRES ? `"${JOURNAL}"` : JOURNAL;
const { rows } = await this.client.query<{ id: string }>(`SELECT id FROM ${table}`);
return new Set(rows.map((r) => r.id));
const { rows } = await this.client.query<AppliedMigrationRecord>(`SELECT id, checksum FROM ${table}`);
return new Map(rows.map((record) => [record.id, record.checksum]));
}

private async loadMigrations(): Promise<Migration[]> {
Expand Down Expand Up @@ -194,7 +218,7 @@ export class MigrationRunner {

const preparedOps = this.compilerStrategy.prepareOperations(this.dialect, builder.ops);
const sqls = preparedOps.flatMap((op) => this.compileOperation(op));
const checksum = String(this.hashJSON(builder.ops));
const checksum = await this.calculateBuilderChecksum(builder);

const isOnline = (migration.mode ?? builder.getMode()) === 'online';

Expand Down Expand Up @@ -229,23 +253,47 @@ export class MigrationRunner {
}
}

private async assertAppliedChecksum(migration: Migration, storedChecksum: string): Promise<void> {
const checksums = await this.calculateChecksums(migration);

// Older Tango versions stored operation-only checksums. Keep those
// journals readable while writing data-aware checksums for new rows.
if (checksums.current !== storedChecksum && checksums.legacyOperations !== storedChecksum) {
throw new Error(
`Applied migration '${migration.id}' checksum mismatch. The migration file may have been modified after it was applied.`
);
}
}

private async calculateChecksums(migration: Migration): Promise<MigrationChecksums> {
const builder = new CollectingBuilder();
await migration.up(builder);

return {
current: await this.calculateBuilderChecksum(builder),
legacyOperations: await this.calculateLegacyOperationsChecksum(builder),
};
}

private async calculateBuilderChecksum(builder: CollectingBuilder): Promise<string> {
return this.hashPayload({
operations: builder.ops,
dataFns: builder.dataFns.map((fn) => fn.toString()),
});
}

private async calculateLegacyOperationsChecksum(builder: CollectingBuilder): Promise<string> {
return this.hashPayload(builder.ops);
}

/**
* Compute a simple hash of the migration's operation list.
* Compute a SHA-256 digest of a migration checksum payload.
* Stored alongside each applied migration in the journal table to detect
* if a migration file has been modified after it was already applied.
* Uses a djb2-like hash over the JSON-serialized operations.
*/
private hashJSON(x: unknown): number {
const s = JSON.stringify(x);
let h = 0;
for (let i = 0; i < s.length; i++) {
// oxlint-disable-next-line prefer-code-point
h = Math.imul(31, h) + s.charCodeAt(i);
// oxlint-disable-next-line prefer-math-trunc
h = h | 0;
}
// oxlint-disable-next-line unicorn/prefer-math-trunc
return h >>> 0;
private async hashPayload(payload: unknown): Promise<string> {
const digest = await webcrypto.subtle.digest('SHA-256', Buffer.from(JSON.stringify(payload), 'utf8'));
return Buffer.from(digest).toString('hex');
}

private compileOperation(op: MigrationOperation): SQL[] {
Expand Down
108 changes: 98 additions & 10 deletions packages/migrations/src/runner/tests/MigrationRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { aDBClient } from '@danceroutine/tango-testing';
import { MigrationRunner } from '../MigrationRunner';
import { InternalDialect } from '../../domain/internal/InternalDialect';
import type { CompilerStrategy } from '../../strategies/CompilerStrategy';
import { webcrypto } from 'node:crypto';

const DOMAIN_IMPORT = '../src/domain/index.ts';

Expand All @@ -20,7 +21,7 @@ function makeClient(queryImpl?: (sql: string, params?: readonly unknown[]) => Pr
if (queryImpl) {
return queryImpl(sql, params);
}
if (sql.includes('SELECT id FROM')) {
if (sql.includes('SELECT id, checksum FROM')) {
return { rows: [] };
}
return { rows: [] };
Expand All @@ -36,6 +37,22 @@ function strategyReturning(sql: string): CompilerStrategy {
} as unknown as CompilerStrategy;
}

async function checksumForPayload(payload: unknown): Promise<string> {
const digest = await webcrypto.subtle.digest('SHA-256', Buffer.from(JSON.stringify(payload), 'utf8'));
return Buffer.from(digest).toString('hex');
}

function tableDropChecksum(table: string): Promise<string> {
return checksumForPayload([{ kind: 'table.drop', table }]);
}

function tableDropMigrationChecksum(table: string, dataFns: string[] = []): Promise<string> {
return checksumForPayload({
operations: [{ kind: 'table.drop', table }],
dataFns,
});
}

describe(MigrationRunner, () => {
const tempDirs: string[] = [];

Expand All @@ -52,7 +69,7 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
return { rows: [] };
});
const runner = new MigrationRunner(
Expand Down Expand Up @@ -83,7 +100,7 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
if (sql === 'SELECT FAIL') throw new Error('boom');
return { rows: [] };
});
Expand All @@ -101,7 +118,7 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
if (sql === 'SELECT FAIL SQLITE') throw new Error('sqlite-fail');
return { rows: [] };
});
Expand All @@ -125,7 +142,9 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [{ id: '001_one' }] };
if (sql.includes('SELECT id, checksum FROM')) {
return { rows: [{ id: '001_one', checksum: await tableDropChecksum('a') }] };
}
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));
Expand All @@ -136,13 +155,62 @@ describe(MigrationRunner, () => {
expect(inserts).toHaveLength(0);
});

it('detects a changed applied migration before skipping it', async () => {
const dir = await createTempMigrations({
'001_changed.ts': `import { Migration } from '${DOMAIN_IMPORT}';\nexport default class MChanged extends Migration { id='001_changed'; up(m){ m.run({ kind: 'table.drop', table: 'accounts' }); } down(){} }`,
});
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id, checksum FROM')) {
return { rows: [{ id: '001_changed', checksum: await tableDropChecksum('users') }] };
}
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));

await expect(runner.apply()).rejects.toThrow("Applied migration '001_changed' checksum mismatch");
const sqlCalls = vi.mocked(client.query).mock.calls as Array<[string, ...unknown[]]>;
const inserts = sqlCalls.filter((call) => String(call[0]).includes('INSERT INTO _tango_migrations'));
expect(inserts).toHaveLength(0);
});

it('detects a changed applied migration data step before skipping it', async () => {
const dir = await createTempMigrations({
'001_changed_data.ts': `import { Migration } from '${DOMAIN_IMPORT}';\nexport default class MChangedData extends Migration { id='001_changed_data'; up(m){ m.run({ kind: 'table.drop', table: 'users' }); m.data(async (ctx)=>{ await ctx.query('SELECT changed'); }); } down(){} }`,
});
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id, checksum FROM')) {
return {
rows: [
{
id: '001_changed_data',
checksum: await tableDropMigrationChecksum('users', [
"async (ctx) => { await ctx.query('SELECT original'); }",
]),
},
],
};
}
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));

await expect(runner.apply()).rejects.toThrow("Applied migration '001_changed_data' checksum mismatch");
});

it('generates plans, statuses, and validates invalid migration modules', async () => {
const validDir = await createTempMigrations({
'001_plan.ts': `import { Migration } from '${DOMAIN_IMPORT}';\nexport default class MPlan extends Migration { id='001_plan'; up(m){ m.run({ kind: 'table.drop', table: 'users' }); m.data(async()=>{}); } down(){} }`,
'002_pending.ts': `import { Migration } from '${DOMAIN_IMPORT}';\nexport default class MPending extends Migration { id='002_pending'; up(m){ m.run({ kind: 'table.drop', table: 'posts' }); } down(){} }`,
});
tempDirs.push(validDir);
const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [{ id: '001_plan' }] };
if (sql.includes('SELECT id, checksum FROM')) {
return { rows: [{ id: '001_plan', checksum: await tableDropChecksum('users') }] };
}
return { rows: [] };
});
const runner = new MigrationRunner(
Expand All @@ -152,7 +220,10 @@ describe(MigrationRunner, () => {
strategyReturning('DROP TABLE users')
);
await expect(runner.plan()).resolves.toContain('-- (data step present)');
await expect(runner.status()).resolves.toEqual([{ id: '001_plan', applied: true }]);
await expect(runner.status()).resolves.toEqual([
{ id: '001_plan', applied: true },
{ id: '002_pending', applied: false },
]);

const invalidDir = await createTempMigrations({ '001_invalid.js': 'export default { nope: true };' });
tempDirs.push(invalidDir);
Expand All @@ -165,14 +236,31 @@ describe(MigrationRunner, () => {
await expect(invalidRunner.plan()).rejects.toThrow('Invalid migration module');
});

it('detects a changed applied migration while reporting status', async () => {
const dir = await createTempMigrations({
'001_changed.ts': `import { Migration } from '${DOMAIN_IMPORT}';\nexport default class MChanged extends Migration { id='001_changed'; up(m){ m.run({ kind: 'table.drop', table: 'accounts' }); } down(){} }`,
});
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id, checksum FROM')) {
return { rows: [{ id: '001_changed', checksum: await tableDropChecksum('users') }] };
}
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));

await expect(runner.status()).rejects.toThrow("Applied migration '001_changed' checksum mismatch");
});

it('loads typescript migration modules directly', async () => {
const dir = await createTempMigrations({
'001_ts.ts': `import { Migration } from '${DOMAIN_IMPORT}'; export default class MTs extends Migration { id='001_ts'; up(m){ m.run({ kind: 'table.drop', table: 'users' }); } down(){} }`,
});
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));
Expand All @@ -187,7 +275,7 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));
Expand All @@ -202,7 +290,7 @@ describe(MigrationRunner, () => {
tempDirs.push(dir);

const client = makeClient(async (sql) => {
if (sql.includes('SELECT id FROM')) return { rows: [] };
if (sql.includes('SELECT id, checksum FROM')) return { rows: [] };
return { rows: [] };
});
const runner = new MigrationRunner(client, InternalDialect.SQLITE, dir, strategyReturning('SELECT 1'));
Expand Down