Permalink
Browse files

Add test for serialization failure.

Should work on Postgres 8.4 and higher, and skip all on older versions. Works
by adding a sequence and a trigger to a target table, and configuring the
trigger to fire only when the `session_replica_role` is "replica". This means
that it fires when data is `COPY`ed into the table from the source.

The trigger calls a PL/pgSQL function that consults a sequence, and thows a
serialization failure exception when the next value from the sequence is odd.
It does not throw it when its value is even. The result is that the first
attempt to copy the data into the table throws the serialization exception,
but the second attempt does not.

The test checks that the serialization failure happens by listening for the
`syncsleep` notification. If it doesn't come, there was no serialization
failure, and the test fails. If there is, it then tests that the sync finished
anyway and that the new data was copied over.

Resolves #29.

In passing, I've also changed the formatting of the message about the duration
of the sleep to use a stringification of that time, rather than turning it
into an integer, since the new default is 0.5 (and using `%f` yieled too many
decimal places). And if the sleep time is 0, we don't call sleep and send
a slightly different message.

It might make sense to send the sleep nofication only if we're going to sleep,
and then to always send another message, say serialretry, just before trying
again. I think that would be cleaner, since it does not actually always sleep.
Thoughts?
  • Loading branch information...
theory committed Nov 2, 2012
1 parent e62a132 commit 3931056f15f3f6df9b089fd439c14ec38b66d841
Showing with 154 additions and 4 deletions.
  1. +7 −3 Bucardo.pm
  2. +103 −0 t/40-serializable.t
  3. +44 −1 t/BucardoTesting.pm
