✅ Deploy Preview for cedarjs canceled.
| Command | Status | Duration | Result |
|---|---|---|---|
| `nx run-many -t build:pack --exclude create-ceda...` | ✅ Succeeded | 2s | View ↗ |
| `nx run-many -t build` | ✅ Succeeded | 4s | View ↗ |
| `nx run-many -t test --minWorkers=1 --maxWorkers=4` | ✅ Succeeded | 1m 34s | View ↗ |
| `nx run-many -t test:types` | ✅ Succeeded | 9s | View ↗ |
☁️ Nx Cloud last updated this comment at 2026-02-20 03:27:38 UTC
Greptile Summary

Adds CLI command `yarn cedar setup live-queries` to configure Postgres LISTEN/NOTIFY-based live query invalidation. Key improvements in templates:
Issues found:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant App as GraphQL Server
    participant Listener as liveQueriesListener
    participant PG as PostgreSQL
    participant Cache as liveQueryStore
    participant Client as GraphQL Client
    App->>Listener: startLiveQueryListener()
    Listener->>PG: LISTEN table_change
    Note over Client,App: Client makes @live query
    Client->>App: query @live { posts }
    App->>Cache: Store query result with key
    App-->>Client: Return data + invalidation key
    Note over PG: User mutates data
    App->>PG: INSERT/UPDATE/DELETE on table
    PG->>PG: Trigger fires cedar_notify_table_change()
    PG->>PG: pg_notify('table_change', json_build_object(...))
    PG-->>Listener: NOTIFY with table + recordId
    Listener->>Listener: Parse notification payload
    Listener->>Listener: Generate invalidation keys
    Listener->>Cache: invalidate(['Query.posts', 'Post:123'])
    Cache-->>Client: SSE update with new data
```
Last reviewed commit: 90ca6fb
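The "Parse notification payload" step in the diagram could be sketched like this. This is an illustrative sketch, not the PR's code: `ChangeNotification` and `parseNotification` are assumed names, and the field shape follows the `recordId`-based template payload discussed below. NOTIFY payloads arrive as strings, so the listener has to JSON-parse them defensively.

```typescript
// Hypothetical payload shape matching the template's json_build_object fields.
interface ChangeNotification {
  schema: string
  table: string
  operation: 'INSERT' | 'UPDATE' | 'DELETE'
  recordId?: string
}

// Parse a raw NOTIFY payload; return null (rather than throwing) on
// malformed input so one bad notification cannot crash the listener.
function parseNotification(
  payload: string | undefined,
): ChangeNotification | null {
  if (!payload) return null
  try {
    const parsed = JSON.parse(payload)
    // Reject payloads missing the fields invalidation depends on.
    if (
      typeof parsed.table !== 'string' ||
      typeof parsed.operation !== 'string'
    ) {
      return null
    }
    return parsed as ChangeNotification
  } catch {
    return null
  }
}
```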
```ts
async function connect() {
  try {
    if (client) {
      await client.end()
    }

    client = new Client({
      connectionString: process.env.DATABASE_URL,
    })

    client.on('notification', async (msg) => {
      await onNotification(msg.payload)
    })

    client.on('error', (error) => {
      logger.error(
        { error },
        'Postgres live query listener encountered an error'
      )
      reconnect()
    })

    client.on('end', () => {
      logger.warn('Postgres live query listener disconnected')
      reconnect()
    })
```
Reconnect loop from intentional client.end()
When connect() is called while a previous client exists, client.end() on line 82 fires the end event on the old client (line 101-104), which calls reconnect(), scheduling another connect() in 5 seconds. The current connect() continues and successfully establishes a new connection — but after 5 seconds the spurious reconnect fires, tears down the new connection, and the cycle repeats.
To fix this, either remove the event listeners from the old client before ending it, or set a flag to suppress reconnection during an intentional reconnect:
Suggested change:

```ts
async function connect() {
  try {
    if (client) {
      client.removeAllListeners()
      await client.end().catch(() => {})
    }

    client = new Client({
      connectionString: process.env.DATABASE_URL,
    })

    client.on('notification', async (msg) => {
      await onNotification(msg.payload)
    })

    client.on('error', (error) => {
      logger.error(
        { error },
        'Postgres live query listener encountered an error'
      )
      reconnect()
    })

    client.on('end', () => {
      logger.warn('Postgres live query listener disconnected')
      reconnect()
    })
```
```sql
PERFORM pg_notify(
  'table_change',
  json_build_object(
    'schema', TG_TABLE_SCHEMA,
    'table', TG_TABLE_NAME,
    'operation', TG_OP,
    'record', record_data
  )::text
);
```
pg_notify payload can exceed 8000-byte limit
PostgreSQL's NOTIFY payload is limited to ~8000 bytes. row_to_json(NEW) / row_to_json(OLD) serializes the entire row — for tables with large text columns, many columns, or binary data this will exceed the limit. When that happens, pg_notify raises an exception that rolls back the entire transaction, causing the original INSERT/UPDATE/DELETE to fail.
Consider truncating the payload or only sending the fields needed for invalidation (table name, operation, and primary key):
Suggested change:

```sql
PERFORM pg_notify(
  'table_change',
  left(
    json_build_object(
      'schema', TG_TABLE_SCHEMA,
      'table', TG_TABLE_NAME,
      'operation', TG_OP,
      'record', record_data
    )::text,
    7999
  )
);
```
Alternatively, you could only include the id field (or the primary key) in record_data instead of the full row, which would keep payloads small and avoid the truncation issue entirely.
@greptileai How can I make it only include the primary key of the record?
Here's how to send only the primary key instead of the full row:
Set `record_data` to just the primary key before the `pg_notify` call:

```sql
IF (TG_OP = 'DELETE') THEN
  record_data = json_build_object('id', OLD.id);
ELSE
  record_data = json_build_object('id', NEW.id);
END IF;
```
This assumes your primary key column is named id. If your tables use a different primary key column name, replace id with that column name. For composite primary keys, you'd include multiple fields in the json_build_object call (e.g., 'user_id', NEW.user_id, 'order_id', NEW.order_id).
```sql
END;
$$ LANGUAGE plpgsql;

SELECT cedar_attach_notify_triggers();
```
Triggers only attached for tables existing at migration time
cedar_attach_notify_triggers() is invoked once during migration. Any tables created after this migration runs (e.g., from future Prisma migrations) will not have the notify trigger. Users would need to manually re-run SELECT cedar_attach_notify_triggers(); or re-run the migration.
Consider calling out in the docs / final CLI message that the function needs to be re-invoked after schema changes, or adding a note about this limitation.
@greptileai please do a full review and update your existing review summary
```sql
    record_data = row_to_json(OLD);
  ELSE
    record_data = row_to_json(NEW);
```
Test project migration uses row_to_json(OLD/NEW) instead of just the id field like the template does. This is inconsistent with the improved template at packages/cli/src/commands/setup/live-queries/templates/migration.sql.template which sends only recordId.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```sql
END;
$$ LANGUAGE plpgsql;

SELECT cedar_attach_notify_triggers();
```
Test project migration is missing the event trigger (cedar_event_on_table_create) that automatically attaches triggers to future tables. The template includes this at line 55-98.
```ts
async function connect() {
  try {
    if (client) {
      await client.end()
    }

    client = new Client({
      connectionString: process.env.DATABASE_URL,
    })

    client.on('notification', async (msg) => {
      await onNotification(msg.payload)
    })

    client.on('error', (error) => {
      logger.error(
        { error },
        'Postgres live query listener encountered an error'
      )
      reconnect()
    })

    client.on('end', () => {
      logger.warn('Postgres live query listener disconnected')
      reconnect()
    })

    await client.connect()
    await client.query(`LISTEN ${LIVE_QUERY_CHANNEL}`)

    logger.info('Postgres live query listener connected')
  } catch (error) {
    logger.error({ error }, 'Failed to connect Postgres live query listener')
    reconnect()
```
Test project listener lacks the connectionGeneration pattern that prevents reconnect loops. When client.end() is called on line 82, it triggers the end event handler (line 101-104) which calls reconnect(), creating a loop. The template fixes this with a generation counter (see template lines 75-127).
```ts
interface GetKeysToInvalidateArgs {
  table: string
  record: unknown
```
Interface expects record: unknown but the improved migration template sends recordId: string. This mismatch means the listener won't work correctly with the improved migration template.
```ts
function getKeysToInvalidate({ table, record }: GetKeysToInvalidateArgs) {
  const keys = [`Query.${table}`, `Query.${table.toLocaleLowerCase()}`]

  if (isRecordWithId(record)) {
    keys.push(`${table}:${String(record.id)}`)
  }

  return keys
```
Function extracts id from record object, but the improved migration template sends recordId directly as a string. Should accept recordId parameter instead (see template line 15-27).
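A `recordId`-based version of the function could look like this sketch (mirroring the names in the reviewed code; the template's actual code isn't shown in this thread):

```typescript
interface GetKeysToInvalidateArgs {
  table: string
  // The migration template sends recordId as a string; it may be absent
  // if a payload omits it (e.g. truncated or legacy notifications).
  recordId: string | undefined
}

// Build cache keys for both list queries and the individual record.
function getKeysToInvalidate({ table, recordId }: GetKeysToInvalidateArgs) {
  const keys = [`Query.${table}`, `Query.${table.toLocaleLowerCase()}`]

  if (recordId !== undefined) {
    keys.push(`${table}:${recordId}`)
  }

  return keys
}
```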
The changes in this PR are now available in `3.0.0-canary.13430`
Adds CLI command `yarn cedar setup live-queries` to configure Postgres LISTEN/NOTIFY-based live query invalidation. Creates database triggers that send notifications on table changes, and a Node.js listener that invalidates live query cache keys.