Skip to content

Commit

Permalink
Include LSN position for the next transaction.
Browse files Browse the repository at this point in the history
This feature was discussed with @ethansf and is useful for tool that
consumes WAL using pg_recvlogical and similar tools. The LSN is
important to mark the apply progress. It stores the LSN pointing to the
end of commit record + 1. Hence, if the consuming tool crashes while
applying a transaction (JSON document), it can restart the
pg_recvlogical streaming from the last saved LSN position. This is
essential for such consuming tool to work properly.

Although, I did not have used Ethans' patch, this piece of code is
based on his idea.
  • Loading branch information
Euler Taveira committed Apr 10, 2015
1 parent 543b08c commit dc6e390
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
Expand All @@ -44,6 +45,12 @@ typedef struct
bool include_schemas; /* qualify tables */
bool include_types; /* include data types */

/*
* LSN pointing to the end of commit record + 1 (txn->end_lsn)
* It is useful for tools that wants a position to restart from.
*/
bool include_lsn; /* include LSNs */

uint64 nr_changes; /* # of passes in pg_decode_change() */
/* FIXME replace with txn->nentries */
} JsonDecodingData;
Expand Down Expand Up @@ -94,6 +101,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_timestamp = false;
data->include_schemas = true;
data->include_types = true;
data->include_lsn = false;

data->nr_changes = 0;

Expand Down Expand Up @@ -160,6 +168,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-lsn") == 0)
{
if (elem->arg == NULL)
{
elog(LOG, "include-lsn argument is null");
data->include_lsn = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_lsn))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else
{
elog(WARNING, "option %s = %s is unknown",
Expand Down Expand Up @@ -194,6 +215,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
if (data->include_xids)
appendStringInfo(ctx->out, "\t\"xid\": %u,\n", txn->xid);

if (data->include_lsn)
{
char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, txn->end_lsn));

appendStringInfo(ctx->out, "\t\"next lsn\": \"%s\",\n", lsn_str);

pfree(lsn_str);
}

if (data->include_timestamp)
appendStringInfo(ctx->out, "\t\"timestamp\": \"%s\",\n", timestamptz_to_str(txn->commit_time));

Expand Down

0 comments on commit dc6e390

Please sign in to comment.