Skip to content

Commit

Permalink
feat: postgres lock (#3443)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwasniew committed Apr 4, 2023
1 parent 06b969a commit 8654c9e
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 7 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -161,6 +161,7 @@
"@types/node": "16.18.21",
"@types/nodemailer": "6.4.7",
"@types/owasp-password-strength-test": "1.3.0",
"@types/pg": "8.6.6",
"@types/semver": "7.3.13",
"@types/stoppable": "1.1.1",
"@types/supertest": "2.0.12",
Expand Down
2 changes: 2 additions & 0 deletions src/lib/__snapshots__/create-config.test.ts.snap
Expand Up @@ -79,6 +79,7 @@ exports[`should create default config 1`] = `
"loginHistory": false,
"maintenanceMode": false,
"messageBanner": false,
"migrationLock": false,
"newProjectOverview": false,
"optimal304": false,
"optimal304Differ": false,
Expand All @@ -105,6 +106,7 @@ exports[`should create default config 1`] = `
"loginHistory": false,
"maintenanceMode": false,
"messageBanner": false,
"migrationLock": false,
"newProjectOverview": false,
"optimal304": false,
"optimal304Differ": false,
Expand Down
10 changes: 7 additions & 3 deletions src/lib/server-impl.test.ts
@@ -1,7 +1,7 @@
import { EventEmitter } from 'events';
import express from 'express';
import { createTestConfig } from '../test/config/test-config';
import { start, create } from './server-impl';
import FakeEventStore from '../test/fixtures/fake-event-store';

jest.mock(
'./routes',
Expand All @@ -15,7 +15,7 @@ jest.mock(

const noop = () => {};

const eventStore = new EventEmitter();
const eventStore = new FakeEventStore();
const settingStore = {
get: () => {
Promise.resolve('secret');
Expand Down Expand Up @@ -54,6 +54,10 @@ jest.mock('../migrator', () => ({
migrateDb: () => Promise.resolve(),
}));

jest.mock('./util/db-lock', () => ({
withDbLock: () => (fn) => fn,
}));

jest.mock(
'./util/version',
() =>
Expand Down Expand Up @@ -123,5 +127,5 @@ test('should shutdown the server when calling stop()', async () => {
createTestConfig({ server: { port: 0 } }),
);
await stop();
expect(server.address()).toBe(null);
expect(server!.address()).toBe(null);
});
14 changes: 13 additions & 1 deletion src/lib/server-impl.ts
Expand Up @@ -32,6 +32,7 @@ import { Knex } from 'knex';
import * as permissions from './types/permissions';
import * as eventType from './types/events';
import { Db } from './db/db';
import { defaultLockKey, defaultTimeout, withDbLock } from './util/db-lock';

async function createApp(
config: IUnleashConfig,
Expand Down Expand Up @@ -138,7 +139,18 @@ async function start(opts: IUnleashOptions = {}): Promise<IUnleash> {
logger.info('DB migration: disabled');
} else {
logger.debug('DB migration: start');
await migrateDb(config);
if (opts.flagResolver?.isEnabled('migrationLock')) {
logger.info('Running migration with lock');
const lock = withDbLock(config.db, {
lockKey: defaultLockKey,
timeout: defaultTimeout,
logger,
});
await lock(migrateDb)(config);
} else {
await migrateDb(config);
}

logger.debug('DB migration: end');
}
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/types/experimental.ts
Expand Up @@ -77,6 +77,7 @@ const flags = {
process.env.UNLEASH_EXPERIMENTAL_OPTIMAL_304_DIFFER,
false,
),
migrationLock: parseEnvVarBoolean(process.env.MIGRATION_LOCK, false),
};

export const defaultExperimentalOptions: IExperimentalOptions = {
Expand Down
69 changes: 69 additions & 0 deletions src/lib/util/db-lock.test.ts
@@ -0,0 +1,69 @@
import { withDbLock } from './db-lock';
import { getDbConfig } from '../../test/e2e/helpers/database-config';
import { IDBOption } from '../types';
import { Logger } from '../logger';

test('should lock access to any action', async () => {
const lock = withDbLock(getDbConfig() as IDBOption);

const asyncAction = (input: string) => Promise.resolve(`result: ${input}`);

const result = await lock(asyncAction)('data');

expect(result).toBe('result: data');
});

const ms = (millis: number) =>
new Promise((resolve) => {
setTimeout(() => resolve('time'), millis);
});

test('should await other actions on lock', async () => {
const lock = withDbLock(getDbConfig() as IDBOption);

const results: string[] = [];
const slowAsyncAction = (input: string) => {
return new Promise((resolve) => {
setTimeout(() => {
results.push(input);
resolve(input);
}, 200);
});
};
const fastAction = async (input: string) => {
results.push(input);
};

const lockedAction = lock(slowAsyncAction);
const lockedAnotherAction = lock(fastAction);

// deliberately skipped await to simulate another server running slow operation
lockedAction('first');
await ms(100); // start fast action after slow action established DB connection
await lockedAnotherAction('second');

await expect(results).toStrictEqual(['first', 'second']);
});

test('should handle lock timeout', async () => {
const timeoutMs = 1;
let loggedError = '';
const lock = withDbLock(getDbConfig() as IDBOption, {
lockKey: 1,
timeout: timeoutMs,
logger: {
error(msg: string) {
loggedError = msg;
},
} as unknown as Logger,
});

// the query should fail because of the timeout. This one is a fallback when timeout
// was not triggered in the integration test
const asyncAction = () => Promise.reject(new Error('Query read timeout'));

await expect(lock(asyncAction)()).rejects.toStrictEqual(
new Error('Query read timeout'),
);
expect(loggedError).toBe('Locking error: Query read timeout');
});
43 changes: 43 additions & 0 deletions src/lib/util/db-lock.ts
@@ -0,0 +1,43 @@
import { Client } from 'pg';
import { IDBOption } from '../types';
import { Logger } from '../logger';

export const defaultLockKey = 479341;
export const defaultTimeout = 5000;

interface IDbLockOptions {
timeout: number;
lockKey: number;
logger: Logger;
}

const defaultOptions: IDbLockOptions = {
timeout: defaultTimeout,
lockKey: defaultLockKey,
logger: { ...console, fatal: console.error },
};

export const withDbLock =
(dbConfig: IDBOption, config = defaultOptions) =>
<A extends any[], R>(fn: (...args: A) => Promise<R>) =>
async (...args: A): Promise<R> => {
const client = new Client({
...dbConfig,
query_timeout: config.timeout,
});
try {
await client.connect();
// wait to obtain a lock
await client.query('SELECT pg_advisory_lock($1)', [config.lockKey]);
const result = await fn(...args);
return result;
} catch (e) {
config.logger.error(`Locking error: ${e.message}`);
throw e;
} finally {
await client.query('SELECT pg_advisory_unlock($1)', [
config.lockKey,
]);
await client.end();
}
};
4 changes: 2 additions & 2 deletions src/test/fixtures/fake-event-store.ts
Expand Up @@ -16,7 +16,7 @@ class FakeEventStore implements IEventStore {
}

getMaxRevisionId(): Promise<number> {
throw new Error('Method not implemented.');
return Promise.resolve(1);
}

store(event: IEvent): Promise<void> {
Expand Down Expand Up @@ -64,7 +64,7 @@ class FakeEventStore implements IEventStore {
}

async get(key: number): Promise<IEvent> {
return this.events.find((e) => e.id === key);
return this.events.find((e) => e.id === key)!;
}

async getAll(): Promise<IEvent[]> {
Expand Down
16 changes: 15 additions & 1 deletion yarn.lock
Expand Up @@ -1271,6 +1271,15 @@
resolved "https://registry.yarnpkg.com/@types/owasp-password-strength-test/-/owasp-password-strength-test-1.3.0.tgz#f639e38847eeb0db14bf7b70896cecd4342ac571"
integrity sha512-eKYl6svyRua5OVUFm+AXSxdBrKo7snzrCcFv0KoqKNvNgB3fJzRq3s/xphf+jNTllqYxgsx1uWLeAcL4MjLWQQ==

"@types/pg@8.6.6":
version "8.6.6"
resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.6.6.tgz#21cdf873a3e345a6e78f394677e3b3b1b543cb80"
integrity sha512-O2xNmXebtwVekJDD+02udOncjVcMZQuTEQEMpKJ0ZRf5E7/9JJX3izhKUcUifBkyKpljyUM6BTgy2trmviKlpw==
dependencies:
"@types/node" "*"
pg-protocol "*"
pg-types "^2.2.0"

"@types/prettier@^2.1.5":
version "2.7.2"
resolved "https://registry.yarnpkg.com/@types/prettier/-/prettier-2.7.2.tgz#6c2324641cc4ba050a8c710b2b251b377581fbf0"
Expand Down Expand Up @@ -5646,12 +5655,17 @@ pg-pool@^3.5.2:
resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.5.2.tgz#ed1bed1fb8d79f1c6fd5fb1c99e990fbf9ddf178"
integrity sha512-His3Fh17Z4eg7oANLob6ZvH8xIVen3phEZh2QuyrIl4dQSDVEabNducv6ysROKpDNPSD+12tONZVWfSgMvDD9w==

pg-protocol@*:
version "1.6.0"
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.6.0.tgz#4c91613c0315349363af2084608db843502f8833"
integrity sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==

pg-protocol@^1.5.0:
version "1.5.0"
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0"
integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==

pg-types@^2.1.0:
pg-types@^2.1.0, pg-types@^2.2.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==
Expand Down

1 comment on commit 8654c9e

@vercel
Copy link

@vercel vercel bot commented on 8654c9e Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.