diff --git a/cf/src/index.js b/cf/src/index.js index d24e9f9c..d27ea0c6 100644 --- a/cf/src/index.js +++ b/cf/src/index.js @@ -481,7 +481,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, diff --git a/cf/src/subscribe.js b/cf/src/subscribe.js index 8716100e..0b9ab515 100644 --- a/cf/src/subscribe.js +++ b/cf/src/subscribe.js @@ -3,7 +3,7 @@ const noop = () => { /* noop */ } export default function Subscribe(postgres, options) { const subscribers = new Map() - , slot = 'postgresjs_' + Math.random().toString(36).slice(2) + , slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2)) , state = {} let connection @@ -49,16 +49,20 @@ export default function Subscribe(postgres, options) { return subscribe async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { + // Parse the subscription pattern into standardized format (command:schema.table=key) event = parseEvent(event) + // Initialize connection if this is the first subscription if (!connection) connection = init(sql, slot, options.publications) + // Store the subscriber's callback function and onsubscribe handler const subscriber = { fn, onsubscribe } const fns = subscribers.has(event) ? subscribers.get(event).add(subscriber) : subscribers.set(event, new Set([subscriber])).get(event) + // Return function to remove this specific subscription const unsubscribe = () => { fns.delete(subscriber) fns.size === 0 && subscribers.delete(event) @@ -82,12 +86,23 @@ export default function Subscribe(postgres, options) { if (!publications) throw new Error('Missing publication names') - const xs = await sql.unsafe( - `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` - ) + let xs + if (slot.startsWith('postgresjs_')) { + // Create a new temporary replication slot and get its info + xs = await sql.unsafe( + `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` + ) + } else { + // Look up existing replication slot + const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot]) + if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`) + xs = slots + } + // Get the first (and only) slot info record const [x] = xs + // Start replication from the last confirmed position const stream = await sql.unsafe( `START_REPLICATION SLOT ${ slot } LOGICAL ${ x.consistent_point @@ -98,6 +113,7 @@ export default function Subscribe(postgres, options) { lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex'))) } + // Set up stream handlers stream.on('data', data) stream.on('error', error) stream.on('close', sql.close) @@ -109,16 +125,26 @@ export default function Subscribe(postgres, options) { } function data(x) { - if (x[0] === 0x77) { + if (x[0] === 0x77) { // 'w' - WAL data + // Parse the WAL data starting after the 25-byte header parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - } else if (x[0] === 0x6b && x[17]) { + } else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message + // Update LSN and respond with pong state.lsn = x.subarray(1, 9) pong() } } + // Handle parsed WAL data by notifying relevant subscribers function handle(a, b) { const path = b.relation.schema + '.' + b.relation.table + // Notify subscribers using different patterns: + // - '*' for all changes + // - '*:schema.table' for all changes to a table + // - '*:schema.table=key' for all changes to a specific record + // - 'command' for specific operations (insert/update/delete) + // - 'command:schema.table' for specific operations on a table + // - 'command:schema.table=key' for specific operations on a record call('*', a, b) call('*:' + path, a, b) b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) @@ -127,15 +153,17 @@ export default function Subscribe(postgres, options) { b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) } + // Respond to keepalive messages function pong() { const x = Buffer.alloc(34) - x[0] = 'r'.charCodeAt(0) - x.fill(state.lsn, 1) - x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) + x[0] = 'r'.charCodeAt(0) // 'r' for reply + x.fill(state.lsn, 1) // Copy LSN + x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp stream.write(x) } } + // Notify all subscribers for a given event pattern function call(x, a, b) { subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x)) } diff --git a/cjs/src/index.js b/cjs/src/index.js index 40ac2c18..8f8d68df 100644 --- a/cjs/src/index.js +++ b/cjs/src/index.js @@ -480,7 +480,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, diff --git a/cjs/src/subscribe.js b/cjs/src/subscribe.js index 6aaa8962..d1994ca9 100644 --- a/cjs/src/subscribe.js +++ b/cjs/src/subscribe.js @@ -2,7 +2,7 @@ const noop = () => { /* noop */ } module.exports = Subscribe;function Subscribe(postgres, options) { const subscribers = new Map() - , slot = 'postgresjs_' + Math.random().toString(36).slice(2) + , slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2)) , state = {} let connection @@ -48,16 +48,20 @@ module.exports = Subscribe;function Subscribe(postgres, options) { return subscribe async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { + // Parse the subscription pattern into standardized format (command:schema.table=key) event = parseEvent(event) + // Initialize connection if this is the first subscription if (!connection) connection = init(sql, slot, options.publications) + // Store the subscriber's callback function and onsubscribe handler const subscriber = { fn, onsubscribe } const fns = subscribers.has(event) ? subscribers.get(event).add(subscriber) : subscribers.set(event, new Set([subscriber])).get(event) + // Return function to remove this specific subscription const unsubscribe = () => { fns.delete(subscriber) fns.size === 0 && subscribers.delete(event) @@ -81,12 +85,23 @@ module.exports = Subscribe;function Subscribe(postgres, options) { if (!publications) throw new Error('Missing publication names') - const xs = await sql.unsafe( - `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` - ) + let xs + if (slot.startsWith('postgresjs_')) { + // Create a new temporary replication slot and get its info + xs = await sql.unsafe( + `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` + ) + } else { + // Look up existing replication slot + const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot]) + if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`) + xs = slots + } + // Get the first (and only) slot info record const [x] = xs + // Start replication from the last confirmed position const stream = await sql.unsafe( `START_REPLICATION SLOT ${ slot } LOGICAL ${ x.consistent_point @@ -97,6 +112,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) { lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex'))) } + // Set up stream handlers stream.on('data', data) stream.on('error', error) stream.on('close', sql.close) @@ -108,16 +124,26 @@ module.exports = Subscribe;function Subscribe(postgres, options) { } function data(x) { - if (x[0] === 0x77) { + if (x[0] === 0x77) { // 'w' - WAL data + // Parse the WAL data starting after the 25-byte header parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - } else if (x[0] === 0x6b && x[17]) { + } else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message + // Update LSN and respond with pong state.lsn = x.subarray(1, 9) pong() } } + // Handle parsed WAL data by notifying relevant subscribers function handle(a, b) { const path = b.relation.schema + '.' + b.relation.table + // Notify subscribers using different patterns: + // - '*' for all changes + // - '*:schema.table' for all changes to a table + // - '*:schema.table=key' for all changes to a specific record + // - 'command' for specific operations (insert/update/delete) + // - 'command:schema.table' for specific operations on a table + // - 'command:schema.table=key' for specific operations on a record call('*', a, b) call('*:' + path, a, b) b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) @@ -126,15 +152,17 @@ module.exports = Subscribe;function Subscribe(postgres, options) { b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) } + // Respond to keepalive messages function pong() { const x = Buffer.alloc(34) - x[0] = 'r'.charCodeAt(0) - x.fill(state.lsn, 1) - x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) + x[0] = 'r'.charCodeAt(0) // 'r' for reply + x.fill(state.lsn, 1) // Copy LSN + x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp stream.write(x) } } + // Notify all subscribers for a given event pattern function call(x, a, b) { subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x)) } diff --git a/cjs/tests/index.js b/cjs/tests/index.js index 7d84ac67..c4dd5e78 100644 --- a/cjs/tests/index.js +++ b/cjs/tests/index.js @@ -2580,3 +2580,59 @@ t('arrays in reserved connection', async() => { x.join('') ] }) + +t('named subscription', { timeout: 4 }, async() => { + const sql = postgres({ + database: 'postgres_js_test', + publications: 'alltables', + fetch_types: false + }) + + await sql.unsafe('create publication alltables for all tables') + + const result = [] + let onsubscribes = 0 + + // Create two subscribers using the same subscription name + const { unsubscribe: unsub1 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub1', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + const { unsubscribe: unsub2 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub2', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + await sql` + create table test ( + id serial primary key, + name text + ) + ` + + await sql`insert into test (name) values ('Murray')` + await delay(10) + + // Force a reconnection to verify subscription persists + await sql.end({ timeout: 0 }) + await delay(500) + + await sql`insert into test (name) values ('Rothbard')` + await delay(100) + + await unsub1() + await unsub2() + + return [ + 'sub1,insert,Murray,sub2,insert,Murray,sub1,insert,Rothbard,sub2,insert,Rothbard', + result.join(','), + await sql`drop table test`, + await sql`drop publication alltables`, + await sql.end() + ] +}) diff --git a/deno/README.md b/deno/README.md index 6f8085cf..354fee26 100644 --- a/deno/README.md +++ b/deno/README.md @@ -1121,20 +1121,25 @@ It is also possible to connect to the database without a connection string or an const sql = postgres() ``` -| Option | Environment Variables | -| ----------------- | ------------------------ | -| `host` | `PGHOST` | -| `port` | `PGPORT` | -| `database` | `PGDATABASE` | -| `username` | `PGUSERNAME` or `PGUSER` | -| `password` | `PGPASSWORD` | -| `idle_timeout` | `PGIDLE_TIMEOUT` | -| `connect_timeout` | `PGCONNECT_TIMEOUT` | +| Option | Environment Variables | +| ------------------ | ------------------------ | +| `host` | `PGHOST` | +| `port` | `PGPORT` | +| `database` | `PGDATABASE` | +| `username` | `PGUSERNAME` or `PGUSER` | +| `password` | `PGPASSWORD` | +| `application_name` | `PGAPPNAME` | +| `idle_timeout` | `PGIDLE_TIMEOUT` | +| `connect_timeout` | `PGCONNECT_TIMEOUT` | ### Prepared statements Prepared statements will automatically be created for any queries where it can be inferred that the query is static. This can be disabled by using the `prepare: false` option. For instance — this is useful when [using PGBouncer in `transaction mode`](https://github.com/porsager/postgres/issues/93#issuecomment-656290493). +**update**: [since 1.21.0](https://www.pgbouncer.org/2023/10/pgbouncer-1-21-0) +PGBouncer supports protocol-level named prepared statements when [configured +properly](https://www.pgbouncer.org/config.html#max_prepared_statements) + ## Custom Types You can add ergonomic support for custom types, or simply use `sql.typed(value, type)` inline, where type is the PostgreSQL `oid` for the type and the correctly serialized string. _(`oid` values for types can be found in the `pg_catalog.pg_type` table.)_ @@ -1294,8 +1299,8 @@ This error is thrown if the user has called [`sql.end()`](#teardown--cleanup) an This error is thrown for any queries that were pending when the timeout to [`sql.end({ timeout: X })`](#teardown--cleanup) was reached. -##### CONNECTION_CONNECT_TIMEOUT -> write CONNECTION_CONNECT_TIMEOUT host:port +##### CONNECT_TIMEOUT +> write CONNECT_TIMEOUT host:port This error is thrown if the startup phase of the connection (tcp, protocol negotiation, and auth) took more than the default 30 seconds or what was specified using `connect_timeout` or `PGCONNECT_TIMEOUT`. diff --git a/deno/src/index.js b/deno/src/index.js index 3bbdf2ba..17796505 100644 --- a/deno/src/index.js +++ b/deno/src/index.js @@ -481,7 +481,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, diff --git a/deno/src/subscribe.js b/deno/src/subscribe.js index b20efb96..c9834b5c 100644 --- a/deno/src/subscribe.js +++ b/deno/src/subscribe.js @@ -3,7 +3,7 @@ const noop = () => { /* noop */ } export default function Subscribe(postgres, options) { const subscribers = new Map() - , slot = 'postgresjs_' + Math.random().toString(36).slice(2) + , slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2)) , state = {} let connection @@ -49,16 +49,20 @@ export default function Subscribe(postgres, options) { return subscribe async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { + // Parse the subscription pattern into standardized format (command:schema.table=key) event = parseEvent(event) + // Initialize connection if this is the first subscription if (!connection) connection = init(sql, slot, options.publications) + // Store the subscriber's callback function and onsubscribe handler const subscriber = { fn, onsubscribe } const fns = subscribers.has(event) ? subscribers.get(event).add(subscriber) : subscribers.set(event, new Set([subscriber])).get(event) + // Return function to remove this specific subscription const unsubscribe = () => { fns.delete(subscriber) fns.size === 0 && subscribers.delete(event) @@ -82,12 +86,23 @@ export default function Subscribe(postgres, options) { if (!publications) throw new Error('Missing publication names') - const xs = await sql.unsafe( - `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` - ) + let xs + if (slot.startsWith('postgresjs_')) { + // Create a new temporary replication slot and get its info + xs = await sql.unsafe( + `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` + ) + } else { + // Look up existing replication slot + const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot]) + if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`) + xs = slots + } + // Get the first (and only) slot info record const [x] = xs + // Start replication from the last confirmed position const stream = await sql.unsafe( `START_REPLICATION SLOT ${ slot } LOGICAL ${ x.consistent_point @@ -98,6 +113,7 @@ export default function Subscribe(postgres, options) { lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex'))) } + // Set up stream handlers stream.on('data', data) stream.on('error', error) stream.on('close', sql.close) @@ -109,16 +125,26 @@ export default function Subscribe(postgres, options) { } function data(x) { - if (x[0] === 0x77) { + if (x[0] === 0x77) { // 'w' - WAL data + // Parse the WAL data starting after the 25-byte header parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - } else if (x[0] === 0x6b && x[17]) { + } else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message + // Update LSN and respond with pong state.lsn = x.subarray(1, 9) pong() } } + // Handle parsed WAL data by notifying relevant subscribers function handle(a, b) { const path = b.relation.schema + '.' + b.relation.table + // Notify subscribers using different patterns: + // - '*' for all changes + // - '*:schema.table' for all changes to a table + // - '*:schema.table=key' for all changes to a specific record + // - 'command' for specific operations (insert/update/delete) + // - 'command:schema.table' for specific operations on a table + // - 'command:schema.table=key' for specific operations on a record call('*', a, b) call('*:' + path, a, b) b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) @@ -127,15 +153,17 @@ export default function Subscribe(postgres, options) { b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) } + // Respond to keepalive messages function pong() { const x = Buffer.alloc(34) - x[0] = 'r'.charCodeAt(0) - x.fill(state.lsn, 1) - x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) + x[0] = 'r'.charCodeAt(0) // 'r' for reply + x.fill(state.lsn, 1) // Copy LSN + x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp stream.write(x) } } + // Notify all subscribers for a given event pattern function call(x, a, b) { subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x)) } diff --git a/deno/tests/index.js b/deno/tests/index.js index 5b5d6e57..b9776d2a 100644 --- a/deno/tests/index.js +++ b/deno/tests/index.js @@ -2583,4 +2583,60 @@ t('arrays in reserved connection', async() => { ] }) +t('named subscription', { timeout: 4 }, async() => { + const sql = postgres({ + database: 'postgres_js_test', + publications: 'alltables', + fetch_types: false + }) + + await sql.unsafe('create publication alltables for all tables') + + const result = [] + let onsubscribes = 0 + + // Create two subscribers using the same subscription name + const { unsubscribe: unsub1 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub1', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + const { unsubscribe: unsub2 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub2', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + await sql` + create table test ( + id serial primary key, + name text + ) + ` + + await sql`insert into test (name) values ('Murray')` + await delay(10) + + // Force a reconnection to verify subscription persists + await sql.end({ timeout: 0 }) + await delay(500) + + await sql`insert into test (name) values ('Rothbard')` + await delay(100) + + await unsub1() + await unsub2() + + return [ + 'sub1,insert,Murray,sub2,insert,Murray,sub1,insert,Rothbard,sub2,insert,Rothbard', + result.join(','), + await sql`drop table test`, + await sql`drop publication alltables`, + await sql.end() + ] +}) + ;globalThis.addEventListener("unload", () => Deno.exit(process.exitCode)) \ No newline at end of file diff --git a/src/subscribe.js b/src/subscribe.js index 4f8934cc..66755575 100644 --- a/src/subscribe.js +++ b/src/subscribe.js @@ -2,7 +2,7 @@ const noop = () => { /* noop */ } export default function Subscribe(postgres, options) { const subscribers = new Map() - , slot = 'postgresjs_' + Math.random().toString(36).slice(2) + , slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2)) , state = {} let connection @@ -48,16 +48,20 @@ export default function Subscribe(postgres, options) { return subscribe async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { + // Parse the subscription pattern into standardized format (command:schema.table=key) event = parseEvent(event) + // Initialize connection if this is the first subscription if (!connection) connection = init(sql, slot, options.publications) + // Store the subscriber's callback function and onsubscribe handler const subscriber = { fn, onsubscribe } const fns = subscribers.has(event) ? subscribers.get(event).add(subscriber) : subscribers.set(event, new Set([subscriber])).get(event) + // Return function to remove this specific subscription const unsubscribe = () => { fns.delete(subscriber) fns.size === 0 && subscribers.delete(event) @@ -81,12 +85,23 @@ export default function Subscribe(postgres, options) { if (!publications) throw new Error('Missing publication names') - const xs = await sql.unsafe( - `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` - ) + let xs + if (slot.startsWith('postgresjs_')) { + // Create a new temporary replication slot and get its info + xs = await sql.unsafe( + `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` + ) + } else { + // Look up existing replication slot + const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot]) + if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`) + xs = slots + } + // Get the first (and only) slot info record const [x] = xs + // Start replication from the last confirmed position const stream = await sql.unsafe( `START_REPLICATION SLOT ${ slot } LOGICAL ${ x.consistent_point @@ -97,6 +112,7 @@ export default function Subscribe(postgres, options) { lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex'))) } + // Set up stream handlers stream.on('data', data) stream.on('error', error) stream.on('close', sql.close) @@ -108,16 +124,26 @@ export default function Subscribe(postgres, options) { } function data(x) { - if (x[0] === 0x77) { + if (x[0] === 0x77) { // 'w' - WAL data + // Parse the WAL data starting after the 25-byte header parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - } else if (x[0] === 0x6b && x[17]) { + } else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message + // Update LSN and respond with pong state.lsn = x.subarray(1, 9) pong() } } + // Handle parsed WAL data by notifying relevant subscribers function handle(a, b) { const path = b.relation.schema + '.' + b.relation.table + // Notify subscribers using different patterns: + // - '*' for all changes + // - '*:schema.table' for all changes to a table + // - '*:schema.table=key' for all changes to a specific record + // - 'command' for specific operations (insert/update/delete) + // - 'command:schema.table' for specific operations on a table + // - 'command:schema.table=key' for specific operations on a record call('*', a, b) call('*:' + path, a, b) b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) @@ -126,15 +152,17 @@ export default function Subscribe(postgres, options) { b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) } + // Respond to keepalive messages function pong() { const x = Buffer.alloc(34) - x[0] = 'r'.charCodeAt(0) - x.fill(state.lsn, 1) - x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) + x[0] = 'r'.charCodeAt(0) // 'r' for reply + x.fill(state.lsn, 1) // Copy LSN + x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp stream.write(x) } } + // Notify all subscribers for a given event pattern function call(x, a, b) { subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x)) } diff --git a/tests/index.js b/tests/index.js index bf81b036..80306b93 100644 --- a/tests/index.js +++ b/tests/index.js @@ -2580,3 +2580,59 @@ t('arrays in reserved connection', async() => { x.join('') ] }) + +t('named subscription', { timeout: 4 }, async() => { + const sql = postgres({ + database: 'postgres_js_test', + publications: 'alltables', + fetch_types: false + }) + + await sql.unsafe('create publication alltables for all tables') + + const result = [] + let onsubscribes = 0 + + // Create two subscribers using the same subscription name + const { unsubscribe: unsub1 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub1', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + const { unsubscribe: unsub2 } = await sql.subscribe( + '*', + (row, { command }) => result.push('sub2', command, row.name || row.id), + () => onsubscribes++, + { name: 'test_sub' } + ) + + await sql` + create table test ( + id serial primary key, + name text + ) + ` + + await sql`insert into test (name) values ('Murray')` + await delay(10) + + // Force a reconnection to verify subscription persists + await sql.end({ timeout: 0 }) + await delay(500) + + await sql`insert into test (name) values ('Rothbard')` + await delay(100) + + await unsub1() + await unsub2() + + return [ + 'sub1,insert,Murray,sub2,insert,Murray,sub1,insert,Rothbard,sub2,insert,Rothbard', + result.join(','), + await sql`drop table test`, + await sql`drop publication alltables`, + await sql.end() + ] +})