Skip to content

Commit

Permalink
Support replication connections through PgBouncer
Browse files Browse the repository at this point in the history
In session pooling mode PgBouncer is pretty much a transparent proxy,
i.e. the client does normally not even need to know that PgBouncer is in
the middle. This allows things like load balancing and failovers without
the client needing to know about this at all. But as soon as replication
connections are needed, this was not possible anymore, because PgBouncer
would reject those instead of proxying them to the right server.

This PR fixes that by also proxying replication connections. They are
handled pretty differently from normal connections though. A client and
server replication connection will form a strong pair, as soon as one is
closed the other is closed too. So, there's no caching of the server
replication connections, like is done for regular connections. Reusing
replication connections comes with a ton of gotchas. Postgres will throw
errors in many cases when trying to do so. So simply not doing it seems
like a good tradeoff for ease of implementation. Especially because
replication connections are pretty much always very long lived. So
re-using them gains pretty much no performance benefits.

Fixes pgbouncer#382
  • Loading branch information
JelteF committed May 6, 2024
1 parent 3f6b3dc commit c17ba04
Show file tree
Hide file tree
Showing 36 changed files with 1,147 additions and 111 deletions.
6 changes: 3 additions & 3 deletions .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ task:
# - image: rockylinux:8
# - image: centos:centos7
# setup_script:
# - yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server systemd-devel wget
# - yum -y install autoconf automake diffutils file libevent-devel libtool make openssl-devel pkg-config postgresql-server postgresql-contrib systemd-devel wget
# - if cat /etc/centos-release | grep -q ' 7'; then yum -y install python python-pip; else yum -y install python3 python3-pip sudo iptables; fi
# - wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
# - tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
Expand Down Expand Up @@ -138,7 +138,7 @@ task:
# - image: alpine:latest
# setup_script:
# - apk update
# - apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql python3 py3-pip wget sudo iptables
# - apk add autoconf automake bash build-base libevent-dev libtool openssl openssl-dev pkgconf postgresql postgresql-contrib python3 py3-pip wget sudo iptables
# - wget -O /tmp/pandoc.tar.gz https://github.com/jgm/pandoc/releases/download/2.10.1/pandoc-2.10.1-linux-amd64.tar.gz
# - tar xvzf /tmp/pandoc.tar.gz --strip-components 1 -C /usr/local/
# - python3 -m pip install -r requirements.txt
Expand Down Expand Up @@ -166,7 +166,7 @@ task:
HAVE_IPV6_LOCALHOST: yes
USE_SUDO: true
setup_script:
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server python devel/py-pip sudo
- pkg install -y autoconf automake bash gmake hs-pandoc libevent libtool pkgconf postgresql12-server postgresql12-contrib python devel/py-pip sudo
- pip install -r requirements.txt
- kldload pf
- echo 'anchor "pgbouncer_test/*"' >> /etc/pf.conf
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pgbouncer_SOURCES = \
src/util.c \
src/varcache.c \
src/common/base64.c \
src/common/bool.c \
src/common/pgstrcasecmp.c \
src/common/saslprep.c \
src/common/scram-common.c \
src/common/unicode_norm.c \
Expand Down Expand Up @@ -55,6 +57,7 @@ pgbouncer_SOURCES = \
include/util.h \
include/varcache.h \
include/common/base64.h \
include/common/builtins.h \
include/common/pg_wchar.h \
include/common/postgres_compat.h \
include/common/saslprep.h \
Expand Down
2 changes: 1 addition & 1 deletion doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ The file follows the format of the PostgreSQL `pg_hba.conf` file
(see <https://www.postgresql.org/docs/current/auth-pg-hba-conf.html>).

* Supported record types: `local`, `host`, `hostssl`, `hostnossl`.
* Database field: Supports `all`, `sameuser`, `@file`, multiple names. Not supported: `replication`, `samerole`, `samegroup`.
* Database field: Supports `all`, `replication`, `sameuser`, `@file`, multiple names. Not supported: `samerole`, `samegroup`.
* User name field: Supports `all`, `@file`, multiple names. Not supported: `+groupname`.
* Address field: Supports IPv4, IPv6. Not supported: DNS names, domain prefixes.
* Auth-method field: Only methods supported by PgBouncer's `auth_type`
Expand Down
6 changes: 6 additions & 0 deletions doc/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ user
database
: Database name.

replication
: If server connection uses replication. Can be `none`, `logical` or `physical`.

state
: State of the pgbouncer server connection, one of **active**,
**idle**, **used**, **tested**, **new**, **active_cancel**,
Expand Down Expand Up @@ -322,6 +325,9 @@ user
database
: Database name.

replication
: If client connection uses replication. Can be `none`, `logical` or `physical`.

state
: State of the client connection, one of **active**, **waiting**,
**active_cancel_req**, or **waiting_cancel_req**.
Expand Down
12 changes: 12 additions & 0 deletions include/bouncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ typedef struct PktBuf PktBuf;
typedef struct ScramState ScramState;
typedef struct PgPreparedStatement PgPreparedStatement;
typedef enum ResponseAction ResponseAction;
typedef enum ReplicationType ReplicationType;

extern int cf_sbuf_len;

Expand Down Expand Up @@ -610,6 +611,14 @@ typedef struct OutstandingRequest {
uint64_t server_ps_query_id;
} OutstandingRequest;

enum ReplicationType {
REPLICATION_NONE = 0,
REPLICATION_LOGICAL,
REPLICATION_PHYSICAL,
};

extern const char *replication_type_parameters[3];

/*
* A client or server connection.
*
Expand Down Expand Up @@ -654,6 +663,9 @@ struct PgSocket {
* the outstanding requests until the next Sync */
bool query_failed : 1;

ReplicationType replication; /* If this is a replication connection */
char *startup_options; /* only tracked for replication connections */

usec_t connect_time; /* when connection was made */
usec_t request_time; /* last activity time */
usec_t query_start; /* client: query start moment */
Expand Down
18 changes: 18 additions & 0 deletions include/common/builtins.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
*
* builtins.h
* Declarations for operations on built-in types.
*
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* include/common/builtins.h
*
*-------------------------------------------------------------------------
*/

/* bool.c */
extern bool parse_bool(const char *value, bool *result);
extern bool parse_bool_with_len(const char *value, size_t len, bool *result);
extern int pg_strncasecmp(const char *s1, const char *s2, size_t n);
1 change: 1 addition & 0 deletions include/common/postgres_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/* from c.h */

#include <string.h>
#include <usual/ctype.h>

#define int8 int8_t
#define uint8 uint8_t
Expand Down
7 changes: 4 additions & 3 deletions include/hba.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/

#define NAME_ALL 1
#define NAME_SAMEUSER 2
#define NAME_ALL 1
#define NAME_SAMEUSER 2
#define NAME_REPLICATION 4

enum RuleType {
RULE_LOCAL,
Expand Down Expand Up @@ -73,4 +74,4 @@ struct Ident *ident_load_map(const char *fn);
void ident_free(struct Ident *ident);
struct HBA *hba_load_rules(const char *fn, struct Ident *ident);
void hba_free(struct HBA *hba);
struct HBARule *hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, const char *dbname, const char *username);
struct HBARule *hba_eval(struct HBA *hba, PgAddr *addr, bool is_tls, ReplicationType replication, const char *dbname, const char *username);
1 change: 1 addition & 0 deletions include/objects.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ PgPool *get_pool(PgDatabase *db, PgCredentials *user_credentials);
PgPool *get_peer_pool(PgDatabase *);
PgSocket *compare_connections_by_time(PgSocket *lhs, PgSocket *rhs);
bool evict_connection(PgDatabase *db) _MUSTCHECK;
bool evict_pool_connection(PgPool *pool) _MUSTCHECK;
bool evict_user_connection(PgCredentials *user_credentials) _MUSTCHECK;
bool find_server(PgSocket *client) _MUSTCHECK;
bool life_over(PgSocket *server);
Expand Down
3 changes: 0 additions & 3 deletions include/pktbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...);
#define pktbuf_write_CancelRequest(buf, key) \
pktbuf_write_generic(buf, PKT_CANCEL, "b", key, 8)

