feat: allow existing subscription slots #1

Draft: wants to merge 1 commit into master

2 changes: 1 addition & 1 deletion cf/src/index.js
@@ -481,7 +481,7 @@ function parseOptions(a, b) {
{}
),
connection : {
application_name: 'postgres.js',
application_name: env.PGAPPNAME || 'postgres.js',
...o.connection,
...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {})
},
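
With this change, `application_name` falls back to the `PGAPPNAME` environment variable before the hard-coded `'postgres.js'` default, while an explicit `connection.application_name` still takes precedence because `...o.connection` is spread in afterwards. A minimal sketch of the resulting behavior (the database and application names below are placeholders):

```js
// Run as: PGAPPNAME=billing-worker node app.js
import postgres from 'postgres'

// No explicit application_name: the PGAPPNAME value ('billing-worker') is used
// instead of the previous hard-coded 'postgres.js' default.
const sql = postgres({ database: 'postgres_js_test' })

// An explicit connection option still wins over PGAPPNAME.
const reporting = postgres({ connection: { application_name: 'reporting' } })
```
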
46 changes: 37 additions & 9 deletions cf/src/subscribe.js
@@ -3,7 +3,7 @@ const noop = () => { /* noop */ }

export default function Subscribe(postgres, options) {
const subscribers = new Map()
, slot = 'postgresjs_' + Math.random().toString(36).slice(2)
, slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2))
, state = {}

let connection
@@ -49,16 +49,20 @@ export default function Subscribe(postgres, options) {
return subscribe

async function subscribe(event, fn, onsubscribe = noop, onerror = noop) {
// Parse the subscription pattern into standardized format (command:schema.table=key)
event = parseEvent(event)

// Initialize connection if this is the first subscription
if (!connection)
connection = init(sql, slot, options.publications)

// Store the subscriber's callback function and onsubscribe handler
const subscriber = { fn, onsubscribe }
const fns = subscribers.has(event)
? subscribers.get(event).add(subscriber)
: subscribers.set(event, new Set([subscriber])).get(event)

// Return function to remove this specific subscription
const unsubscribe = () => {
fns.delete(subscriber)
fns.size === 0 && subscribers.delete(event)
@@ -82,12 +86,23 @@ export default function Subscribe(postgres, options) {
if (!publications)
throw new Error('Missing publication names')

const xs = await sql.unsafe(
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
)
let xs
if (slot.startsWith('postgresjs_')) {
// Create a new temporary replication slot and get its info
xs = await sql.unsafe(
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
)
} else {
// Look up existing replication slot
const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot])
if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`)
xs = slots
}

// Get the first (and only) slot info record
const [x] = xs

// Start replication from the last confirmed position
const stream = await sql.unsafe(
`START_REPLICATION SLOT ${ slot } LOGICAL ${
x.consistent_point
@@ -98,6 +113,7 @@
lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex')))
}

// Set up stream handlers
stream.on('data', data)
stream.on('error', error)
stream.on('close', sql.close)
@@ -109,16 +125,26 @@
}

function data(x) {
if (x[0] === 0x77) {
if (x[0] === 0x77) { // 'w' - WAL data
// Parse the WAL data starting after the 25-byte header
parse(x.subarray(25), state, sql.options.parsers, handle, options.transform)
} else if (x[0] === 0x6b && x[17]) {
} else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message
// Update LSN and respond with pong
state.lsn = x.subarray(1, 9)
pong()
}
}

// Handle parsed WAL data by notifying relevant subscribers
function handle(a, b) {
const path = b.relation.schema + '.' + b.relation.table
// Notify subscribers using different patterns:
// - '*' for all changes
// - '*:schema.table' for all changes to a table
// - '*:schema.table=key' for all changes to a specific record
// - 'command' for specific operations (insert/update/delete)
// - 'command:schema.table' for specific operations on a table
// - 'command:schema.table=key' for specific operations on a record
call('*', a, b)
call('*:' + path, a, b)
b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
@@ -127,15 +153,17 @@
b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
}

// Respond to keepalive messages
function pong() {
const x = Buffer.alloc(34)
x[0] = 'r'.charCodeAt(0)
x.fill(state.lsn, 1)
x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25)
x[0] = 'r'.charCodeAt(0) // 'r' for reply
x.fill(state.lsn, 1) // Copy LSN
x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp
stream.write(x)
}
}

// Notify all subscribers for a given event pattern
function call(x, a, b) {
subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x))
}
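
Together, the subscribe changes let a caller reuse a pre-existing, durable replication slot instead of the random temporary `postgresjs_*` one: when the configured name does not start with `postgresjs_`, the slot is looked up in `pg_replication_slots` and replication resumes from its `confirmed_flush_lsn`. A minimal usage sketch, assuming the name is supplied through the top-level `postgres()` options (which is where `Subscribe` reads `options.slot` from) and that the slot and publication already exist:

```js
import postgres from 'postgres'

// Assumed to have been created once beforehand, e.g.:
//   SELECT pg_create_logical_replication_slot('my_app_slot', 'pgoutput');
//   CREATE PUBLICATION alltables FOR ALL TABLES;
const sql = postgres({
  database: 'postgres_js_test',
  publications: 'alltables',
  slot: 'my_app_slot' // reuse the existing slot; omit for the old temporary-slot behavior
})

// Patterns follow the handle() logic above: '*', 'insert', 'update:public.users',
// 'delete:public.users=42', and so on.
const { unsubscribe } = await sql.subscribe('insert:public.users', (row, info) => {
  console.log(info.command, row)
})
```
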
2 changes: 1 addition & 1 deletion cjs/src/index.js
@@ -480,7 +480,7 @@ function parseOptions(a, b) {
{}
),
connection : {
application_name: 'postgres.js',
application_name: env.PGAPPNAME || 'postgres.js',
...o.connection,
...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {})
},
46 changes: 37 additions & 9 deletions cjs/src/subscribe.js
@@ -2,7 +2,7 @@ const noop = () => { /* noop */ }

module.exports = Subscribe;function Subscribe(postgres, options) {
const subscribers = new Map()
, slot = 'postgresjs_' + Math.random().toString(36).slice(2)
, slot = options.slot || ('postgresjs_' + Math.random().toString(36).slice(2))
, state = {}

let connection
@@ -48,16 +48,20 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
return subscribe

async function subscribe(event, fn, onsubscribe = noop, onerror = noop) {
// Parse the subscription pattern into standardized format (command:schema.table=key)
event = parseEvent(event)

// Initialize connection if this is the first subscription
if (!connection)
connection = init(sql, slot, options.publications)

// Store the subscriber's callback function and onsubscribe handler
const subscriber = { fn, onsubscribe }
const fns = subscribers.has(event)
? subscribers.get(event).add(subscriber)
: subscribers.set(event, new Set([subscriber])).get(event)

// Return function to remove this specific subscription
const unsubscribe = () => {
fns.delete(subscriber)
fns.size === 0 && subscribers.delete(event)
@@ -81,12 +85,23 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
if (!publications)
throw new Error('Missing publication names')

const xs = await sql.unsafe(
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
)
let xs
if (slot.startsWith('postgresjs_')) {
// Create a new temporary replication slot and get its info
xs = await sql.unsafe(
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
)
} else {
// Look up existing replication slot
const slots = await sql.unsafe('SELECT confirmed_flush_lsn AS consistent_point FROM pg_replication_slots WHERE slot_name = $1', [slot])
if (!slots.length) throw new Error(`Replication slot "${slot}" does not exist`)
xs = slots
}

// Get the first (and only) slot info record
const [x] = xs

// Start replication from the last confirmed position
const stream = await sql.unsafe(
`START_REPLICATION SLOT ${ slot } LOGICAL ${
x.consistent_point
@@ -97,6 +112,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex')))
}

// Set up stream handlers
stream.on('data', data)
stream.on('error', error)
stream.on('close', sql.close)
@@ -108,16 +124,26 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
}

function data(x) {
if (x[0] === 0x77) {
if (x[0] === 0x77) { // 'w' - WAL data
// Parse the WAL data starting after the 25-byte header
parse(x.subarray(25), state, sql.options.parsers, handle, options.transform)
} else if (x[0] === 0x6b && x[17]) {
} else if (x[0] === 0x6b && x[17]) { // 'k' - Keepalive message
// Update LSN and respond with pong
state.lsn = x.subarray(1, 9)
pong()
}
}

// Handle parsed WAL data by notifying relevant subscribers
function handle(a, b) {
const path = b.relation.schema + '.' + b.relation.table
// Notify subscribers using different patterns:
// - '*' for all changes
// - '*:schema.table' for all changes to a table
// - '*:schema.table=key' for all changes to a specific record
// - 'command' for specific operations (insert/update/delete)
// - 'command:schema.table' for specific operations on a table
// - 'command:schema.table=key' for specific operations on a record
call('*', a, b)
call('*:' + path, a, b)
b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
@@ -126,15 +152,17 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
}

// Respond to keepalive messages
function pong() {
const x = Buffer.alloc(34)
x[0] = 'r'.charCodeAt(0)
x.fill(state.lsn, 1)
x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25)
x[0] = 'r'.charCodeAt(0) // 'r' for reply
x.fill(state.lsn, 1) // Copy LSN
x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) // Current timestamp
stream.write(x)
}
}

// Notify all subscribers for a given event pattern
function call(x, a, b) {
subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x))
}
56 changes: 56 additions & 0 deletions cjs/tests/index.js
@@ -2580,3 +2580,59 @@ t('arrays in reserved connection', async() => {
x.join('')
]
})

t('named subscription', { timeout: 4 }, async() => {
const sql = postgres({
database: 'postgres_js_test',
publications: 'alltables',
fetch_types: false
})

await sql.unsafe('create publication alltables for all tables')

const result = []
let onsubscribes = 0

// Create two subscribers using the same subscription name
const { unsubscribe: unsub1 } = await sql.subscribe(
'*',
(row, { command }) => result.push('sub1', command, row.name || row.id),
() => onsubscribes++,
{ name: 'test_sub' }
)

const { unsubscribe: unsub2 } = await sql.subscribe(
'*',
(row, { command }) => result.push('sub2', command, row.name || row.id),
() => onsubscribes++,
{ name: 'test_sub' }
)

await sql`
create table test (
id serial primary key,
name text
)
`

await sql`insert into test (name) values ('Murray')`
await delay(10)

// Force a reconnection to verify subscription persists
await sql.end({ timeout: 0 })
await delay(500)

await sql`insert into test (name) values ('Rothbard')`
await delay(100)

await unsub1()
await unsub2()

return [
'sub1,insert,Murray,sub2,insert,Murray,sub1,insert,Rothbard,sub2,insert,Rothbard',
result.join(','),
await sql`drop table test`,
await sql`drop publication alltables`,
await sql.end()
]
})
27 changes: 16 additions & 11 deletions deno/README.md
@@ -1121,20 +1121,25 @@ It is also possible to connect to the database without a connection string or an
const sql = postgres()
```

| Option | Environment Variables |
| ----------------- | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| `idle_timeout` | `PGIDLE_TIMEOUT` |
| `connect_timeout` | `PGCONNECT_TIMEOUT` |
| Option | Environment Variables |
| ------------------ | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| `application_name` | `PGAPPNAME` |
| `idle_timeout` | `PGIDLE_TIMEOUT` |
| `connect_timeout` | `PGCONNECT_TIMEOUT` |
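
For example, a connection configured entirely from the environment might look like this (all values are placeholders):

```js
// Run as:
//   PGHOST=db.internal PGPORT=5432 PGUSER=app PGPASSWORD=secret PGDATABASE=app PGAPPNAME=app node index.js
import postgres from 'postgres'

const sql = postgres() // host, port, user, password, database and application_name all come from the environment
```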

### Prepared statements

Prepared statements will automatically be created for any queries where it can be inferred that the query is static. This can be disabled with the `prepare: false` option, which is useful, for instance, when [using PGBouncer in `transaction mode`](https://github.com/porsager/postgres/issues/93#issuecomment-656290493).

**Update**: [since 1.21.0](https://www.pgbouncer.org/2023/10/pgbouncer-1-21-0), PGBouncer supports protocol-level named prepared statements when [configured properly](https://www.pgbouncer.org/config.html#max_prepared_statements).
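
For instance, a minimal sketch of opting out of prepared statements when routing through an older PGBouncer in transaction mode (the connection string is a placeholder):

```js
import postgres from 'postgres'

// PGBouncer < 1.21.0 in transaction mode cannot track server-side prepared statements,
// so disabling them avoids statements prepared on one backend being executed on another.
const sql = postgres('postgres://app:secret@pgbouncer.internal:6432/app_db', { prepare: false })
```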

## Custom Types

You can add ergonomic support for custom types, or simply use `sql.typed(value, type)` inline, where type is the PostgreSQL `oid` for the type and the correctly serialized string. _(`oid` values for types can be found in the `pg_catalog.pg_type` table.)_
@@ -1294,8 +1299,8 @@ This error is thrown if the user has called [`sql.end()`](#teardown--cleanup) an

This error is thrown for any queries that were pending when the timeout to [`sql.end({ timeout: X })`](#teardown--cleanup) was reached.

##### CONNECTION_CONNECT_TIMEOUT
> write CONNECTION_CONNECT_TIMEOUT host:port
##### CONNECT_TIMEOUT
> write CONNECT_TIMEOUT host:port

This error is thrown if the startup phase of the connection (tcp, protocol negotiation, and auth) took more than the default 30 seconds or what was specified using `connect_timeout` or `PGCONNECT_TIMEOUT`.
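
A quick sketch of tightening that window (the host is a placeholder; the value is in seconds):

```js
import postgres from 'postgres'

// Fail fast if TCP connect, protocol negotiation and auth take longer than 10 seconds.
const sql = postgres({ host: 'db.internal', connect_timeout: 10 })
```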

2 changes: 1 addition & 1 deletion deno/src/index.js
@@ -481,7 +481,7 @@ function parseOptions(a, b) {
{}
),
connection : {
application_name: 'postgres.js',
application_name: env.PGAPPNAME || 'postgres.js',
...o.connection,
...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {})
},