Skip to content

Commit

Permalink
Control how the stream is written.
Browse files Browse the repository at this point in the history
The new parameter 'write-in-chunks' controls how to write in the
stream. If true, write occurs per tuple; else it writes at the end
of transaction.

The default is true. Hence, the behavior is the same as prior code.
  • Loading branch information
Euler Taveira committed Aug 26, 2015
1 parent f9446d1 commit 7c9b483
Showing 1 changed file with 40 additions and 7 deletions.
47 changes: 40 additions & 7 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct
bool include_types; /* include data types */

bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? */

/*
* LSN pointing to the end of commit record + 1 (txn->end_lsn)
Expand Down Expand Up @@ -104,6 +105,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_schemas = true;
data->include_types = true;
data->pretty_print = false;
data->write_in_chunks = true;
data->include_lsn = false;

data->nr_changes = 0;
Expand Down Expand Up @@ -184,6 +186,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "write-in-chunks") == 0)
{
if (elem->arg == NULL)
{
elog(LOG, "write-in-chunks argument is null");
data->write_in_chunks = true;
}
else if (!parse_bool(strVal(elem->arg), &data->write_in_chunks))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-lsn") == 0)
{
if (elem->arg == NULL)
Expand Down Expand Up @@ -264,7 +279,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
else
appendStringInfoString(ctx->out, "\"change\":[");

OutputPluginWrite(ctx, true);
if (data->write_in_chunks)
OutputPluginWrite(ctx, true);
}

/* COMMIT callback */
Expand All @@ -282,12 +298,21 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
elog(DEBUG1, "# of subxacts: %d", txn->nsubtxns);

/* Transaction ends */
OutputPluginPrepareWrite(ctx, true);
if (data->write_in_chunks)
OutputPluginPrepareWrite(ctx, true);

if (data->pretty_print)
{
/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks)
appendStringInfoChar(ctx->out, '\n');

appendStringInfoString(ctx->out, "\t]\n}");
}
else
{
appendStringInfoString(ctx->out, "]}");
}

OutputPluginWrite(ctx, true);
}
Expand Down Expand Up @@ -634,7 +659,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

OutputPluginPrepareWrite(ctx, true);
if (data->write_in_chunks)
OutputPluginPrepareWrite(ctx, true);

/* Make sure rd_replidindex is set */
RelationGetIndexList(relation);
Expand Down Expand Up @@ -697,10 +723,16 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Change starts */
if (data->pretty_print)
{
/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks)
appendStringInfoChar(ctx->out, '\n');

appendStringInfoString(ctx->out, "\t\t");

if (data->nr_changes > 1)
appendStringInfoString(ctx->out, "\t\t,{\n");
else
appendStringInfoString(ctx->out, "\t\t{\n");
appendStringInfoChar(ctx->out, ',');

appendStringInfoString(ctx->out, "{\n");
}
else
{
Expand Down Expand Up @@ -821,5 +853,6 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);

OutputPluginWrite(ctx, true);
if (data->write_in_chunks)
OutputPluginWrite(ctx, true);
}

0 comments on commit 7c9b483

Please sign in to comment.