#define pktbuf_write_StartupMessage(buf, user, parms, parms_len) \
pktbuf_write_generic(buf, PKT_STARTUP_V3, "bsss", parms, parms_len, "user", user, "")

#define pktbuf_write_NegotiateProtocolVersion( \
buf, \
unsupported_protocol_extensions_count, \
Expand Down
5 changes: 3 additions & 2 deletions include/prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ typedef struct PgServerPreparedStatement {
PgPreparedStatement *ps;
} PgServerPreparedStatement;

#define is_prepared_statements_enabled(pool) \
(pool_pool_mode(pool) != POOL_SESSION && cf_max_prepared_statements != 0)
#define is_prepared_statements_enabled(client_or_server) \
(connection_pool_mode(client_or_server) != POOL_SESSION && cf_max_prepared_statements != 0)


bool handle_parse_command(PgSocket *client, PktHdr *pkt);
bool handle_bind_command(PgSocket *client, PktHdr *pkt);
Expand Down
3 changes: 2 additions & 1 deletion include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *pkt) _MUSTCHECK;
void kill_pool_logins(PgPool *pool, const char *sqlstate, const char *msg);
int pool_pool_mode(PgPool *pool) _MUSTCHECK;
int connection_pool_mode(PgSocket *server) _MUSTCHECK;
int probably_wrong_pool_pool_mode(PgPool *pool) _MUSTCHECK;
int pool_pool_size(PgPool *pool) _MUSTCHECK;
int pool_min_pool_size(PgPool *pool) _MUSTCHECK;
usec_t pool_server_lifetime(PgPool *pool) _MUSTCHECK;
Expand Down
2 changes: 2 additions & 0 deletions include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ bool cf_set_authdb(struct CfValue *cv, const char *value);

