Skip to content

Commit

Permalink
Support replication connections through PgBouncer
Browse files Browse the repository at this point in the history
In session pooling mode PgBouncer is pretty much a transparent proxy,
i.e. the client does normally not even need to know that PgBouncer is in
the middle. This allows things like load balancing and failovers without
the client needing to know about this at all. But as soon as replication
connections are needed, this was not possible anymore, because PgBouncer
would reject those instead of proxying them to the right server.

This PR fixes that by also proxying replication connections. They are
handled pretty differently from normal connections though. A client and
server replication connection will form a strong pair, as soon as one is
closed the other is closed too. So, there's no caching of the server
replication connections, like is done for regular connections. This
seems like a good tradeoff for ease of implementation, because

TODO:
- [ ] Implement hba support for replication database. The current
    version allows PgBouncer allows any user that can authenticate to
    also open a physical replication connection. Normally Postgres
    requires a replication database entry in the hba file for this.
    Obviously this is still checked by postgres, but if a hardcoded user
    is used for the connection this might still be problematic.
- [ ] tests with `pg_recvlogical`, `pg_receivewal` and `pg_basebackup`
- [ ] tests over SSL

Fixes pgbouncer#382
  • Loading branch information
JelteF committed Jun 30, 2023
1 parent 1f70d25 commit 4f2d4dc
Show file tree
Hide file tree
Showing 17 changed files with 447 additions and 51 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pgbouncer_SOURCES = \
src/util.c \
src/varcache.c \
src/common/base64.c \
src/common/bool.c \
src/common/saslprep.c \
src/common/scram-common.c \
src/common/unicode_norm.c \
Expand All @@ -51,6 +52,7 @@ pgbouncer_SOURCES = \
include/util.h \
include/varcache.h \
include/common/base64.h \
include/common/builtins.h \
include/common/pg_wchar.h \
include/common/postgres_compat.h \
include/common/saslprep.h \
Expand Down
24 changes: 24 additions & 0 deletions include/bouncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ enum SocketState {
CL_LOGIN, /* login_client_list */
CL_WAITING, /* pool->waiting_client_list */
CL_WAITING_LOGIN, /* - but return to CL_LOGIN instead of CL_ACTIVE */
CL_WAITING_LOGICAL_REP, /* pool->waiting_logical_replication_list */
CL_WAITING_PHYSICAL_REP, /* pool->waiting_physical_replication_list */
CL_ACTIVE, /* pool->active_client_list */
CL_WAITING_CANCEL, /* pool->waiting_cancel_req_list */
CL_ACTIVE_CANCEL, /* pool->active_cancel_req_list */
Expand Down Expand Up @@ -287,6 +289,20 @@ struct PgPool {
*/
struct StatList waiting_client_list;

/*
* Clients that are waiting for a logical replication connection to be
* available. These requests are waiting for a new server connection to be
* opened, before the request can be forwarded.
*/
struct StatList waiting_logical_replication_list;

/*
* Clients that are waiting for a logical replication connection to be
* available. These requests are waiting for a new server connection to be
* opened, before the request can be forwarded.
*/
struct StatList waiting_physical_replication_list;

/*
* Clients that sent cancel request, to cancel another client its query.
* These requests are waiting for a new server connection to be opened,
Expand Down Expand Up @@ -490,6 +506,12 @@ struct PgDatabase {
struct AATree user_tree; /* users that have been queried on this database */
};

typedef enum ReplicationType {
REPLICATION_NONE,
REPLICATION_LOGICAL,
REPLICATION_PHYSICAL,
} ReplicationType;


/*
* A client or server connection.
Expand All @@ -508,6 +530,8 @@ struct PgSocket {

SocketState state:8; /* this also specifies socket location */

ReplicationType replication; /* If this is a replication connection */

bool ready:1; /* server: accepts new query */
bool idle_tx:1; /* server: idling in tx */
bool close_needed:1; /* server: this socket must be closed ASAP */
Expand Down
17 changes: 17 additions & 0 deletions include/common/builtins.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
*
* builtins.h
* Declarations for operations on built-in types.
*
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* include/common/builtins.h
*
*-------------------------------------------------------------------------
*/

/* bool.c */
extern bool parse_bool(const char *value, bool *result);
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);
4 changes: 4 additions & 0 deletions include/common/postgres_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/* from c.h */

#include <string.h>
#include <usual/ctype.h>

#define int8 int8_t
#define uint8 uint8_t
Expand All @@ -15,6 +16,7 @@

#define lengthof(array) (sizeof (array) / sizeof ((array)[0]))
#define pg_hton32(x) htobe32(x)
#define pg_strncasecmp strncasecmp

#define pg_attribute_noreturn() _NORETURN

Expand All @@ -30,6 +32,8 @@
#define pg_sha256_update(ctx, data, len) sha256_update(ctx, data, len)
#define pg_sha256_final(ctx, dst) sha256_final(ctx, dst)

extern bool parse_bool(const char *value, bool *result);
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);

/* define this to use non-server code paths */
#define FRONTEND
3 changes: 0 additions & 3 deletions include/pktbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...);
#define pktbuf_write_CancelRequest(buf, key) \
pktbuf_write_generic(buf, PKT_CANCEL, "b", key, 8)

#define pktbuf_write_StartupMessage(buf, user, parms, parms_len) \
pktbuf_write_generic(buf, PKT_STARTUP, "bsss", parms, parms_len, "user", user, "")

#define pktbuf_write_PasswordMessage(buf, psw) \
pktbuf_write_generic(buf, 'p', "s", psw)

Expand Down
1 change: 1 addition & 0 deletions include/varcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct VarCache {

bool varcache_set(VarCache *cache, const char *key, const char *value) /* _MUSTCHECK */;
bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) _MUSTCHECK;
void varcache_apply_startup(PktBuf *pkt, PgSocket *client);
void varcache_fill_unset(VarCache *src, PgSocket *dst);
void varcache_clean(VarCache *cache);
void varcache_add_params(PktBuf *pkt, VarCache *vars);
Expand Down
11 changes: 11 additions & 0 deletions src/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,8 @@ static bool admin_show_clients(PgSocket *admin, const char *arg)
show_socket_list(buf, &pool->waiting_client_list, "waiting", false);
show_socket_list(buf, &pool->active_cancel_req_list, "active_cancel_req", false);
show_socket_list(buf, &pool->waiting_cancel_req_list, "waiting_cancel_req", false);
show_socket_list(buf, &pool->waiting_logical_replication_list, "waiting_logical_replication", false);
show_socket_list(buf, &pool->waiting_physical_replication_list, "waiting_physical_replication", false);
}

statlist_for_each(item, &peer_pool_list) {
Expand Down Expand Up @@ -796,6 +798,8 @@ static bool admin_show_sockets(PgSocket *admin, const char *arg)
pool = container_of(item, PgPool, head);
show_socket_list(buf, &pool->active_client_list, "cl_active", true);
show_socket_list(buf, &pool->waiting_client_list, "cl_waiting", true);
show_socket_list(buf, &pool->waiting_logical_replication_list, "cl_waiting_logical_rep", true);
show_socket_list(buf, &pool->waiting_physical_replication_list, "cl_waiting_physical_rep", true);

show_socket_list(buf, &pool->active_server_list, "sv_active", true);
show_socket_list(buf, &pool->idle_server_list, "sv_idle", true);
Expand Down Expand Up @@ -836,6 +840,8 @@ static bool admin_show_active_sockets(PgSocket *admin, const char *arg)
pool = container_of(item, PgPool, head);
show_active_socket_list(buf, &pool->active_client_list, "cl_active");
show_active_socket_list(buf, &pool->waiting_client_list, "cl_waiting");
show_active_socket_list(buf, &pool->waiting_logical_replication_list, "cl_waiting_logical_rep");
show_active_socket_list(buf, &pool->waiting_physical_replication_list, "cl_waiting_physical_rep");

show_active_socket_list(buf, &pool->active_server_list, "sv_active");
show_active_socket_list(buf, &pool->idle_server_list, "sv_idle");
Expand Down Expand Up @@ -870,6 +876,8 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
pktbuf_write_RowDescription(buf, "ssiiiiiiiiiiiiis",
"database", "user",
"cl_active", "cl_waiting",
"cl_waiting_logical_rep",
"cl_waiting_physical_rep",
"cl_active_cancel_req",
"cl_waiting_cancel_req",
"sv_active",
Expand All @@ -882,12 +890,15 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
statlist_for_each(item, &pool_list) {
pool = container_of(item, PgPool, head);
waiter = first_socket(&pool->waiting_client_list);
/* XXX: should maxwait include replication connections? */
max_wait = (waiter && waiter->query_start) ? now - waiter->query_start : 0;
pool_mode = pool_pool_mode(pool);
pktbuf_write_DataRow(buf, "ssiiiiiiiiiiiiis",
pool->db->name, pool->user->name,
statlist_count(&pool->active_client_list),
statlist_count(&pool->waiting_client_list),
statlist_count(&pool->waiting_logical_replication_list),
statlist_count(&pool->waiting_physical_replication_list),
statlist_count(&pool->active_cancel_req_list),
statlist_count(&pool->waiting_cancel_req_list),
statlist_count(&pool->active_server_list),
Expand Down
22 changes: 22 additions & 0 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "bouncer.h"
#include "pam.h"
#include "scram.h"
#include "common/builtins.h"

#include <usual/pgutil.h>

Expand Down Expand Up @@ -569,6 +570,24 @@ static void set_appname(PgSocket *client, const char *app_name)
}
}

/*
* set_replication sets the replication field on the client according the given
* replicationString.
*/
static bool set_replication(PgSocket *client, const char *replicationString)
{
bool replicationBool = false;
if (strcmp(replicationString, "database") == 0) {
client->replication = REPLICATION_LOGICAL;
return true;
}
if (!parse_bool(replicationString, &replicationBool)) {
return false;
}
client->replication = replicationBool ? REPLICATION_PHYSICAL : REPLICATION_NONE;
return true;
}

static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
{
const char *username = NULL, *dbname = NULL;
Expand All @@ -593,6 +612,9 @@ static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
} else if (strcmp(key, "application_name") == 0) {
set_appname(client, val);
appname_found = true;
} else if (strcmp(key, "replication") == 0) {
slog_debug(client, "got var: %s=%s", key, val);
set_replication(client, val);
} else if (varcache_set(&client->vars, key, val)) {
slog_debug(client, "got var: %s=%s", key, val);
} else if (strlist_contains(cf_ignore_startup_params, key)) {
Expand Down
110 changes: 110 additions & 0 deletions src/common/bool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*-------------------------------------------------------------------------
*
* bool.c
* Functions for the built-in type "bool".
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/utils/adt/bool.c
*
*-------------------------------------------------------------------------
*/

#include "common/postgres_compat.h"

/*
* Try to interpret value as boolean value. Valid values are: true,
* false, yes, no, on, off, 1, 0; as well as unique prefixes thereof.
* If the string parses okay, return true, else false.
* If okay and result is not NULL, return the value in *result.
*/
bool
parse_bool(const char *value, bool *result)
{
return parse_bool_with_len(value, strlen(value), result);
}

bool
parse_bool_with_len(const char *value, size_t len, bool *result)
{
switch (*value)
{
case 't':
case 'T':
if (pg_strncasecmp(value, "true", len) == 0)
{
if (result)
*result = true;
return true;
}
break;
case 'f':
case 'F':
if (pg_strncasecmp(value, "false", len) == 0)
{
if (result)
*result = false;
return true;
}
break;
case 'y':
case 'Y':
if (pg_strncasecmp(value, "yes", len) == 0)
{
if (result)
*result = true;
return true;
}
break;
case 'n':
case 'N':
if (pg_strncasecmp(value, "no", len) == 0)
{
if (result)
*result = false;
return true;
}
break;
case 'o':
case 'O':
/* 'o' is not unique enough */
if (pg_strncasecmp(value, "on", (len > 2 ? len : 2)) == 0)
{
if (result)
*result = true;
return true;
}
else if (pg_strncasecmp(value, "off", (len > 2 ? len : 2)) == 0)
{
if (result)
*result = false;
return true;
}
break;
case '1':
if (len == 1)
{
if (result)
*result = true;
return true;
}
break;
case '0':
if (len == 1)
{
if (result)
*result = false;
return true;
}
break;
default:
break;
}

if (result)
*result = false; /* suppress compiler warning */
return false;
}
14 changes: 7 additions & 7 deletions src/janitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,13 +536,13 @@ static void pool_server_maint(PgPool *pool)
check_unused_servers(pool, &pool->idle_server_list, 1);

/* disconnect close_needed active servers if server_fast_close is set */
if (cf_server_fast_close) {
statlist_for_each_safe(item, &pool->active_server_list, tmp) {
server = container_of(item, PgSocket, head);
Assert(server->state == SV_ACTIVE);
if (server->ready && server->close_needed)
disconnect_server(server, true, "database configuration changed");
}
statlist_for_each_safe(item, &pool->active_server_list, tmp) {
server = container_of(item, PgSocket, head);
Assert(server->state == SV_ACTIVE);
if (cf_server_fast_close && server->ready && server->close_needed)
disconnect_server(server, true, "database configuration changed");
if (server->replication != REPLICATION_NONE && server->close_needed)
disconnect_server(server, true, "database configuration changed");
}

/* handle query_timeout and idle_transaction_timeout */
Expand Down

0 comments on commit 4f2d4dc

Please sign in to comment.