Using sqlgen with LISTEN/NOTIFY #104

@mcraveiro

Hi sqlgen developers,

Me again, I'm afraid :-) My next woe: I want to plug in the traditional Postgres mechanism for handling notifications, LISTEN/NOTIFY [1], and I am trying to figure out the best way to integrate it with sqlgen. I can try to submit a PR, but I am not sure what the "correct" approach would be. First, let's look at a "raw" libpq example of how to do this (courtesy of Grok):

// listen_raw_libpq.cpp
// Compile: g++ -std=c++23 -Wall -Wextra listen_raw_libpq.cpp -o listen_raw -lpq

#include <postgresql/libpq-fe.h>
#include <sys/select.h>  // select, fd_set, timeval (POSIX)
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <cerrno>        // errno, EINTR
#include <csignal>
#include <cstring>

using namespace std::literals;

std::atomic<bool> g_running{true};

void signal_handler(int) {
    g_running = false;
}

int main() {
    // 1. Connection string (same format as psql)
    const char* conninfo =
        "host=localhost "
        "port=5432 "
        "dbname=your_database "
        "user=your_username "
        "password=your_password "
        "connect_timeout=10";

    // 2. Connect (non-blocking mode is optional but recommended)
    PGconn* conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK) {
        std::cerr << "Connection failed: " << PQerrorMessage(conn) << std::endl;
        PQfinish(conn);
        return 1;
    }

    // Make the connection non-blocking (important for clean polling)
    if (PQsetnonblocking(conn, 1) == -1) {
        std::cerr << "Failed to set non-blocking mode\n";
        PQfinish(conn);
        return 1;
    }

    // 3. Execute LISTEN. PQexec blocks even on a non-blocking connection and
    //    returns the command result, so nothing is left pending for the loop.
    PGresult* res = PQexec(conn, "LISTEN my_channel");
    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
        std::cerr << "LISTEN failed: " << PQerrorMessage(conn) << std::endl;
        PQclear(res);
        PQfinish(conn);
        return 1;
    }
    PQclear(res);

    std::cout << "Connected and listening on channel 'my_channel'\n";

    // 4. Set up graceful shutdown (Ctrl+C)
    std::signal(SIGINT, signal_handler);
    std::signal(SIGTERM, signal_handler);

    // 5. Main polling loop
    while (g_running) {
        // Wait up to 500ms for socket activity
        int socket = PQsocket(conn);
        if (socket < 0) break;

        fd_set input_mask;
        FD_ZERO(&input_mask);
        FD_SET(socket, &input_mask);

        timeval timeout{0, 500'000}; // 500ms

        if (select(socket + 1, &input_mask, nullptr, nullptr, &timeout) < 0) {
            if (errno == EINTR) continue;  // interrupted by signal
            break;
        }

        // Consume incoming data
        if (!PQconsumeInput(conn)) {
            std::cerr << "consumeInput failed: " << PQerrorMessage(conn) << std::endl;
            break;
        }

        // Process all available notifications
        PGnotify* notify;
        while ((notify = PQnotifies(conn)) != nullptr) {
            std::cout << "\nNOTIFY received!\n"
                      << "  Channel : " << notify->relname << "\n"
                      << "  PID     : " << notify->be_pid << "\n"
                      << "  Payload : "
                      << (notify->extra[0] ? notify->extra : "(none)") << "\n\n";

            PQfreemem(notify);
        }

        // Optional: check connection health
        if (PQstatus(conn) != CONNECTION_OK) {
            std::cerr << "Connection lost: " << PQerrorMessage(conn) << std::endl;
            break;
        }
    }

    // 6. Cleanup (PQfinish drops the subscription anyway; UNLISTEN is just explicit)
    std::cout << "Shutting down...\n";
    PQclear(PQexec(conn, "UNLISTEN my_channel"));
    PQfinish(conn);

    std::cout << "Done.\n";
    return 0;
}

I am thinking we could obtain a session as we normally do and, via an and_then, subscribe to a topic and supply a lambda for the listen, which we would plumb in with code similar to the above. The downside of this approach is that we'd be "locked in" to that particular subscription, which does not seem entirely sensible. The real use case for this, at least in my code, is to dynamically add and remove topics to listen on rather than have them fixed.
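
To make the dynamic case concrete, here is a minimal sketch of the bookkeeping it implies, independent of both sqlgen and libpq. Everything in it (Notification, quote_channel, subscription_registry, the token scheme) is hypothetical and named only for illustration. The idea is that handlers can come and go at runtime, and the registry reports when a LISTEN or UNLISTEN actually needs to be sent on the listening connection:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical value type; the field names are assumptions, not sqlgen API.
struct Notification {
    std::string channel;
    std::string payload;
    int process_id = 0;
};

// Quote a channel name as an SQL identifier: wrap it in double quotes and
// double any embedded double quote.
inline std::string quote_channel(std::string_view name) {
    std::string out = "\"";
    for (char c : name) {
        if (c == '"') out += '"';
        out += c;
    }
    out += '"';
    return out;
}

// Tracks handlers per channel and reports which SQL command (if any) the
// owner of the connection must send to keep the server in sync.
class subscription_registry {
public:
    using handler = std::function<void(const Notification&)>;
    using token = std::size_t;

    // Adds a handler. Returns a LISTEN command only when the channel is new.
    std::pair<token, std::optional<std::string>>
    subscribe(const std::string& channel, handler h) {
        assert(h && "handler must be callable");
        auto& handlers = channels_[channel];
        const bool first = handlers.empty();
        const token id = next_token_++;
        handlers.emplace(id, std::move(h));
        if (first) return {id, "LISTEN " + quote_channel(channel)};
        return {id, std::nullopt};
    }

    // Removes a handler. Returns UNLISTEN once the last handler is gone.
    std::optional<std::string> unsubscribe(const std::string& channel, token id) {
        auto it = channels_.find(channel);
        if (it == channels_.end() || it->second.erase(id) == 0) return std::nullopt;
        if (!it->second.empty()) return std::nullopt;
        channels_.erase(it);
        return "UNLISTEN " + quote_channel(channel);
    }

    // Invokes every handler registered for the notification's channel and
    // returns how many were called.
    std::size_t dispatch(const Notification& n) const {
        auto it = channels_.find(n.channel);
        if (it == channels_.end()) return 0;
        for (const auto& entry : it->second) entry.second(n);
        return it->second.size();
    }

private:
    std::map<std::string, std::map<token, handler>> channels_;
    token next_token_ = 1;
};
```

The owner of the connection would execute whatever command comes back and feed each PGnotify it drains into dispatch. Since LISTEN takes an identifier rather than a bindable parameter, the channel name is quoted by hand here; with a live connection, libpq's PQescapeIdentifier would be the safer choice. Quoted identifiers are case-sensitive, so the NOTIFY side has to use the same spelling.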

Perhaps a better approach is to plumb notifications into the connection itself. Interestingly enough, Qwen hallucinated one such API (to be clear, it does not exist in sqlgen):

#include <sqlgen/sqlgen.hpp>
#include <atomic>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <csignal>

using namespace sqlgen;

std::atomic<bool> keep_running = true;

void signal_handler(int) {
    keep_running.store(false);
}

int main() {
    // Set up signal handling for graceful shutdown
    std::signal(SIGINT, signal_handler);
    std::signal(SIGTERM, signal_handler);

    try {
        // 1. Create a connection pool
        ConnectionPoolConfig pool_config;
        pool_config.host = "localhost";
        pool_config.port = 5432;
        pool_config.database = "your_database";
        pool_config.username = "your_username";
        pool_config.password = "your_password";
        pool_config.min_connections = 1;
        pool_config.max_connections = 3;

        auto pool = ConnectionPool::create(pool_config);

        // 2. Get a connection for listening
        auto listen_conn = pool->get_connection();
        
        // 3. Set up notification handler (callback)
        listen_conn->set_notification_handler([](const Notification& notification) {
            std::cout << "NOTIFY received:\n"
                      << "  Channel: " << notification.channel << "\n"
                      << "  Payload: " << notification.payload << "\n"
                      << "  PID: " << notification.process_id << std::endl;
            
            // Process the notification based on payload
            if (notification.payload == "data_updated") {
                std::cout << "Data was updated, refreshing cache..." << std::endl;
            } else if (notification.payload == "user_logged_in") {
                std::cout << "User activity detected..." << std::endl;
            }
        });

        // 4. Start listening to channels
        std::cout << "Starting to listen on channels..." << std::endl;
        
        // Listen to multiple channels
        listen_conn->execute("LISTEN data_changes");
        listen_conn->execute("LISTEN user_activity");
        listen_conn->execute("LISTEN system_alerts");
        
        // 5. Check what we're listening to
        auto result = listen_conn->query("SELECT pg_listening_channels()");
        std::cout << "Currently listening on:" << std::endl;
        for (const auto& row : result) {
            std::cout << "  - " << row[0].as<std::string>() << std::endl;
        }

        // 6. In a separate thread/process, create a connection for sending notifications
        std::thread notifier_thread([&pool]() {
            auto notify_conn = pool->get_connection();
            
            for (int i = 0; i < 5 && keep_running; ++i) {
                std::this_thread::sleep_for(std::chrono::seconds(2));
                
                // Send different types of notifications
                if (i % 2 == 0) {
                    notify_conn->execute("NOTIFY data_changes, 'data_updated'");
                    std::cout << "Sent: data_updated" << std::endl;
                } else {
                    notify_conn->execute("NOTIFY user_activity, 'user_logged_in'");
                    std::cout << "Sent: user_logged_in" << std::endl;
                }
            }
        });

        // 7. Main loop to wait for notifications
        std::cout << "Waiting for notifications (Ctrl+C to exit)..." << std::endl;
        
        while (keep_running) {
            // This will block and wait for notifications
            // The notification handler callback will be called when notifications arrive
            listen_conn->wait_for_notification(std::chrono::seconds(1));
        }

        // 8. Clean up
        notifier_thread.join();
        
        // Stop listening
        listen_conn->execute("UNLISTEN *");
        std::cout << "Stopped listening on all channels" << std::endl;

    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }

    return 0;
}
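
Hallucinated or not, the wait_for_notification(timeout) call suggests a reasonable shape: one thread drains the socket (as in the libpq loop above) and hands notifications to whoever is waiting. A minimal sketch of that hand-off using only the standard library follows; notification_queue and its members are hypothetical names, and the Notification fields simply mirror the ones used in the example above:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical value type mirroring the fields used in the example above.
struct Notification {
    std::string channel;
    std::string payload;
    int process_id = 0;
};

// Thread-safe FIFO between the thread that polls the socket and consumers.
class notification_queue {
public:
    // Called by the polling thread for each notification it drains.
    void push(Notification n) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(n));
        }
        ready_.notify_one();
    }

    // Blocks for at most `timeout`; returns std::nullopt if nothing arrived.
    template <class Rep, class Period>
    std::optional<Notification>
    wait_for(std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool got_one = ready_.wait_for(
            lock, timeout, [this] { return !pending_.empty(); });
        if (!got_one) return std::nullopt;
        assert(!pending_.empty());
        Notification n = std::move(pending_.front());
        pending_.pop_front();
        return n;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Notification> pending_;
};
```

Returning an optional instead of invoking a callback keeps handler code off the polling thread, which is a design choice rather than a requirement; a callback-based variant would work just as well.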

Do you have any thoughts on this? Also, I guess this would be Postgres-specific; is that a problem? I will keep experimenting, but meanwhile any pointers would be very helpful.
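
On the Postgres-specific point, one option I can imagine is to gate the feature at compile time, so that backends without an equivalent simply don't expose it. A rough C++17 sketch of the idea, with made-up connection types (postgres_like and sqlite_like are stand-ins, not sqlgen classes) and a made-up listen member:

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Stand-in for a backend that can subscribe to a channel (hypothetical).
struct postgres_like {
    bool listen(const std::string& channel) {
        last_channel = channel;
        return true;
    }
    std::string last_channel;
};

// Stand-in for a backend with no notification mechanism (hypothetical).
struct sqlite_like {};

// Detection idiom: true only if Conn has a listen(const std::string&) member.
template <class Conn, class = void>
struct supports_notifications : std::false_type {};

template <class Conn>
struct supports_notifications<
    Conn,
    std::void_t<decltype(std::declval<Conn&>().listen(
        std::declval<const std::string&>()))>> : std::true_type {};

// Subscribes when the backend supports it; otherwise reports failure
// without requiring the backend to implement anything.
template <class Conn>
bool try_listen(Conn& conn, const std::string& channel) {
    if constexpr (supports_notifications<Conn>::value) {
        return conn.listen(channel);
    } else {
        (void)conn;
        (void)channel;
        return false;
    }
}

static_assert(supports_notifications<postgres_like>::value, "");
static_assert(!supports_notifications<sqlite_like>::value, "");
```

A stricter variant would make try_listen fail to compile for unsupported backends instead of returning false; which of the two fits sqlgen better is exactly the kind of pointer I am after.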

Cheers

Marco

[1] https://www.postgresql.org/docs/current/sql-notify.html
