Skip to content

Commit

Permalink
Report current LSN of Postgres nodes rather than WAL lag. (#53)
Browse files Browse the repository at this point in the history
We now keep track of LSN values at the monitor instead of the WAL lag of the secondary. This approach will help us figure out which secondary to promote when we add multiple secondaries.
  • Loading branch information
mtuncer committed Oct 8, 2019
1 parent 9c4cf24 commit 1d85287
Show file tree
Hide file tree
Showing 21 changed files with 165 additions and 132 deletions.
2 changes: 1 addition & 1 deletion src/bin/pg_autoctl/cli_do_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ keeper_cli_monitor_node_active(int argc, char **argv)
keeper.state.current_group,
keeper.state.current_role,
keeper.postgres.pgIsRunning,
keeper.postgres.walLag,
keeper.postgres.currentLSN,
keeper.postgres.pgsrSyncState,
&assignedState))
{
Expand Down
6 changes: 3 additions & 3 deletions src/bin/pg_autoctl/fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,13 @@ keeper_fsm_step(Keeper *keeper)
log_info("Calling node_active for node %s/%d/%d with current state: "
"PostgreSQL is running is %s, "
"sync_state is \"%s\", "
"WAL delta is %" PRId64 ".",
"latest WAL LSN is %s.",
config->formation,
keeperState->current_node_id,
keeperState->current_group,
postgres->pgIsRunning ? "true" : "false",
postgres->pgsrSyncState,
postgres->walLag);
postgres->currentLSN);

