Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ SHLIB_LINK = $(libpq)

OBJS = src/pgactive.o \
src/pgactive_apply.o \
src/pgactive_elog.o \
src/pgactive_dbcache.o \
src/pgactive_ddlrep.o \
src/pgactive_ddlrep_truncate.o \
Expand Down
3 changes: 3 additions & 0 deletions include/pgactive.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "libpq-fe.h"

#include "pgactive_config.h"
#include "pgactive_elog.h"
#include "pgactive_internal.h"
#include "pgactive_version.h"
#include "pgactive_compat.h"
Expand Down Expand Up @@ -383,6 +384,8 @@ typedef struct pgactiveWorker
/* proc entry of worker if running, or NULL */
PGPROC *worker_proc;

/* last error info of worker */
pgactiveLastErrorInfo last_error_info;
union data
{
pgactiveApplyWorker apply;
Expand Down
88 changes: 88 additions & 0 deletions include/pgactive_elog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/* -------------------------------------------------------------------------
*
* pgactive_elog.h
* pgactive error reporting facility
*
* Copyright (C) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* pgactive_elog.h
*
* -------------------------------------------------------------------------
*/
#ifndef pgactive_ELOG_H
#define pgactive_ELOG_H

#include "datatype/timestamp.h"

/*
* Define pgactive error codes for which last error info needs to be tracked.
* When adding new error code, remember to add corresponding entry in
* pgactiveErrorMessages.
*
* NB: If ever changing start and end limits for one worker, adjust other
* workers' start and end limits.
*/

#define PGACTIVE_TRACKED_ERRORS_CHUNK 64
typedef enum pgactiveTrackedErrorCodes
{
PGACTIVE_ERRCODE_NONE = 0,

/* perdb worker error codes start --> */
PGACTIVE_PERDB_WORKER_ERROR_CODE_START = 1,
PGACTIVE_ERROR_CODE_MAX_NODES_PARAM_MISMATCH = 2,

/* add new perdb worker error codes here */

PGACTIVE_PERDB_WORKER_ERROR_CODE_END = PGACTIVE_TRACKED_ERRORS_CHUNK,
/* <-- perdb worker error codes end */

/* apply worker error codes start --> */
PGACTIVE_APPLY_WORKER_ERROR_CODE_START = PGACTIVE_TRACKED_ERRORS_CHUNK + 1,
PGACTIVE_ERROR_CODE_APPLY_FAILURE,

/* add new apply worker error codes here */

PGACTIVE_APPLY_WORKER_ERROR_CODE_END = 2 * PGACTIVE_TRACKED_ERRORS_CHUNK,
/* <-- apply worker error codes end */
} pgactiveTrackedErrorCodes;

extern PGDLLIMPORT const char *const pgactiveErrorMessages[];

typedef struct pgactiveLastErrorInfo
{
pgactiveTrackedErrorCodes errcode;
TimestampTz errtime;
} pgactiveLastErrorInfo;

#define GET_FIRST_ARG(arg1, ...) arg1
#define GET_REST_ARGS(arg1, ...) __VA_ARGS__

/*
* Log the pgactive error message. Either call with pgactiveTrackedErrorCodes:
*
* ereport_pgactive(ERROR,
* PGACTIVE_ERRCODE_XXX,
* errmsg("...."));
*
* or just call:
*
* ereport_pgactive(ERROR,
* errmsg("...."));
*/
#define ereport_pgactive(elevel, ...) \
do { \
pgactive_set_worker_last_error_info(pgactive_worker_slot, \
GET_FIRST_ARG(__VA_ARGS__, 0)); \
ereport(elevel, GET_REST_ARGS(__VA_ARGS__)); \
} while(0)

/* Forward declaration */
struct pgactiveWorker;

extern void pgactive_set_worker_last_error_info(struct pgactiveWorker *w,
pgactiveTrackedErrorCodes errcode);
extern void pgactive_reset_worker_last_error_info(struct pgactiveWorker *w);

#endif /* pgactive_ELOG_H */
4 changes: 3 additions & 1 deletion pgactive--2.1.3--2.1.4.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ CREATE FUNCTION pgactive_get_workers_info (
OUT dboid oid,
OUT worker_type text,
OUT pid int4,
OUT unregistered boolean
OUT unregistered boolean,
OUT last_error text,
OUT last_error_time timestamptz
)
RETURNS SETOF record
AS 'MODULE_PATHNAME'
Expand Down
18 changes: 13 additions & 5 deletions src/pgactive.c
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ pgactive_get_worker_pid_byid(const pgactiveNodeId * const node, pgactiveWorkerTy
Datum
pgactive_get_workers_info(PG_FUNCTION_ARGS)
{
#define pgactive_GET_WORKERS_PID_COLS 6
#define pgactive_GET_WORKERS_PID_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
int i;

Expand All @@ -1783,10 +1783,6 @@ pgactive_get_workers_info(PG_FUNCTION_ARGS)
if (w->worker_type == pgactive_WORKER_EMPTY_SLOT)
continue;

/* unconnected slot */
if (w->worker_proc == NULL)
continue;

if (w->worker_type == pgactive_WORKER_APPLY)
{
pgactiveApplyWorker *aw = &w->data.apply;
Expand Down Expand Up @@ -1827,6 +1823,18 @@ pgactive_get_workers_info(PG_FUNCTION_ARGS)
values[4] = Int32GetDatum(w->worker_pid);
values[5] = BoolGetDatum(unregistered);

if (w->last_error_info.errcode != PGACTIVE_ERRCODE_NONE)
{
Assert(w->last_error_info.errtime != 0);
values[6] = CStringGetTextDatum(pgactiveErrorMessages[w->last_error_info.errcode]);
values[7] = TimestampTzGetDatum(w->last_error_info.errtime);
}
else
{
nulls[6] = true;
nulls[7] = true;
}

tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);
}
Expand Down
15 changes: 15 additions & 0 deletions src/pgactive_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -2615,6 +2615,7 @@ pgactive_apply_work(PGconn *streamConn)
int fd;
char *copybuf = NULL;
XLogRecPtr last_received = InvalidXLogRecPtr;
static bool first_time = true;

fd = PQsocket(streamConn);

Expand Down Expand Up @@ -2779,6 +2780,17 @@ pgactive_apply_work(PGconn *streamConn)
pgactive_send_feedback(streamConn, last_received,
GetCurrentTimestamp(), false);

if (first_time)
{
/*
* Reset if there's any last error info. We do this after applying
* at least one change. Because that is an indication of the apply
* worker's recovery from the error.
*/
pgactive_reset_worker_last_error_info(pgactive_worker_slot);
first_time = false;
}

/*
* If the user has paused replication with pgactive_apply_pause(), we
* wait on our procLatch until pgactive_apply_resume() unsets the flag
Expand Down Expand Up @@ -2983,6 +2995,9 @@ pgactive_apply_main(Datum main_arg)
}
PG_CATCH();
{
pgactive_set_worker_last_error_info(pgactive_worker_slot,
PGACTIVE_ERROR_CODE_APPLY_FAILURE);

if (IsTransactionState())
pgactive_count_rollback();
PG_RE_THROW();
Expand Down
69 changes: 69 additions & 0 deletions src/pgactive_elog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* -------------------------------------------------------------------------
*
* pgactive_elog.c
* pgactive error reporting facility
*
* Copyright (C) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* pgactive_elog.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"

#include "pgactive.h"
#include "pgactive_elog.h"

/*
* Lookup table for pgactive error messages.
*/
const char *const pgactiveErrorMessages[] = {
[PGACTIVE_ERRCODE_NONE] = "none",
[PGACTIVE_ERROR_CODE_MAX_NODES_PARAM_MISMATCH] = "pgactive_max_nodes_parameter_mismatch",
[PGACTIVE_ERROR_CODE_APPLY_FAILURE] = "pgactive_apply_failure",
};

void
pgactive_set_worker_last_error_info(struct pgactiveWorker *w,
pgactiveTrackedErrorCodes errcode)
{
if (w == NULL)
return;

Assert(w->worker_type == pgactive_WORKER_PERDB ||
w->worker_type == pgactive_WORKER_APPLY);

w->last_error_info.errcode = errcode;
w->last_error_info.errtime = GetCurrentTimestamp();
}

void
pgactive_reset_worker_last_error_info(struct pgactiveWorker *w)
{
int errcode;

if (w == NULL)
return;

Assert(w->worker_type == pgactive_WORKER_PERDB ||
w->worker_type == pgactive_WORKER_APPLY);

errcode = w->last_error_info.errcode;

if (w->worker_type == pgactive_WORKER_PERDB)
{
if (!(errcode >= PGACTIVE_PERDB_WORKER_ERROR_CODE_START &&
errcode <= PGACTIVE_PERDB_WORKER_ERROR_CODE_END))
return;
}
else if (w->worker_type == pgactive_WORKER_APPLY)
{
if (!(errcode >= PGACTIVE_APPLY_WORKER_ERROR_CODE_START &&
errcode <= PGACTIVE_APPLY_WORKER_ERROR_CODE_END))
return;
}

w->last_error_info.errcode = PGACTIVE_ERRCODE_NONE;
w->last_error_info.errtime = 0;
}
25 changes: 17 additions & 8 deletions src/pgactive_perdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -975,14 +975,15 @@ check_params_ensure_error_cleanup(PGconn *conn, char *node_name)
pgactive_get_remote_nodeinfo_internal(conn, &ri);

if (pgactive_max_nodes != ri.max_nodes)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pgactive.max_nodes parameter value (%d) on local node " pgactive_NODEID_FORMAT_WITHNAME " doesn't match with remote node %s value (%d)",
pgactive_max_nodes,
pgactive_LOCALID_FORMAT_WITHNAME_ARGS,
node_name,
ri.max_nodes),
errhint("The parameter must be set to the same value on all pgactive members.")));
ereport_pgactive(ERROR,
PGACTIVE_ERROR_CODE_MAX_NODES_PARAM_MISMATCH,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pgactive.max_nodes parameter value (%d) on local node " pgactive_NODEID_FORMAT_WITHNAME " doesn't match with remote node %s value (%d)",
pgactive_max_nodes,
pgactive_LOCALID_FORMAT_WITHNAME_ARGS,
node_name,
ri.max_nodes),
errhint("The parameter must be set to the same value on all pgactive members.")));

