Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolves #106: Implementing skip-empty-xacts. #120

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ MODULES = wal2json
# message test will fail for <= 9.5
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids
filtertable selecttable include_timestamp include_lsn include_xids \
skip_empty_xacts

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Parameters
* `write-in-chunks`: write after every change instead of every changeset. Default is _false_.
* `include-lsn`: add _nextlsn_ to each changeset. Default is _false_.
* `include-unchanged-toast` (deprecated): add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is _true_.
* `skip-empty-xacts`: skip empty transactions (those with no changes to output). Default is _false_.
* `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`.
* `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`.
* `format-version`: defines which format to use. Default is _1_.
Expand Down
61 changes: 61 additions & 0 deletions expected/skip_empty_xacts.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
DROP TABLE IF EXISTS tbl;
CREATE TABLE tbl (id int);
DROP VIEW IF EXISTS tbl_view;
NOTICE: view "tbl_view" does not exist, skipping
CREATE MATERIALIZED VIEW tbl_view AS select * from tbl;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

-- Now slot should have zero changes
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
?column?
----------
t
(1 row)

-- Refreshing Materialized Views generates empty transactions
REFRESH MATERIALIZED VIEW tbl_view;
-- Now slot should have one change
SELECT count(*) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
?column?
----------
t
(1 row)

REFRESH MATERIALIZED VIEW tbl_view;
-- The plugin should ignore the empty transaction
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
?column?
----------
t
(1 row)

REFRESH MATERIALIZED VIEW tbl_view;
-- Writing transactions in chunks should work too
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'skip-empty-xacts', '1', 'write-in-chunks', '1');
?column?
----------
t
(1 row)

REFRESH MATERIALIZED VIEW tbl_view;
-- Write in chunks still works
SELECT count(*) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'write-in-chunks', '1');
?column?
----------
t
(1 row)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

38 changes: 38 additions & 0 deletions sql/skip_empty_xacts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

DROP TABLE IF EXISTS tbl;
CREATE TABLE tbl (id int);

DROP VIEW IF EXISTS tbl_view;
CREATE MATERIALIZED VIEW tbl_view AS select * from tbl;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

-- Now slot should have zero changes
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);

-- Refreshing Materialized Views generates empty transactions
REFRESH MATERIALIZED VIEW tbl_view;

-- Now slot should have one change
SELECT count(*) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);

REFRESH MATERIALIZED VIEW tbl_view;

-- The plugin should ignore the empty transaction
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');

REFRESH MATERIALIZED VIEW tbl_view;

-- Writing transactions in chunks should work too
SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'skip-empty-xacts', '1', 'write-in-chunks', '1');

REFRESH MATERIALIZED VIEW tbl_view;
-- Write in chunks still works
SELECT count(*) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'write-in-chunks', '1');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
42 changes: 36 additions & 6 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct
bool include_type_oids; /* include data type oids */
bool include_typmod; /* include typmod in types */
bool include_not_null; /* include not-null constraints */
bool skip_empty_xacts; /* skip empty transactions */

bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? */
Expand Down Expand Up @@ -141,6 +142,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_types = true;
data->include_type_oids = false;
data->include_typmod = true;
data->skip_empty_xacts = false;
data->pretty_print = false;
data->write_in_chunks = false;
data->include_lsn = false;
Expand Down Expand Up @@ -264,6 +266,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
{
if (elem->arg == NULL)
{
elog(DEBUG1, "skip-empty-xacts argument is null");
data->skip_empty_xacts = false;
}
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "pretty-print") == 0)
{
if (elem->arg == NULL)
Expand Down Expand Up @@ -417,14 +432,11 @@ pg_decode_shutdown(LogicalDecodingContext *ctx)
MemoryContextDelete(data->context);
}

/* BEGIN callback */
static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
/* Write transaction header (xid, nextlsn...) */
static void output_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
JsonDecodingData *data = ctx->output_plugin_private;

data->nr_changes = 0;


/* Transaction starts */
OutputPluginPrepareWrite(ctx, true);

Expand All @@ -451,13 +463,28 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
OutputPluginWrite(ctx, true);
}

/*
 * BEGIN callback
 *
 * Resets the per-transaction change counter. Unless skip-empty-xacts is
 * enabled, the transaction header is written right away. With
 * skip-empty-xacts enabled nothing is written here: pg_decode_change()
 * emits the header when it counts the first change of the transaction.
 */
static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	JsonDecodingData *state = ctx->output_plugin_private;

	/* a new transaction starts with no changes seen yet */
	state->nr_changes = 0;

	/* header is postponed when empty transactions must be skipped */
	if (state->skip_empty_xacts)
		return;

	output_begin(ctx, txn);
}

/* COMMIT callback */
static void
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
JsonDecodingData *data = ctx->output_plugin_private;

if (data->skip_empty_xacts && data->nr_changes == 0)
return;

if (txn->has_catalog_changes)
elog(DEBUG2, "txn has catalog changes: yes");
else
Expand Down Expand Up @@ -926,6 +953,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Change counter */
data->nr_changes++;

if (data->skip_empty_xacts && data->nr_changes == 1)
output_begin(ctx, txn);

/* if we don't write in chunks, we need a newline here */
if (!data->write_in_chunks)
appendStringInfo(ctx->out, "%s", data->nl);
Expand Down