diff --git a/Makefile b/Makefile index c09b4b7..3f7d920 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,8 @@ MODULES = wal2json # message test will fail for <= 9.5 REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ - filtertable selecttable include_timestamp include_lsn include_xids + filtertable selecttable include_timestamp include_lsn include_xids \ + skip_empty_xacts PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index a7fb0c8..9c6a4be 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Parameters * `write-in-chunks`: write after every change instead of every changeset. Default is _false_. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_. * `include-unchanged-toast` (deprecated): add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is _true_. +* `skip-empty-xacts` ignore empty transactions. Default is _false_. * `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`. * `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`. * `format-version`: defines which format to use. Default is _1_. diff --git a/expected/skip_empty_xacts.out b/expected/skip_empty_xacts.out new file mode 100644 index 0000000..37a0046 --- /dev/null +++ b/expected/skip_empty_xacts.out @@ -0,0 +1,61 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +DROP TABLE IF EXISTS tbl; +CREATE TABLE tbl (id int); +DROP VIEW IF EXISTS tbl_view; +NOTICE: view "tbl_view" does not exist, skipping +CREATE MATERIALIZED VIEW tbl_view AS select * from tbl; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +-- Now slot should have zero changes +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + ?column? +---------- + t +(1 row) + +-- Refreshing Materialized Views generates empty transactions +REFRESH MATERIALIZED VIEW tbl_view; +-- Now slot should have one change +SELECT count(*) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + ?column? +---------- + t +(1 row) + +REFRESH MATERIALIZED VIEW tbl_view; +-- The plugin should ignore the empty transaction +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1'); + ?column? +---------- + t +(1 row) + +REFRESH MATERIALIZED VIEW tbl_view; +-- Writing transactions in chunks should work too +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, + 'skip-empty-xacts', '1', 'write-in-chunks', '1'); + ?column? +---------- + t +(1 row) + +REFRESH MATERIALIZED VIEW tbl_view; +-- Write in chunks still works +SELECT count(*) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'write-in-chunks', '1'); + ?column? +---------- + t +(1 row) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/sql/skip_empty_xacts.sql b/sql/skip_empty_xacts.sql new file mode 100644 index 0000000..df88790 --- /dev/null +++ b/sql/skip_empty_xacts.sql @@ -0,0 +1,38 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +DROP TABLE IF EXISTS tbl; +CREATE TABLE tbl (id int); + +DROP VIEW IF EXISTS tbl_view; +CREATE MATERIALIZED VIEW tbl_view AS select * from tbl; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +-- Now slot should have zero changes +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + +-- Refreshing Materialized Views generates empty transactions +REFRESH MATERIALIZED VIEW tbl_view; + +-- Now slot should have one change +SELECT count(*) = 1 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + +REFRESH MATERIALIZED VIEW tbl_view; + +-- The plugin should ignore the empty transaction +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1'); + +REFRESH MATERIALIZED VIEW tbl_view; + +-- Writing transactions in chunks should work too +SELECT count(*) = 0 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, + 'skip-empty-xacts', '1', 'write-in-chunks', '1'); + +REFRESH MATERIALIZED VIEW tbl_view; +-- Write in chunks still works +SELECT count(*) = 2 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'write-in-chunks', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); \ No newline at end of file diff --git a/wal2json.c b/wal2json.c index 35f32bc..d13ceee 100644 --- a/wal2json.c +++ b/wal2json.c @@ -43,6 +43,7 @@ typedef struct bool include_type_oids; /* include data type oids */ bool include_typmod; /* include typmod in types */ bool include_not_null; /* include not-null constraints */ + bool skip_empty_xacts; /* skip empty transactions */ bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? */ @@ -141,6 +142,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->include_types = true; data->include_type_oids = false; data->include_typmod = true; + data->skip_empty_xacts = false; data->pretty_print = false; data->write_in_chunks = false; data->include_lsn = false; @@ -264,6 +266,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "skip-empty-xacts") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "skip-empty-xacts argument is null"); + data->skip_empty_xacts = false; + } + else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "pretty-print") == 0) { if (elem->arg == NULL) @@ -417,14 +432,11 @@ pg_decode_shutdown(LogicalDecodingContext *ctx) MemoryContextDelete(data->context); } -/* BEGIN callback */ -static void -pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +/* Write transaction header (xid, nextlsn...) */ +static void output_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { JsonDecodingData *data = ctx->output_plugin_private; - - data->nr_changes = 0; - + /* Transaction starts */ OutputPluginPrepareWrite(ctx, true); @@ -451,6 +463,18 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginWrite(ctx, true); } +/* BEGIN callback */ +static void +pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + JsonDecodingData *data = ctx->output_plugin_private; + + data->nr_changes = 0; + + if (!data->skip_empty_xacts) + output_begin(ctx, txn); +} + /* COMMIT callback */ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -458,6 +482,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { JsonDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && data->nr_changes == 0) + return; + if (txn->has_catalog_changes) elog(DEBUG2, "txn has catalog changes: yes"); else @@ -926,6 +953,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Change counter */ data->nr_changes++; + if (data->skip_empty_xacts && data->nr_changes == 1) + output_begin(ctx, txn); + /* if we don't write in chunks, we need a newline here */ if (!data->write_in_chunks) appendStringInfo(ctx->out, "%s", data->nl);