if (!monitor_node_active(monitor,
config->formation,
Expand All @@ -296,7 +296,7 @@ keeper_fsm_step(Keeper *keeper)
keeperState->current_group,
keeperState->current_role,
postgres->pgIsRunning,
postgres->walLag,
postgres->currentLSN,
postgres->pgsrSyncState,
&assignedState))
{
Expand Down
18 changes: 10 additions & 8 deletions src/bin/pg_autoctl/keeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ keeper_update_pg_state(Keeper *keeper)
/* reinitialize the replication state values each time we update */
postgres->pgIsRunning = false;
memset(postgres->pgsrSyncState, 0, PGSR_SYNC_STATE_MAXLENGTH);
postgres->walLag = -1;
strcpy(postgres->currentLSN, "0/0");

/*
* In some states, it's ok to not have a PostgreSQL data directory at all.
Expand Down Expand Up @@ -484,6 +484,7 @@ keeper_update_pg_state(Keeper *keeper)
case PRIMARY_STATE:
case WAIT_PRIMARY_STATE:
case SECONDARY_STATE:
case CATCHINGUP_STATE:
{
return postgres->pgIsRunning
&& keeper_get_replication_state(keeper);
Expand Down Expand Up @@ -565,8 +566,7 @@ keeper_get_replication_state(Keeper *keeper)
LocalPostgresServer *postgres = &(keeper->postgres);

PGSQL *pgsql = &(postgres->sqlClient);
bool missing_wallag_ok =
keeperState->current_role == WAIT_PRIMARY_STATE;
bool missingStateOk = keeperState->current_role == WAIT_PRIMARY_STATE;

bool success = false;

Expand All @@ -583,17 +583,19 @@ keeper_get_replication_state(Keeper *keeper)
if (pg_setup_is_primary(pgSetup))
{
success =
pgsql_get_sync_state_and_wal_lag(
pgsql_get_sync_state_and_current_lsn(
pgsql,
config->replication_slot_name,
postgres->pgsrSyncState,
&(postgres->walLag),
missing_wallag_ok);
postgres->currentLSN,
PG_LSN_MAXLENGTH,
missingStateOk);
log_warn("latest received lsn = %s", postgres->currentLSN);
}
else
{
success =
pgsql_get_wal_lag_from_standby(pgsql, &(postgres->walLag));
success = pgsql_get_received_lsn_from_standby(pgsql, postgres->currentLSN,
PG_LSN_MAXLENGTH);
}
pgsql_finish(pgsql);

Expand Down
6 changes: 3 additions & 3 deletions src/bin/pg_autoctl/keeper_pg_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ wait_until_primary_is_ready(Keeper *keeper,
MonitorAssignedState *assignedState)
{
bool pgIsRunning = false;
int64_t replicationLag = 0L;
char currrentLSN[PG_LSN_MAXLENGTH] = "0/0";
char *pgsrSyncState = "";
int errors = 0, tries = 0;
bool firstLoop = true;
Expand All @@ -629,7 +629,7 @@ wait_until_primary_is_ready(Keeper *keeper,
keeper->state.current_group,
keeper->state.current_role,
pgIsRunning,
replicationLag,
currrentLSN,
pgsrSyncState,
assignedState))
{
Expand Down Expand Up @@ -927,7 +927,7 @@ keeper_pg_init_node_active(Keeper *keeper)
keeper->state.current_group,
keeper->state.current_role,
ReportPgIsRunning(keeper),
keeper->postgres.walLag,
keeper->postgres.currentLSN,
keeper->postgres.pgsrSyncState,
&assignedState))
{
Expand Down
6 changes: 3 additions & 3 deletions src/bin/pg_autoctl/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ keeper_service_run(Keeper *keeper, pid_t *start_pid)
"%s, "
"PostgreSQL %s running, "
"sync_state is \"%s\", "
"WAL delta is %" PRId64 ".",
"current lsn is \"%s\".",
config->formation,
keeperState->current_node_id,
keeperState->current_group,
NodeStateToString(keeperState->current_role),
reportPgIsRunning ? "is" : "is not",
postgres->pgsrSyncState,
postgres->walLag);
postgres->currentLSN);

/*
* Report the current state to the monitor and get the assigned state.
Expand All @@ -182,7 +182,7 @@ keeper_service_run(Keeper *keeper, pid_t *start_pid)
keeperState->current_group,
keeperState->current_role,
reportPgIsRunning,
postgres->walLag,
postgres->currentLSN,
postgres->pgsrSyncState,
&assignedState);

Expand Down
14 changes: 7 additions & 7 deletions src/bin/pg_autoctl/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ monitor_node_active(Monitor *monitor,
char *formation, char *host, int port, int nodeId,
int groupId, NodeState currentState,
bool pgIsRunning,
uint64_t replicationLag, char *pgsrSyncState,
char *currentLSN, char *pgsrSyncState,
MonitorAssignedState *assignedState)
{
PGSQL *pgsql = &monitor->pgsql;
Expand All @@ -298,7 +298,7 @@ monitor_node_active(Monitor *monitor,
"$6::pgautofailover.replication_state, $7, $8, $9)";
int paramCount = 9;
Oid paramTypes[9] = { TEXTOID, TEXTOID, INT4OID, INT4OID,
INT4OID, TEXTOID, BOOLOID, INT8OID, TEXTOID };
INT4OID, TEXTOID, BOOLOID, LSNOID, TEXTOID };
const char *paramValues[9];
MonitorAssignedStateParseContext parseContext = { assignedState, false };
const char *nodeStateString = NodeStateToString(currentState);
Expand All @@ -310,7 +310,7 @@ monitor_node_active(Monitor *monitor,
paramValues[4] = intToString(groupId).strValue;
paramValues[5] = nodeStateString;
paramValues[6] = pgIsRunning ? "true" : "false";
paramValues[7] = intToString(replicationLag).strValue;
paramValues[7] = currentLSN;
paramValues[8] = pgsrSyncState;

if (!pgsql_execute_with_params(pgsql, sql,
Expand All @@ -320,10 +320,10 @@ monitor_node_active(Monitor *monitor,
log_error("Failed to get node state for node %d (%s:%d) "
"in group %d of formation \"%s\" with initial state "
"\"%s\", replication state \"%s\", "
"and replication lag %" PRId64 ", "
"and current lsn \"%s\", "
"see previous lines for details",
nodeId, host, port, groupId, formation, nodeStateString,
pgsrSyncState, replicationLag);
pgsrSyncState, currentLSN);
return false;
}

Expand All @@ -334,11 +334,11 @@ monitor_node_active(Monitor *monitor,
{
log_error("Failed to get node state for node %d (%s:%d) in group %d of formation "
"\"%s\" with initial state \"%s\", replication state \"%s\","
" and replication lag %" PRId64
" and current lsn \"%s\""
" because the monitor returned an unexpected result, "
"see previous lines for details",
nodeId, host, port, groupId, formation, nodeStateString,
pgsrSyncState, replicationLag);
pgsrSyncState, currentLSN);
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/bin/pg_autoctl/monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ bool monitor_node_active(Monitor *monitor,
char *formation, char *host, int port, int nodeId,
int groupId, NodeState currentState,
bool pgIsRunning,
uint64_t replicationLag, char *pgsrSyncState,
char *currentLSN, char *pgsrSyncState,
MonitorAssignedState *assignedState);
bool monitor_remove(Monitor *monitor, char *host, int port);
bool monitor_print_state(Monitor *monitor, char *formation, int group);
Expand Down
66 changes: 41 additions & 25 deletions src/bin/pg_autoctl/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,23 +1182,25 @@ validate_connection_string(const char *connectionString)
/*
* pgsql_get_sync_state_and_current_lsn queries a primary PostgreSQL server to
* get both the current pg_stat_replication.sync_state value and the current
* WAL LSN.
*
* currentLSN is the text representation of a 64-bit LSN value.
*/
typedef struct PgsrSyncAndWALContext
{
bool parsedOk;
char syncState[PGSR_SYNC_STATE_MAXLENGTH];
uint64_t walLag;
char currentLSN[PG_LSN_MAXLENGTH];
} PgsrSyncAndWALContext;

bool
pgsql_get_sync_state_and_wal_lag(PGSQL *pgsql, const char *slotName,
char *pgsrSyncState, int64_t *walLag,
bool missing_ok)
pgsql_get_sync_state_and_current_lsn(PGSQL *pgsql, const char *slotName,
char *pgsrSyncState, char *currentLSN,
int maxLSNSize, bool missing_ok)
{
PgsrSyncAndWALContext context = { 0 };
char *sql =
"select sync_state, "
"pg_current_wal_lsn() - flush_lsn as wal_lag "
"pg_current_wal_lsn() "
"from pg_replication_slots slot join pg_stat_replication rep "
"on rep.pid = slot.active_pid "
"where slot_name = $1";
Expand All @@ -1223,15 +1225,15 @@ pgsql_get_sync_state_and_wal_lag(PGSQL *pgsql, const char *slotName,
}

strlcpy(pgsrSyncState, context.syncState, PGSR_SYNC_STATE_MAXLENGTH);
*walLag = context.walLag;
strlcpy(currentLSN, context.currentLSN, maxLSNSize);

return true;
}


/*
* parsePgsrSyncStateAndWAL parses the result from a PostgreSQL query fetching
* two columns from pg_stat_replication: sync_state and wal lag.
* two columns from pg_stat_replication: sync_state and currentLSN.
*/
static void
parsePgsrSyncStateAndWAL(void *ctx, PGresult *result)
Expand All @@ -1253,20 +1255,14 @@ parsePgsrSyncStateAndWAL(void *ctx, PGresult *result)

case 1:
{
char *strWalLag = PQgetvalue(result, 0, 1);

/* we trust our length and PostgreSQL results */
strlcpy(context->syncState,
PQgetvalue(result, 0, 0),
PGSR_SYNC_STATE_MAXLENGTH);

context->walLag = strtoull(strWalLag, NULL, 10);

if (context->walLag == 0 && errno != 0)
{
context->parsedOk = false;
log_error("Failed to parse int64_t result \"%s\"", strWalLag);
}
strlcpy(context->currentLSN,
PQgetvalue(result, 0, 1),
PG_LSN_MAXLENGTH);

context->parsedOk = true;
return;
Expand All @@ -1279,21 +1275,42 @@ parsePgsrSyncStateAndWAL(void *ctx, PGresult *result)
}
}


