Skip to content

Commit c42a94f

Browse files
authored
UBERF-9559: Make CR accounts migrations concurrency safe (#8821)
* uberf-9559: make cr accounts migrations concurrency safe Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com> * uberf-9559: fix test Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
1 parent 0eac3cd commit c42a94f

File tree

2 files changed

+129
-10
lines changed

2 files changed

+129
-10
lines changed

server/account/src/__tests__/postgres.test.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,10 +474,13 @@ describe('PostgresAccountDB', () => {
474474

475475
expect(mockClient.begin).toHaveBeenCalled()
476476
expect(mockClient).toHaveBeenCalledWith(
477-
'global_account' // First call with schema name
477+
'global_account' // Verify schema name
478+
)
479+
expect(mockClient.mock.calls[3][0].map((s: string) => s.replace(/\s+/g, ' ')).join('')).toBe(
480+
' INSERT INTO ._account_applied_migrations (identifier, ddl, last_processed_at) VALUES (, , NOW()) ON CONFLICT (identifier) DO NOTHING '
478481
)
479482
expect(mockClient).toHaveBeenCalledWith(
480-
['INSERT INTO ', '._account_applied_migrations (identifier, ddl) VALUES (', ', ', ') ON CONFLICT DO NOTHING'],
483+
expect.anything(),
481484
expect.anything(),
482485
'test_migration',
483486
'CREATE TABLE test'

server/account/src/collections/postgres.ts

Lines changed: 124 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -488,15 +488,102 @@ export class PostgresAccountDB implements AccountDB {
488488
}
489489

490490
async migrate (name: string, ddl: string): Promise<void> {
491-
await this.client.begin(async (client) => {
492-
const res =
493-
await client`INSERT INTO ${this.client(this.ns)}._account_applied_migrations (identifier, ddl) VALUES (${name}, ${ddl}) ON CONFLICT DO NOTHING`
491+
const staleTimeoutMs = 30000
492+
const retryIntervalMs = 5000
493+
let migrationComplete = false
494+
let updateInterval: NodeJS.Timeout | null = null
495+
let executed = false
496+
497+
const executeMigration = async (client: Sql): Promise<void> => {
498+
updateInterval = setInterval(() => {
499+
this.client`
500+
UPDATE ${this.client(this.ns)}._account_applied_migrations
501+
SET last_processed_at = NOW()
502+
WHERE identifier = ${name} AND applied_at IS NULL
503+
`.catch((err) => {
504+
console.error(`Failed to update last_processed_at for migration ${name}:`, err)
505+
})
506+
}, 5000)
507+
508+
await client.unsafe(ddl)
509+
executed = true
510+
}
511+
512+
try {
513+
while (!migrationComplete) {
514+
try {
515+
executed = false
516+
await this.client.begin(async (client) => {
517+
// Only locks if row exists and is not already locked
518+
const existing = await client`
519+
SELECT identifier, applied_at, last_processed_at
520+
FROM ${this.client(this.ns)}._account_applied_migrations
521+
WHERE identifier = ${name}
522+
FOR UPDATE NOWAIT
523+
`
524+
525+
if (existing.length > 0) {
526+
if (existing[0].applied_at !== null) {
527+
// Already completed
528+
migrationComplete = true
529+
} else if (
530+
existing[0].last_processed_at === null ||
531+
Date.now() - new Date(existing[0].last_processed_at).getTime() > staleTimeoutMs
532+
) {
533+
// Take over the stale migration
534+
await client`
535+
UPDATE ${this.client(this.ns)}._account_applied_migrations
536+
SET last_processed_at = NOW()
537+
WHERE identifier = ${name}
538+
`
539+
540+
await executeMigration(client)
541+
}
542+
} else {
543+
const res = await client`
544+
INSERT INTO ${this.client(this.ns)}._account_applied_migrations
545+
(identifier, ddl, last_processed_at)
546+
VALUES (${name}, ${ddl}, NOW())
547+
ON CONFLICT (identifier) DO NOTHING
548+
`
549+
550+
if (res.count === 1) {
551+
// Successfully inserted
552+
await executeMigration(client)
553+
}
554+
// If insert failed (count === 0), another worker got it first, we'll retry the loop
555+
}
556+
})
557+
558+
if (executed) {
559+
await this.client`
560+
UPDATE ${this.client(this.ns)}._account_applied_migrations
561+
SET applied_at = NOW()
562+
WHERE identifier = ${name}
563+
`
564+
migrationComplete = true
565+
}
566+
} catch (err: any) {
567+
if (['55P03', '40001'].includes(err.code)) {
568+
// newLockNotAvailableError, WriteTooOldError
569+
} else {
570+
console.error(`Error in migration ${name}: ${err.code} - ${err.message}`)
571+
}
494572

495-
if (res.count === 1) {
496-
console.log(`Applying migration: ${name}`)
497-
await client.unsafe(ddl)
573+
if (updateInterval !== null) {
574+
clearInterval(updateInterval)
575+
}
576+
}
577+
578+
if (!migrationComplete) {
579+
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs))
580+
}
498581
}
499-
})
582+
} finally {
583+
if (updateInterval !== null) {
584+
clearInterval(updateInterval)
585+
}
586+
}
500587
}
501588

502589
async _init (): Promise<void> {
@@ -507,10 +594,39 @@ export class PostgresAccountDB implements AccountDB {
507594
CREATE TABLE IF NOT EXISTS ${this.ns}._account_applied_migrations (
508595
identifier VARCHAR(255) NOT NULL PRIMARY KEY
509596
, ddl TEXT NOT NULL
510-
, applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
597+
, applied_at TIMESTAMP WITH TIME ZONE
598+
, last_processed_at TIMESTAMP WITH TIME ZONE
511599
);
600+
601+
ALTER TABLE ${this.ns}._account_applied_migrations
602+
ADD COLUMN IF NOT EXISTS last_processed_at TIMESTAMP WITH TIME ZONE;
512603
`
513604
)
605+
606+
const constraintsExist = await this.client`
607+
SELECT 1
608+
FROM information_schema.columns
609+
WHERE table_schema = ${this.ns}
610+
AND table_name = '_account_applied_migrations'
611+
AND column_name = 'applied_at'
612+
AND (column_default IS NOT NULL OR is_nullable = 'NO')
613+
`
614+
615+
if (constraintsExist.length > 0) {
616+
try {
617+
await this.client.unsafe(
618+
`
619+
ALTER TABLE ${this.ns}._account_applied_migrations
620+
ALTER COLUMN applied_at DROP DEFAULT;
621+
622+
ALTER TABLE ${this.ns}._account_applied_migrations
623+
ALTER COLUMN applied_at DROP NOT NULL;
624+
`
625+
)
626+
} catch (err) {
627+
// Ignore errors since they likely mean constraints were already removed by another concurrent migration
628+
}
629+
}
514630
}
515631

516632
async createWorkspace (data: WorkspaceData, status: WorkspaceStatusData): Promise<WorkspaceUuid> {

0 commit comments

Comments
 (0)