Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection examples -- NodeJS subscribes #91

Open
chuck-alt-delete opened this issue Oct 12, 2023 · 0 comments
Open

Connection examples -- NodeJS subscribes #91

chuck-alt-delete opened this issue Oct 12, 2023 · 0 comments

Comments

@chuck-alt-delete
Copy link
Contributor

Hey team,

I have two suggestions for the NodeJS subscribe example:

  1. Our current example could be made much simpler with the use of the new SUBSCRIBE ... ENVELOPE UPSERT ...
  2. Our current example doesn't show how to gracefully shut down subscribes

I included a minimal express js app where multiple users can subscribe to data by hitting http://localhost:3000/data. The subscribes are closed when the client closes, and the whole thing shuts down gracefully when the server is closed.

Some ideas for further refinement of this express app:

  • add a parameter to the endpoint so the user can choose which view to subscribe to
  • add a Redis cache so each view is subscribed to only once, and expose the server sent events over Redis pubsub channels
require('dotenv').config();
const express = require('express');
const app = express();
const { Pool } = require('pg');

// Configure the Postgres client
const pool = new Pool({
  user: process.env.MZ_USER,
  host: process.env.MZ_HOST,
  database: process.env.MZ_DB,
  password: process.env.MZ_PASSWORD,
  port: process.env.MZ_PORT,
  ssl: true
});

// global set of active subscription loops
let activeLoops = new Set();

app.get('/data', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    let client;

    // on request, activate a subscription loop and add it to the set of active loops
    let loopControl = { active: true };
    activeLoops.add(loopControl);

    (async () => {
        try {
            client = await pool.connect();
            await client.query('BEGIN');
            await client.query('DECLARE c CURSOR FOR SUBSCRIBE t WITH (PROGRESS) ENVELOPE UPSERT (KEY(id))');

            // Fetch rows while the loop is switched on.
            // We need FETCH ALL for minimal latency, but that means this loop will not exit until the next datum comes through.
            // Hence why we needed WITH (PROGRESS). This ensures data will be received and the loop with be able to exit.
            while (loopControl.active) {
                const data = await client.query("FETCH ALL c");
                data.rows.forEach(function(row) {
                    // filter out progress messages
                    if (!row.mz_progressed) {
                        // map row fields
                        row = {
                            mz_progressed: row.mz_progressed,
                            mz_timestamp: Number(row.mz_timestamp),
                            mz_state: row.mz_state,
                            id: row.id,
                            content: row.content
                        }
                        // publish server-sent events
                        res.write(`data: ${JSON.stringify(row)}\n\n`);
                    }

                });
            }
        } catch (err) {
            handleError(err);
        } finally {
            if (client) {
                res.end()
                console.log('closing pg client');
                await client.query('COMMIT')
                console.log('committed transaction');
                await client.release();
            }
        }
    })();

    const handleError = (err) => {
        console.error(err);
        res.end();
        loopControl.active = false;
        activeLoops.delete(loopControl);
    };

    req.on('close', () => {
        res.end();
        loopControl.active = false;
        activeLoops.delete(loopControl);
        console.log('client closed');
    });
});


server = app.listen(3000, function () {
  console.log('Example app listening on port 3000!');
});

async function gracefulShutdown() {
    console.log('Initiating graceful shutdown');

    // Stop all active subscriptions
    activeLoops.forEach(loop => {
        loop.active = false;
    });

    try {
        // Using a promise to handle server.close since it doesn't natively return one
        await new Promise((resolve) => {
            server.close(resolve);
        });
        console.log('Express server closed.');

        // End the database pool
        await pool.end();
        console.log('Database pool closed.');
        
        process.exit(0);
    } catch (err) {
        console.error('Error during graceful shutdown:', err);
        process.exit(1);
    }
}

// Listen for specific signals to initiate graceful shutdown
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant