Skip to content

Commit

Permalink
Support replication connections through PgBouncer
Browse files Browse the repository at this point in the history
In session pooling mode PgBouncer is pretty much a transparent proxy,
i.e. the client normally does not even need to know that PgBouncer is in
the middle. This allows things like load balancing and failovers without
the client needing to know about them at all. But as soon as replication
connections were needed, this was no longer possible, because PgBouncer
would reject those instead of proxying them to the right server.

This PR fixes that by also proxying replication connections. They are
handled quite differently from normal connections though. A client and
server replication connection form a strong pair: as soon as one is
closed, the other is closed too. So there's no caching of the server
replication connections, as is done for regular connections. This
seems like a good tradeoff for ease of implementation.

TODO:
- [ ] Implement hba support for replication database. The current
    version of PgBouncer allows any user that can authenticate to
    also open a physical replication connection. Normally Postgres
    requires a replication database entry in the hba file for this.
    Obviously this is still checked by Postgres, but if a hardcoded user
    is used for the connection this might still be problematic.
- [ ] tests with `pg_recvlogical`, `pg_receivewal` and `pg_basebackup`
- [ ] basic tests over SSL

Fixes pgbouncer#382
  • Loading branch information
JelteF committed Jul 19, 2023
1 parent 8beebc8 commit 62ab744
Show file tree
Hide file tree
Showing 32 changed files with 1,015 additions and 102 deletions.
6 changes: 3 additions & 3 deletions .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ task:
- image: rockylinux:8
- image: centos:centos7
setup_script:
- yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server systemd-devel wget
- yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server postgresql-contrib systemd-devel wget
- if cat /etc/centos-release | grep -q ' 7'; then yum -y install python python-pip; else yum -y install python3 python3-pip sudo iptables; fi
- wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
- tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
Expand Down Expand Up @@ -139,7 +139,7 @@ task:
- image: alpine:latest
setup_script:
- apk update
- apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql python3 py3-pip wget sudo iptables
- apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql postgresql-contrib python3 py3-pip wget sudo iptables
- wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
- tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
- python3 -m pip install -r requirements.txt
Expand Down Expand Up @@ -167,7 +167,7 @@ task:
HAVE_IPV6_LOCALHOST: yes
USE_SUDO: true
setup_script:
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server python devel/py-pip sudo
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server postgresql12-contrib python devel/py-pip sudo
- pip install -r requirements.txt
- kldload pf
- echo 'anchor "pgbouncer_test/*"' >> /etc/pf.conf
Expand Down
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true

[*.{c,h}]
[*.{c,h,eval,rules}]
indent_style = tab
indent_size = 8

Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pgbouncer_SOURCES = \
src/util.c \
src/varcache.c \
src/common/base64.c \
src/common/bool.c \
src/common/saslprep.c \
src/common/scram-common.c \
src/common/unicode_norm.c \
Expand All @@ -51,6 +52,7 @@ pgbouncer_SOURCES = \
include/util.h \
include/varcache.h \
include/common/base64.h \
include/common/builtins.h \
include/common/pg_wchar.h \
include/common/postgres_compat.h \
include/common/saslprep.h \
Expand Down
2 changes: 1 addition & 1 deletion doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ The file follows the format of the PostgreSQL `pg_hba.conf` file
(see <https://www.postgresql.org/docs/current/auth-pg-hba-conf.html>).

* Supported record types: `local`, `host`, `hostssl`, `hostnossl`.
* Database field: Supports `all`, `sameuser`, `@file`, multiple names. Not supported: `replication`, `samerole`, `samegroup`.
* Database field: Supports `all`, `replication`, `sameuser`, `@file`, multiple names. Not supported: `samerole`, `samegroup`.
* User name field: Supports `all`, `@file`, multiple names. Not supported: `+groupname`.
* Address field: Supports IPv4, IPv6. Not supported: DNS names, domain prefixes.
* Auth-method field: Only methods supported by PgBouncer's `auth_type`
Expand Down
21 changes: 20 additions & 1 deletion include/bouncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum SocketState {
CL_LOGIN, /* login_client_list */
CL_WAITING, /* pool->waiting_client_list */
CL_WAITING_LOGIN, /* - but return to CL_LOGIN instead of CL_ACTIVE */
CL_WAITING_REPLICATION, /* pool->waiting_replication_client_list */
CL_ACTIVE, /* pool->active_client_list */
CL_WAITING_CANCEL, /* pool->waiting_cancel_req_list */
CL_ACTIVE_CANCEL, /* pool->active_cancel_req_list */
Expand Down Expand Up @@ -96,6 +97,7 @@ typedef union PgAddr PgAddr;
typedef enum SocketState SocketState;
typedef struct PktHdr PktHdr;
typedef struct ScramState ScramState;
typedef enum ReplicationType ReplicationType;

extern int cf_sbuf_len;

Expand Down Expand Up @@ -287,6 +289,13 @@ struct PgPool {
*/
struct StatList waiting_client_list;

/*
* Clients that are waiting for a physical/logical replication connection to be
* available. These requests are waiting for a new server connection to be
* opened, before the request can be forwarded.
*/
struct StatList waiting_replication_client_list;

/*
* Clients that sent cancel request, to cancel another client its query.
* These requests are waiting for a new server connection to be opened,
Expand Down Expand Up @@ -414,7 +423,8 @@ struct PgPool {
*/
#define pool_client_count(pool) ( \
statlist_count(&(pool)->active_client_list) + \
statlist_count(&(pool)->waiting_client_list))
statlist_count(&(pool)->waiting_client_list) + \
statlist_count(&(pool)->waiting_replication_client_list))

/*
* A user in login db.
Expand Down Expand Up @@ -490,6 +500,12 @@ struct PgDatabase {
struct AATree user_tree; /* users that have been queried on this database */
};

/*
 * Kind of replication a connection asked for in its startup packet.
 *
 * REPLICATION_NONE is deliberately zero, so the value can be used directly
 * as a truth test for "is this a replication connection?".
 */
enum ReplicationType {
	REPLICATION_NONE = 0,		/* regular, non-replication connection */
	REPLICATION_LOGICAL = 1,	/* startup parameter replication=database */
	REPLICATION_PHYSICAL = 2,	/* startup parameter replication=<boolean true> */
};


/*
* A client or server connection.
Expand Down Expand Up @@ -529,6 +545,9 @@ struct PgSocket {

bool wait_sslchar:1; /* server: waiting for ssl response: S/N */

ReplicationType replication; /* If this is a replication connection */
char *startup_options; /* only tracked for replication connections */

int expect_rfq_count; /* client: count of ReadyForQuery packets client should see */

usec_t connect_time; /* when connection was made */
Expand Down
17 changes: 17 additions & 0 deletions include/common/builtins.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
 *
 * builtins.h
 *	  Declarations for operations on built-in types.
 *
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * include/common/builtins.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef BUILTINS_H
#define BUILTINS_H

/*
 * Pull in the types used by the prototypes below, so this header can be
 * included on its own and in any order.
 */
#include <stdbool.h>
#include <stddef.h>

/* bool.c */

/*
 * parse_bool tries to interpret the NUL-terminated string "value" as a
 * boolean. The return value reports whether parsing succeeded; on success
 * the parsed boolean is stored in *result.
 *
 * NOTE(review): the set of accepted spellings is defined in
 * src/common/bool.c, presumably matching PostgreSQL's parse_bool - confirm
 * there.
 */
extern bool parse_bool(const char *value, bool *result);

/*
 * parse_bool_with_len is the variant of parse_bool that takes an explicit
 * length "len" for "value".
 *
 * NOTE(review): presumably "value" need not be NUL-terminated here - confirm
 * against src/common/bool.c.
 */
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);

#endif /* BUILTINS_H */
4 changes: 4 additions & 0 deletions include/common/postgres_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/* from c.h */

#include <string.h>
#include <usual/ctype.h>

#define int8 int8_t
#define uint8 uint8_t
Expand All @@ -15,6 +16,7 @@

#define lengthof(array) (sizeof (array) / sizeof ((array)[0]))
#define pg_hton32(x) htobe32(x)
#define pg_strncasecmp strncasecmp

#define pg_attribute_noreturn() _NORETURN

Expand All @@ -30,6 +32,8 @@
#define pg_sha256_update(ctx, data, len) sha256_update(ctx, data, len)
#define pg_sha256_final(ctx, dst) sha256_final(ctx, dst)

extern bool parse_bool(const char *value, bool *result);
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);

/* define this to use non-server code paths */
#define FRONTEND
2 changes: 1 addition & 1 deletion include/hba.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ struct HBA;

struct HBA *hba_load_rules(const char *fn);
void hba_free(struct HBA *hba);
int hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, const char *dbname, const char *username);
int hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, ReplicationType replication, const char *dbname, const char *username);
3 changes: 0 additions & 3 deletions include/pktbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...);
#define pktbuf_write_CancelRequest(buf, key) \
pktbuf_write_generic(buf, PKT_CANCEL, "b", key, 8)

#define pktbuf_write_StartupMessage(buf, user, parms, parms_len) \
pktbuf_write_generic(buf, PKT_STARTUP, "bsss", parms, parms_len, "user", user, "")

#define pktbuf_write_PasswordMessage(buf, psw) \
pktbuf_write_generic(buf, 'p', "s", psw)

Expand Down
2 changes: 2 additions & 0 deletions include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ bool cf_set_authdb(struct CfValue *cv, const char *value);

/* reserved database name checking */
bool check_reserved_database(const char *value);

bool strings_equal(const char *str_left, const char *str_right) _MUSTCHECK;
1 change: 1 addition & 0 deletions include/varcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ void init_var_lookup(const char *cf_track_extra_parameters);
int get_num_var_cached(void);
bool varcache_set(VarCache *cache, const char *key, const char *value) /* _MUSTCHECK */;
bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) _MUSTCHECK;
void varcache_apply_startup(PktBuf *pkt, PgSocket *client);
void varcache_fill_unset(VarCache *src, PgSocket *dst);
void varcache_clean(VarCache *cache);
void varcache_add_params(PktBuf *pkt, VarCache *vars);
Expand Down
17 changes: 14 additions & 3 deletions src/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ static bool admin_show_fds(PgSocket *admin, const char *arg)
continue;
res = res && show_fds_from_list(admin, &pool->active_client_list);
res = res && show_fds_from_list(admin, &pool->waiting_client_list);
res = res && show_fds_from_list(admin, &pool->waiting_replication_client_list);
res = res && show_fds_from_list(admin, &pool->active_server_list);
res = res && show_fds_from_list(admin, &pool->idle_server_list);
res = res && show_fds_from_list(admin, &pool->used_server_list);
Expand Down Expand Up @@ -730,6 +731,7 @@ static bool admin_show_clients(PgSocket *admin, const char *arg)

show_socket_list(buf, &pool->active_client_list, "active", false);
show_socket_list(buf, &pool->waiting_client_list, "waiting", false);
show_socket_list(buf, &pool->waiting_replication_client_list, "waiting_replication", false);
show_socket_list(buf, &pool->active_cancel_req_list, "active_cancel_req", false);
show_socket_list(buf, &pool->waiting_cancel_req_list, "waiting_cancel_req", false);
}
Expand Down Expand Up @@ -796,6 +798,7 @@ static bool admin_show_sockets(PgSocket *admin, const char *arg)
pool = container_of(item, PgPool, head);
show_socket_list(buf, &pool->active_client_list, "cl_active", true);
show_socket_list(buf, &pool->waiting_client_list, "cl_waiting", true);
show_socket_list(buf, &pool->waiting_replication_client_list, "cl_waiting_replication", true);

show_socket_list(buf, &pool->active_server_list, "sv_active", true);
show_socket_list(buf, &pool->idle_server_list, "sv_idle", true);
Expand Down Expand Up @@ -836,6 +839,7 @@ static bool admin_show_active_sockets(PgSocket *admin, const char *arg)
pool = container_of(item, PgPool, head);
show_active_socket_list(buf, &pool->active_client_list, "cl_active");
show_active_socket_list(buf, &pool->waiting_client_list, "cl_waiting");
show_active_socket_list(buf, &pool->waiting_replication_client_list, "cl_waiting_replication");

show_active_socket_list(buf, &pool->active_server_list, "sv_active");
show_active_socket_list(buf, &pool->idle_server_list, "sv_idle");
Expand All @@ -854,9 +858,10 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
struct List *item;
PgPool *pool;
PktBuf *buf;
PgSocket *waiter;
PgSocket *waiter, *replication_waiter;
usec_t now = get_cached_time();
usec_t max_wait;
usec_t max_replication_wait;
struct CfValue cv;
int pool_mode;

Expand All @@ -867,9 +872,10 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
admin_error(admin, "no mem");
return true;
}
pktbuf_write_RowDescription(buf, "ssiiiiiiiiiiiiis",
pktbuf_write_RowDescription(buf, "ssiiiiiiiiiiiiiis",
"database", "user",
"cl_active", "cl_waiting",
"cl_waiting_replication",
"cl_active_cancel_req",
"cl_waiting_cancel_req",
"sv_active",
Expand All @@ -882,12 +888,17 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
statlist_for_each(item, &pool_list) {
pool = container_of(item, PgPool, head);
waiter = first_socket(&pool->waiting_client_list);
replication_waiter = first_socket(&pool->waiting_replication_client_list);
max_wait = (waiter && waiter->query_start) ? now - waiter->query_start : 0;
max_replication_wait = (replication_waiter && replication_waiter->query_start) ? now - replication_waiter->query_start : 0;
if (max_replication_wait > max_wait)
max_wait = max_replication_wait;
pool_mode = pool_pool_mode(pool);
pktbuf_write_DataRow(buf, "ssiiiiiiiiiiiiis",
pktbuf_write_DataRow(buf, "ssiiiiiiiiiiiiiis",
pool->db->name, pool->user->name,
statlist_count(&pool->active_client_list),
statlist_count(&pool->waiting_client_list),
statlist_count(&pool->waiting_replication_client_list),
statlist_count(&pool->active_cancel_req_list),
statlist_count(&pool->waiting_cancel_req_list),
statlist_count(&pool->active_server_list),
Expand Down
72 changes: 70 additions & 2 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "bouncer.h"
#include "pam.h"
#include "scram.h"
#include "common/builtins.h"

#include <usual/pgutil.h>

Expand Down Expand Up @@ -288,8 +289,13 @@ static bool finish_set_pool(PgSocket *client, bool takeover)

auth = cf_auth_type;
if (auth == AUTH_HBA) {
auth = hba_eval(parsed_hba, &client->remote_addr, !!client->sbuf.tls,
client->db->name, client->login_user->name);
auth = hba_eval(
parsed_hba,
&client->remote_addr,
!!client->sbuf.tls,
client->replication,
client->db->dbname,
client->login_user->name);
}

if (auth == AUTH_MD5)
Expand Down Expand Up @@ -596,6 +602,25 @@ static bool set_startup_options(PgSocket *client, const char *options)
char arg_buf[400];
struct MBuf arg;
const char *position = options;

if (client->replication) {
/*
* Since replication clients will be bound 1-to-1 to a server
* connection, we can support any configuration flags and
* fields in the options startup parameter. Because we can
* simply send the exact same value for the options parameter
* when opening the replication connection to the server. This
* allows us to also support GUCs that don't have the
* GUC_REPORT flag, specifically extra_float_digits which is a
* configuration that is set by CREATE SUBSCRIPTION in the
* options parameter.
*/
client->startup_options = strdup(options);
if (!client->startup_options)
disconnect_client(client, true, "out of memory");
return true;
}

mbuf_init_fixed_writer(&arg, arg_buf, sizeof(arg_buf));
slog_debug(client, "received options: %s", options);

Expand Down Expand Up @@ -659,12 +684,52 @@ static void set_appname(PgSocket *client, const char *app_name)
}
}

/*
 * set_replication interprets the value of the "replication" startup
 * parameter and stores the outcome in client->replication.
 *
 * The literal "database" selects logical replication. Any other value has to
 * be accepted by parse_bool(): true selects physical replication, false
 * means a regular (non-replication) connection.
 *
 * Returns true when the value was understood. Returns false, leaving
 * client->replication untouched, when it is neither "database" nor a
 * boolean that parse_bool() accepts.
 */
static bool set_replication(PgSocket *client, const char *replicationString)
{
	bool is_physical = false;

	if (strcmp(replicationString, "database") != 0) {
		/* not the logical-replication keyword, so it must be a boolean */
		if (!parse_bool(replicationString, &is_physical))
			return false;

		if (is_physical)
			client->replication = REPLICATION_PHYSICAL;
		else
			client->replication = REPLICATION_NONE;
		return true;
	}

	client->replication = REPLICATION_LOGICAL;
	return true;
}

static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
{
const char *username = NULL, *dbname = NULL;
const char *key, *val;
bool ok;
bool appname_found = false;
unsigned original_read_pos = pkt->data.read_pos;

/*
* First check if we're dealing with a replication connection, because for
* those we support some additional things when parsing the startup
* parameters. Specifically, we accept any arguments in the "options"
* startup parameter.
*/
while (1) {
ok = mbuf_get_string(&pkt->data, &key);
if (!ok || *key == 0)
break;
ok = mbuf_get_string(&pkt->data, &val);
if (!ok)
break;
if (strcmp(key, "replication") == 0) {
slog_debug(client, "got var: %s=%s", key, val);
set_replication(client, val);
}
}

pkt->data.read_pos = original_read_pos;

while (1) {
ok = mbuf_get_string(&pkt->data, &key);
Expand All @@ -686,6 +751,8 @@ static bool decide_startup_pool(PgSocket *client, PktHdr *pkt)
} else if (strcmp(key, "application_name") == 0) {
set_appname(client, val);
appname_found = true;
} else if (strcmp(key, "replication") == 0) {
/* do nothing, already checked in the previous loop */
} else if (varcache_set(&client->vars, key, val)) {
slog_debug(client, "got var: %s=%s", key, val);
} else if (strlist_contains(cf_ignore_startup_params, key)) {
Expand Down Expand Up @@ -1188,6 +1255,7 @@ bool client_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
res = handle_client_work(client, &pkt);
break;
case CL_WAITING:
case CL_WAITING_REPLICATION:
fatal("why waiting client in client_proto()");
default:
fatal("bad client state: %d", client->state);
Expand Down

0 comments on commit 62ab744

Please sign in to comment.