/*
* pgsql_get_wal_lag_from_standby queries a standby PostgreSQL server to get the
* replication lag as seen from the pg_stat_wal_receiver system view.
* pgsql_get_received_lsn_from_standby queries a standby PostgreSQL server to get the
* received_lsn value from the pg_stat_wal_receiver system view.
*
* received_lsn is the latest LSN known to have been received and flushed to
* disk. It does not tell us whether that WAL has been applied yet. The caller
* must provide a receivedLSN buffer of at least maxLSNSize bytes.
*
* We collect the latest WAL position that was received successfully; it will
* eventually be applied to the receiving database. The monitor will later use
* this information to decide which secondary has the most recent data.
*
* Once WAL is received and stored, it is replayed before promotion completes,
* so that the database state is current at promotion time. From the monitor's
* point of view it therefore makes no difference whether the WAL has only
* been received and stored, or has already been applied.
*
* Related PostgreSQL documentation at
* https://www.postgresql.org/docs/current/warm-standby.html#STANDBY-SERVER-OPERATION
* states that
* Standby mode is exited and the server switches to normal operation when
* pg_ctl promote is run or a trigger file is found (trigger_file). Before failover,
* any WAL immediately available in the archive or in pg_wal will be restored,
* but no attempt is made to connect to the master.
*/
bool
pgsql_get_wal_lag_from_standby(PGSQL *pgsql, int64_t *walLag)
pgsql_get_received_lsn_from_standby(PGSQL *pgsql, char *receivedLSN, int maxLSNSize)
{
SingleValueResultContext context;
char *sql =
"SELECT coalesce(latest_end_lsn - received_lsn, -1) "
" FROM pg_stat_wal_receiver";
char *sql = "SELECT received_lsn FROM pg_stat_wal_receiver";

context.resultType = PGSQL_RESULT_BIGINT;
context.resultType = PGSQL_RESULT_STRING;
context.parsedOk = false;

log_trace("pgsql_get_received_lsn_from_standby : running %s", sql);

pgsql_execute_with_params(pgsql, sql, 0, NULL, NULL,
&context, &parseSingleValueResult);

Expand All @@ -1304,12 +1321,11 @@ pgsql_get_wal_lag_from_standby(PGSQL *pgsql, int64_t *walLag)
return false;
}

