diff --git a/lib/extensions/ar_adapter/ar_pglogical/pglogical_raw.rb b/lib/extensions/ar_adapter/ar_pglogical/pglogical_raw.rb index ed0c7c16185..49a29c4e98b 100644 --- a/lib/extensions/ar_adapter/ar_pglogical/pglogical_raw.rb +++ b/lib/extensions/ar_adapter/ar_pglogical/pglogical_raw.rb @@ -302,6 +302,18 @@ def tables_in_replication_set(set_name) SQL end + def with_replication_set_lock(set_name) + connection.transaction(:requires_new => true) do + typed_exec(<<-SQL, set_name) + SELECT * + FROM pglogical.replication_set + WHERE set_name = $1 + FOR UPDATE + SQL + yield + end + end + private def typed_exec(sql, *params) diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index 97f635b4287..c2638313dc3 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -80,19 +80,17 @@ def create_replication_set # Aligns the contents of the 'miq' replication set with the currently configured vmdb excludes def refresh_excludes - # remove newly excluded tables from replication set - newly_excluded_tables.each do |table| - _log.info("Removing #{table} from #{REPLICATION_SET_NAME} replication set") - pglogical.replication_set_remove_table(REPLICATION_SET_NAME, table) - end + pglogical.with_replication_set_lock(REPLICATION_SET_NAME) do + # remove newly excluded tables from replication set + newly_excluded_tables.each do |table| + _log.info("Removing #{table} from #{REPLICATION_SET_NAME} replication set") + pglogical.replication_set_remove_table(REPLICATION_SET_NAME, table) + end - # add tables to the set which are no longer excluded (or new) - newly_included_tables.each do |table| - _log.info("Adding #{table} to #{REPLICATION_SET_NAME} replication set") - begin + # add tables to the set which are no longer excluded (or new) + newly_included_tables.each do |table| + _log.info("Adding #{table} to #{REPLICATION_SET_NAME} replication set") pglogical.replication_set_add_table(REPLICATION_SET_NAME, table, true) - rescue PG::UniqueViolation => e - _log.warn("Caught unique constraint error while adding #{table} to the replication set: #{e.message}") end end end diff --git a/spec/replication/util/ar_pglogical_spec.rb b/spec/replication/util/ar_pglogical_spec.rb index b35cfa42a56..4f227fb7510 100644 --- a/spec/replication/util/ar_pglogical_spec.rb +++ b/spec/replication/util/ar_pglogical_spec.rb @@ -226,6 +226,22 @@ expect(connection.pglogical.tables_in_replication_set(set_name)).to eq(["test"]) end end + + describe "#with_replication_set_lock" do + it "takes a lock on the replication set table" do + connection.pglogical.with_replication_set_lock(set_name) do + result = connection.exec_query(<<-SQL) + SELECT 1 + FROM pg_locks JOIN pg_class + ON pg_locks.relation = pg_class.oid + WHERE + pg_class.relname = 'replication_set' AND + pg_locks.mode = 'RowShareLock' + SQL + expect(result.count).to eq(1) + end + end + end end end end diff --git a/spec/replication/util/miq_pglogical_spec.rb b/spec/replication/util/miq_pglogical_spec.rb index 4a94bc1d127..df4e9bea36c 100644 --- a/spec/replication/util/miq_pglogical_spec.rb +++ b/spec/replication/util/miq_pglogical_spec.rb @@ -100,11 +100,6 @@ subject.refresh_excludes expect(subject.included_tables).to include(table) end - - it "continues if we attempt to add a table twice" do - expect(subject).to receive(:newly_included_tables).and_return([subject.included_tables.first]) - expect { subject.refresh_excludes }.not_to raise_error - end end end