if (prev_pgactive_skip_ddl_replication != ri.skip_ddl_replication)
ereport(ERROR,
Expand Down Expand Up @@ -1313,6 +1314,14 @@ pgactive_perdb_worker_main(Datum main_arg)
/* Launch the apply workers */
pgactive_maintain_db_workers();

/*
* Reset if there's any last error info. We do this before getting into
* the real business of the per-db worker, that is, after having performed
* all initial checks, init_replica and one round of launching apply
* workers.
*/
pgactive_reset_worker_last_error_info(pgactive_worker_slot);

while (!ProcDiePending)
{
if (ConfigReloadPending)
Expand Down
24 changes: 24 additions & 0 deletions test/t/047_verify_pgactive_guc_settings.pl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@
$logstart_b);
ok($result, "pgactive.max_nodes parameter value mismatch between local node and remote node is detected");

# Check if the per-db worker's last error was logged and reported correctly
my $res = $node_b->safe_psql($pgactive_test_dbname,
qq[SELECT COUNT(*) = 1 AS ok FROM pgactive.pgactive_get_workers_info()
WHERE worker_type = 'per-db' AND
last_error = 'pgactive_max_nodes_parameter_mismatch' AND
last_error_time IS NOT NULL;]);
is($res, 't', "pgactive error info has been reported correctly");

# Change pgactive.max_nodes value on node to make it successfully start per-db and
# apply workers.
$node_b->append_conf('postgresql.conf', qq(pgactive.max_nodes = 2));
Expand Down Expand Up @@ -171,6 +179,22 @@
is($node_1->safe_psql($pgactive_test_dbname, q[SELECT COUNT(*) FROM fruits;]),
'2', "Changes available on node_1");

# Check if the apply worker's last error was logged and reported correctly
$node_1->append_conf('postgresql.conf', q{pgactive.skip_ddl_replication = true});
$node_1->restart;

# Induce the apply error
$node_1->safe_psql($pgactive_test_dbname,
q[ALTER TABLE fruits DROP COLUMN name;]);
$node_0->safe_psql($pgactive_test_dbname,
q[INSERT INTO fruits VALUES (3, 'Mango');]);

$node_1->poll_query_until($pgactive_test_dbname,
qq[SELECT COUNT(*) = 1 AS ok FROM pgactive.pgactive_get_workers_info()
WHERE worker_type = 'apply' AND
last_error = 'pgactive_apply_failure' AND
last_error_time IS NOT NULL;]);

$node_0->stop;
$node_1->stop;

Expand Down