Skip to content

Commit

Permalink
repmgr: improve slot handling in "node rejoin"
Browse files Browse the repository at this point in the history
On the rejoined node, if a replication slot for the new upstream exists
(which is typically the case after a failover), delete that slot.

Also emit a warning about any inactive replication slots which may need
to be cleaned up manually.

GitHub #499.
  • Loading branch information
ibarwick committed Aug 30, 2018
1 parent 3573950 commit 9681708
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 44 deletions.
51 changes: 50 additions & 1 deletion repmgr-action-node.c
Original file line number Diff line number Diff line change
Expand Up @@ -2417,6 +2417,54 @@ do_node_rejoin(void)
success = is_downstream_node_attached(upstream_conn, config_file_options.node_name);
}

/*
* Handle replication slots:
* - if a slot for the new upstream exists, delete that
* - warn about any other inactive replication slots
*/
if (runtime_options.force_rewind_used == false && config_file_options.use_replication_slots)
{
PGconn *local_conn = NULL;
local_conn = establish_db_connection(config_file_options.conninfo, false);

if (PQstatus(local_conn) != CONNECTION_OK)
{
log_warning(_("unable to connect to local node to check replication slot status"));
log_hint(_("execute \"repmgr node check\" to check inactive slots and drop manually if necessary"));
}
else
{
KeyValueList inactive_replication_slots = {NULL, NULL};
KeyValueListCell *cell = NULL;
int inactive_count = 0;
PQExpBufferData slotinfo;

drop_replication_slot_if_exists(local_conn,
config_file_options.node_id,
primary_node_record.slot_name);

(void) get_inactive_replication_slots(local_conn, &inactive_replication_slots);

initPQExpBuffer(&slotinfo);
for (cell = inactive_replication_slots.head; cell; cell = cell->next)
{
appendPQExpBuffer(&slotinfo,
" - %s (%s)", cell->key, cell->value);
inactive_count++;
}

if (inactive_count > 0)
{
log_warning(_("%i inactive replication slots detected"), inactive_count);
log_detail(_("inactive replication slots:\n%s"), slotinfo.data);
log_hint(_("these replication slots may need to be removed manually"));
}

termPQExpBuffer(&slotinfo);

PQfinish(local_conn);
}
}

if (success == true)
{
Expand All @@ -2426,7 +2474,8 @@ do_node_rejoin(void)
else
{
/*
* if we reach here, no record found in upstream node's pg_stat_replication */
* if we reach here, no record found in upstream node's pg_stat_replication
*/
log_notice(_("NODE REJOIN has completed but node is not yet reattached to upstream"));
log_hint(_("you will need to manually check the node's replication status"));
}
Expand Down
47 changes: 4 additions & 43 deletions repmgr-action-standby.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ static int run_file_backup(t_node_info *node_record);

static void copy_configuration_files(bool delete_after_copy);

static void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);

static void tablespace_data_append(TablespaceDataList *list, const char *name, const char *oid, const char *location);

static void get_barman_property(char *dst, char *name, char *local_repmgr_directory);
Expand Down Expand Up @@ -2734,6 +2732,10 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
* If replication slots are in use, and an inactive one for this node
* exists on the former upstream, drop it.
*
* Note that if this function is called by do_standby_switchover(), the
* "repmgr node rejoin" command executed on the demotion candidate may already
* have removed the slot, so there may be nothing to do.
*
* XXX check if former upstream is current primary?
*/

Expand Down Expand Up @@ -3116,8 +3118,6 @@ do_standby_switchover(void)
}
termPQExpBuffer(&command_output);


// check walsenders here
/*
* populate local node record with current state of various replication-related
* values, so we can check for sufficient walsenders and replication slots
Expand Down Expand Up @@ -6067,45 +6067,6 @@ check_recovery_type(PGconn *conn)
}


static void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info);

log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
slot_name, node_id);

if (record_status != RECORD_FOUND)
{
log_info(_("no slot record found for slot \"%s\" on node %i"),
slot_name, node_id);
}
else
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
}
}

/*
* if active replication slot exists, call Houston as we have a
* problem
*/
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
}
}
}


/*
* Creates a recovery.conf file for a standby
Expand Down
1 change: 1 addition & 0 deletions repmgr-client-global.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,5 +237,6 @@ extern void get_node_config_directory(char *config_dir_buf);
extern void get_node_data_directory(char *data_dir_buf);
extern void init_node_record(t_node_info *node_record);
extern bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason);
extern void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);

#endif /* _REPMGR_CLIENT_GLOBAL_H_ */
43 changes: 43 additions & 0 deletions repmgr-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2978,3 +2978,46 @@ can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *rea

return can_use;
}


void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info);

log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
slot_name, node_id);

if (record_status != RECORD_FOUND)
{
/* this is a good thing */
log_verbose(LOG_INFO,
_("slot \"%s\" does not exist on node %i, nothing to remove"),
slot_name, node_id);
}
else
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
}
}

/*
* if active replication slot exists, call Houston as we have a
* problem
*/
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
}
}
}

0 comments on commit 9681708

Please sign in to comment.