View
@@ -4578,8 +4578,12 @@ sub start_kid {
$err_handler->($err) unless first { $sync->{db}{$_}{dbh}->state eq '40001' } @dbs_dbi;
## We have a serialization failure and need to sleep on it.
- $self->glog((sprintf "Could not serialize, will sleep for %d %s",
- $sleeptime, 1==$sleeptime ? 'second' : 'seconds'), LOG_TERSE);
+ if ($sleeptime) {
+ $self->glog((sprintf "Could not serialize, will sleep for %s %s",
+ $sleeptime, 1==$sleeptime ? 'second' : 'seconds'), LOG_TERSE);
+ } else {
+ $self->glog('Could not serialize, will try again', LOG_TERSE);
+ }
## Roll everyone back
for my $dbname (@dbs_dbi) {
@@ -4596,7 +4600,7 @@ sub start_kid {
$maindbh->commit;
## Sleep and try again.
- sleep $sleeptime;
+ sleep $sleeptime if $sleeptime;
$kicked = 1;
redo RUNKID;
}
View
@@ -0,0 +1,103 @@
+#!/usr/bin/perl -w
+
+use strict;
+use warnings;
+use lib 't';
+use Test::More;
+use BucardoTesting;
+
+my $bct = BucardoTesting->new({location => 'postgres'})
+ or BAIL_OUT 'Creation of BucardoTesting object failed';
+
+END { $bct->stop_bucardo if $bct }
+
+my $dbh = $bct->connect_database('A');
+END { $dbh->disconnect if $dbh }
+
+# Skip the tests if we can't mock the serialization failure.
+plan skip_all => "Cannot mock serialization failure on Postgres $dbh->{pg_server_version}"
+ if $dbh->{pg_server_version} < 80400;
+
+# We are a go!
+plan tests => 27;
+$dbh->disconnect;
+$dbh = undef;
+
+ok my $dbhA = $bct->repopulate_cluster('A'), 'Populate cluster A';
+ok my $dbhB = $bct->repopulate_cluster('B'), 'Populate cluster B';
+ok my $dbhX = $bct->setup_bucardo('A'), 'Set up Bucardo';
+
+END { $_->disconnect for grep { $_ } $dbhA, $dbhB, $dbhX }
+
+# Teach Bucardo about the databases.
+for my $db (qw(A B)) {
+ my ($user, $port, $host) = $bct->add_db_args($db);
+ like $bct->ctl(
+ "bucardo add db $db dbname=bucardo_test user=$user port=$port host=$host"
+ ), qr/Added database "$db"/, qq{Add database "$db" to Bucardo};
+}
+
+# Let's just deal with table bucardo_test1 and bucardo_test2.
+for my $num (1, 2) {
+ like $bct->ctl("bucardo add table bucardo_test$num db=A relgroup=myrels"),
+ qr/Added the following tables/, "Add table bucardo_test$num";
+}
+
+# Create a new database group going from A to B
+like $bct->ctl('bucardo add dbgroup serial1 A:source B:target'),
+ qr/Created database group "serial1"/, 'Create relgroup serial1';
+
+# Create a sync for this group.
+like $bct->ctl('bucardo add sync serialtest1 relgroup=myrels dbs=serial1'),
+ qr/Added sync "serialtest1"/, 'Create sync "serialtest1"';
+
+# Set up a rule to mock a serialization failure on B.bucardo_test2.
+ok $bct->mock_serialization_failure($dbhB, 'bucardo_test2'),
+ 'Mock serialization failure on bucardo_test2';
+END {
+ $bct->unmock_serialization_failure($dbhB, 'bucardo_test2')
+ if $bct && $dbhB;
+}
+
+# Listen in on things.
+ok $dbhX->do('LISTEN bucardo_syncdone_serialtest1'),
+ 'Listen for syncdone';
+ok $dbhX->do('LISTEN bucardo_syncsleep_serialtest1'),
+ 'Listen for syncsleep';
+
+# Start up Bucardo.
+ok $bct->restart_bucardo($dbhX), 'Bucardo should start';
+
+ok $bct->wait_for_notice($dbhX, 'bucardo_syncdone_serialtest1'),
+ 'The sync should finish';
+
+# Should have no rows.
+$bct->check_for_row([], [qw(A B)], undef, 'test[12]$');
+
+# Let's add some data into A.bucardo_test1.
+ok $dbhA->do(q{INSERT INTO bucardo_test1 (id, data1) VALUES (1, 'foo')}),
+ 'Insert a row into test1';
+$dbhA->commit;
+
+ok $bct->wait_for_notice($dbhX, 'bucardo_syncdone_serialtest1'),
+ 'Second sync should finish';
+
+# The row should be in both databases.
+is_deeply $dbhB->selectall_arrayref(
+ 'SELECT id, data1 FROM bucardo_test1'
+), [[1, 'foo']], 'Should have the test1 row in B';
+
+# Excellent. Now let's insert into test2.
+ok $dbhA->do(q{INSERT INTO bucardo_test2 (id, data1) VALUES (2, 'foo')}),
+ 'Insert a row into test2';
+$dbhA->commit;
+
+ok $bct->wait_for_notice($dbhX, 'bucardo_syncsleep_serialtest1'),
+ 'Should get a syncsleep message';
+
+ok $bct->wait_for_notice($dbhX, 'bucardo_syncdone_serialtest1'),
+ 'Then the third sync should finish';
+
+is_deeply $dbhB->selectall_arrayref(
+ 'SELECT id, data1 FROM bucardo_test2'
+), [[2, 'foo']], 'Should have the B test2 row despite serialization failure';
View
@@ -814,6 +814,48 @@ ALTER TABLE bucardo_fkey1
} ## end of add_test_schema
+sub mock_serialization_failure {
+ my ($self, $dbh, $table) = @_;
+ return if $dbh->{pg_server_version} < 80401;
+ $table ||= 'bucardo_test1';
+
+ # Mock a serialization failure on every other INSERT. Runs only when
+ # `session_replica_role` is "replica", which it true for Bucardo targets.
+ $dbh->do(qq{
+ DROP SEQUENCE IF EXISTS serial_seq;
+ CREATE SEQUENCE serial_seq;
+
+ CREATE OR REPLACE FUNCTION mock_serial_fail(
+ ) RETURNS trigger LANGUAGE plpgsql AS \$_\$
+ BEGIN
+ IF nextval('serial_seq') % 2 = 0 THEN RETURN NEW; END IF;
+ RAISE EXCEPTION 'Serialization error'
+ USING ERRCODE = 'serialization_failure';
+ END;
+ \$_\$;
+
+ CREATE TRIGGER mock_serial_fail AFTER INSERT ON "$table"
+ FOR EACH ROW EXECUTE PROCEDURE mock_serial_fail();
+ ALTER TABLE "$table" ENABLE REPLICA TRIGGER mock_serial_fail;
+ });
+ $dbh->commit;
+
+ return 1;
+} ## end of mock_serialization_failure
+
+sub unmock_serialization_failure {
+ my ($self, $dbh, $table) = @_;
+ return if $dbh->{pg_server_version} < 80401;
+ $table ||= 'bucardo_test1';
+
+ $dbh->do(qq{
+ DROP TRIGGER IF EXISTS mock_serial_fail ON "$table";
+ DROP FUNCTION IF EXISTS mock_serial_fail();
+ DROP SEQUENCE IF EXISTS serial_seq;
+ });
+
+ return 1;
+} ## end of unmock_serialization_failure
sub add_test_databases {
@@ -1102,6 +1144,7 @@ sub wait_for_notice {
## 2. Seconds until we give up
## 3. Seconds we sleep between checks
## 4. Boolean: bail out if not found (defaults to true)
+ ## Returns true if the NOTIFY was recieved.
my $self = shift;
my $dbh = shift;
@@ -1144,7 +1187,7 @@ sub wait_for_notice {
return;
}
}
- return;
+ return 1;
} ## end of wait_for_notice

0 comments on commit 3931056

Please sign in to comment.