/* reserved database name checking */
bool check_reserved_database(const char *value);

bool strings_equal(const char *str_left, const char *str_right) _MUSTCHECK;
1 change: 1 addition & 0 deletions include/varcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ void init_var_lookup(const char *cf_track_extra_parameters);
int get_num_var_cached(void);
bool varcache_set(VarCache *cache, const char *key, const char *value) /* _MUSTCHECK */;
bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) _MUSTCHECK;
void varcache_apply_startup(PktBuf *pkt, PgSocket *client);
void varcache_fill_unset(VarCache *src, PgSocket *dst);
void varcache_clean(VarCache *cache);
void varcache_add_params(PktBuf *pkt, VarCache *vars);
Expand Down
17 changes: 13 additions & 4 deletions src/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,13 @@ static bool admin_show_users(PgSocket *admin, const char *arg)
return true;
}

#define SKF_STD "sssssisiTTiiississi"
#define SKF_DBG "sssssisiTTiiississiiiiiiii"
#define SKF_STD "ssssssisiTTiiississi"
#define SKF_DBG "ssssssisiTTiiississiiiiiiii"

static void socket_header(PktBuf *buf, bool debug)
{
pktbuf_write_RowDescription(buf, debug ? SKF_DBG : SKF_STD,
"type", "user", "database", "state",
"type", "user", "database", "replication", "state",
"addr", "port", "local_addr", "local_port",
"connect_time", "request_time",
"wait", "wait_us", "close_needed",
Expand Down Expand Up @@ -660,6 +660,7 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug)
const struct PStr *application_name = v->var_list[VAppName];
usec_t now = get_cached_time();
usec_t wait_time = sk->query_start ? now - sk->query_start : 0;
char *replication;

if (io) {
pkt_avail = iobuf_amount_parse(sk->sbuf.io);
Expand Down Expand Up @@ -692,10 +693,18 @@ static void socket_row(PktBuf *buf, PgSocket *sk, const char *state, bool debug)
else
prepared_statement_count = HASH_COUNT(sk->client_prepared_statements);

if (sk->replication == REPLICATION_NONE)
replication = "none";
else if (sk->replication == REPLICATION_LOGICAL)
replication = "logical";
else
replication = "physical";

pktbuf_write_DataRow(buf, debug ? SKF_DBG : SKF_STD,
is_server_socket(sk) ? "S" : "C",
sk->login_user_credentials ? sk->login_user_credentials->name : "(nouser)",
sk->pool && !sk->pool->db->peer_id ? sk->pool->db->name : "(nodb)",
replication,
state, r_addr, pga_port(&sk->remote_addr),
l_addr, pga_port(&sk->local_addr),
sk->connect_time,
Expand Down Expand Up @@ -898,7 +907,7 @@ static bool admin_show_pools(PgSocket *admin, const char *arg)
pool = container_of(item, PgPool, head);
waiter = first_socket(&pool->waiting_client_list);
max_wait = (waiter && waiter->query_start) ? now - waiter->query_start : 0;
pool_mode = pool_pool_mode(pool);
pool_mode = probably_wrong_pool_pool_mode(pool);
pktbuf_write_DataRow(buf, "ssiiiiiiiiiiiiis",
pool->db->name, pool->user_credentials->name,
statlist_count(&pool->active_client_list),
Expand Down

0 comments on commit c17ba04

Please sign in to comment.