Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain indexes when updating pglogical.sequence_state. #435

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ SCRIPTS_built = pglogical_create_subscriber

REGRESS = preseed infofuncs init_fail init preseed_check basic extended conflict_secondary_unique \
toasted replication_set add_table matview bidirectional primary_key \
interfaces foreign_key functions copy triggers parallel row_filter \
interfaces foreign_key functions copy sequence triggers parallel row_filter \
row_filter_sampling att_list column_filter apply_delay multiple_upstreams \
node_origin_cascade drop

Expand Down
57 changes: 57 additions & 0 deletions expected/sequence.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- like bt_index_check('pglogical.sequence_state', true)
CREATE FUNCTION heapallindexed() RETURNS void AS $$
DECLARE
count_seqscan int;
count_idxscan int;
BEGIN
count_seqscan := (SELECT count(*) FROM pglogical.sequence_state);
SET enable_seqscan = off;
count_idxscan := (SELECT count(*) FROM pglogical.sequence_state);
RESET enable_seqscan;
IF count_seqscan <> count_idxscan THEN
RAISE 'seqscan found % rows, but idxscan found % rows',
count_seqscan, count_idxscan;
END IF;
END
$$ LANGUAGE plpgsql;
-- Replicate one sequence.
CREATE SEQUENCE stress;
SELECT * FROM pglogical.create_replication_set('stress_seq');
create_replication_set
------------------------
2261733486
(1 row)

SELECT * FROM pglogical.replication_set_add_sequence('stress_seq', 'stress');
replication_set_add_sequence
------------------------------
t
(1 row)

SELECT pglogical.synchronize_sequence('stress');
synchronize_sequence
----------------------
t
(1 row)

SELECT heapallindexed();
heapallindexed
----------------

(1 row)

-- Sync it 400 times in one transaction, to cross a pglogical.sequence_state
-- page boundary and get a non-HOT update.
DO $$
BEGIN
FOR i IN 1..400 LOOP
PERFORM pglogical.synchronize_sequence('stress');
END LOOP;
END;
$$;
SELECT heapallindexed();
heapallindexed
----------------

(1 row)

18 changes: 2 additions & 16 deletions pglogical_sequences.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ synchronize_sequences(void)
char *nspname;
char *relname;
StringInfoData json;
#if PG_VERSION_NUM >= 160000
TU_UpdateIndexes updateIndexes = TU_None;
#endif

CHECK_FOR_INTERRUPTS();

Expand All @@ -141,11 +138,7 @@ synchronize_sequences(void)
newseq->cache_size * 2);

newseq->last_value = last_value + newseq->cache_size;
#if PG_VERSION_NUM >= 160000
simple_heap_update(rel, &tuple->t_self, newtup, &updateIndexes);
#else
simple_heap_update(rel, &tuple->t_self, newtup);
#endif
CatalogTupleUpdate(rel, &tuple->t_self, newtup);

repsets = get_seq_replication_sets(local_node->node->id,
oldseq->seqoid);
Expand Down Expand Up @@ -203,9 +196,6 @@ synchronize_sequence(Oid seqoid)
char *relname;
StringInfoData json;
PGLogicalLocalNode *local_node = get_local_node(true, false);
#if PG_VERSION_NUM >= 160000
TU_UpdateIndexes updateIndexes = TU_None;
#endif

/* Check if the oid points to actual sequence. */
seqrel = table_open(seqoid, AccessShareLock);
Expand Down Expand Up @@ -241,11 +231,7 @@ synchronize_sequence(Oid seqoid)
last_value = sequence_get_last_value(seqoid);

newseq->last_value = last_value + newseq->cache_size;
#if PG_VERSION_NUM >= 160000
simple_heap_update(rel, &tuple->t_self, newtup, &updateIndexes);
#else
simple_heap_update(rel, &tuple->t_self, newtup);
#endif
CatalogTupleUpdate(rel, &tuple->t_self, newtup);

repsets = get_seq_replication_sets(local_node->node->id, seqoid);
repset_names = NIL;
Expand Down
34 changes: 34 additions & 0 deletions sql/sequence.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- like bt_index_check('pglogical.sequence_state', true)
CREATE FUNCTION heapallindexed() RETURNS void AS $$
DECLARE
count_seqscan int;
count_idxscan int;
BEGIN
count_seqscan := (SELECT count(*) FROM pglogical.sequence_state);
SET enable_seqscan = off;
count_idxscan := (SELECT count(*) FROM pglogical.sequence_state);
RESET enable_seqscan;
IF count_seqscan <> count_idxscan THEN
RAISE 'seqscan found % rows, but idxscan found % rows',
count_seqscan, count_idxscan;
END IF;
END
$$ LANGUAGE plpgsql;

-- Replicate one sequence.
CREATE SEQUENCE stress;
SELECT * FROM pglogical.create_replication_set('stress_seq');
SELECT * FROM pglogical.replication_set_add_sequence('stress_seq', 'stress');
SELECT pglogical.synchronize_sequence('stress');
SELECT heapallindexed();

-- Sync it 400 times in one transaction, to cross a pglogical.sequence_state
-- page boundary and get a non-HOT update.
DO $$
BEGIN
FOR i IN 1..400 LOOP
PERFORM pglogical.synchronize_sequence('stress');
END LOOP;
END;
$$;
SELECT heapallindexed();