*walLag = context.bigint;
strlcpy(receivedLSN, context.strVal, maxLSNSize);

return true;
}


/*
* LISTEN/NOTIFY support.
*
Expand Down
19 changes: 14 additions & 5 deletions src/bin/pg_autoctl/pgsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
*/
#define MAXCONNINFO 1024


/*
* Maximum length of a serialized pg_lsn value.
* The value is derived from the PostgreSQL source file pg_lsn.c, which
* defines MAXPG_LSNLEN as 17 and allocates a buffer one byte larger.
* We use 18 directly to keep buffer allocation simple.
*/
#define PG_LSN_MAXLENGTH 18

/*
* pg_stat_replication.sync_state is one of:
* sync, async, quorum, potential
Expand Down Expand Up @@ -152,11 +162,10 @@ int make_conninfo_field_str(char *destination, const char *key, const char *valu
int make_conninfo_field_int(char *destination, const char *key, int value);
bool validate_connection_string(const char *connectionString);

bool pgsql_get_sync_state_and_wal_lag(PGSQL *pgsql, const char *slotName,
char *pgsrSyncState, int64_t *walLag,
bool missing_ok);
bool pgsql_get_wal_lag_from_standby(PGSQL *pgsql, int64_t *walLag);

bool pgsql_get_sync_state_and_current_lsn(PGSQL *pgsql, const char *slotName,
char *pgsrSyncState, char *currentLSN,
int maxLSNSize, bool missing_ok);
bool pgsql_get_received_lsn_from_standby(PGSQL *pgsql, char *receivedLSN, int maxLSNSize);
bool pgsql_listen(PGSQL *pgsql, char *channels[]);

bool pgsql_alter_extension_update_to(PGSQL *pgsql,
Expand Down
5 changes: 4 additions & 1 deletion src/bin/pg_autoctl/primary_standby.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
* LocalPostgresServer represents a local postgres database cluster that
* we can manage via a SQL connection and operations on the database
* directory contained in the PostgresSetup.
*
* The currentLSN value is kept as text for better portability. We do not
* perform any operation on the value after it has been read from the database.
*/
typedef struct LocalPostgresServer
{
PGSQL sqlClient;
PostgresSetup postgresSetup;
bool pgIsRunning;
char pgsrSyncState[PGSR_SYNC_STATE_MAXLENGTH];
int64_t walLag;
char currentLSN[PG_LSN_MAXLENGTH];
uint64_t pgFirstStartFailureTs;
int pgStartRetries;
PgInstanceKind pgKind;
Expand Down

0 comments on commit 1d85287

Please sign in to comment.