Skip to content

Commit

Permalink
Allow transaction isolation level to be changed
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Sabino Mullane committed Mar 1, 2013
1 parent 00d8dad commit 1342eb3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
7 changes: 6 additions & 1 deletion Bucardo.pm
Expand Up @@ -2877,6 +2877,11 @@ sub start_kid {
## Add a note to the syncrun table ## Add a note to the syncrun table
$sth{kid_syncrun_update_status}->execute("Begin txn (KID $$)", $syncname); $sth{kid_syncrun_update_status}->execute("Begin txn (KID $$)", $syncname);


## Figure out our isolation level. Only used for Postgres
## All others are hard-coded as 'serializable'
my $isolation_level = defined $sync->{isolation_level} ? $sync->{isolation_level} :
$config{isolation_level} || 'serializable';

## Commit so our dbrun and syncrun stuff is visible to others ## Commit so our dbrun and syncrun stuff is visible to others
## This should be done just before we start transactions on all dbs ## This should be done just before we start transactions on all dbs
$maindbh->commit(); $maindbh->commit();
Expand All @@ -2893,7 +2898,7 @@ sub start_kid {
$x->{dbh}->rollback(); $x->{dbh}->rollback();


if ($x->{dbtype} eq 'postgres') { if ($x->{dbtype} eq 'postgres') {
$x->{dbh}->do('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ WRITE'); $x->{dbh}->do(qq{SET TRANSACTION ISOLATION LEVEL $isolation_level READ WRITE});
$self->glog(qq{Set database "$dbname" to serializable read write}, LOG_DEBUG); $self->glog(qq{Set database "$dbname" to serializable read write}, LOG_DEBUG);
} }


Expand Down
36 changes: 29 additions & 7 deletions bucardo
Expand Up @@ -3639,6 +3639,7 @@ sub add_sync {
onetimecopy onetimecopy =0|1|2 null onetimecopy onetimecopy =0|1|2 null
lifetime lifetime interval null lifetime lifetime interval null
maxkicks maxkicks numeric null maxkicks maxkicks numeric null
isolation_level|txnmode isolation_level 0 null
rebuild_index|rebuildindex rebuild_index numeric null rebuild_index|rebuildindex rebuild_index numeric null
tables tables 0 skip tables tables 0 skip
}; };
Expand All @@ -3653,6 +3654,10 @@ sub add_sync {
value => 'fullcopy', value => 'fullcopy',
new_defaults => 'autokick|F stayalive|F kidsalive|F', new_defaults => 'autokick|F stayalive|F kidsalive|F',
}, },
{
field => 'isolation_level',
dash_to_white => 1,
}
]; ];


my ( $dbcols, $cols, $phs, $vals ) = process_simple_args({ my ( $dbcols, $cols, $phs, $vals ) = process_simple_args({
Expand Down Expand Up @@ -3998,7 +4003,10 @@ sub update_sync {
die "Invalid setting: $setting\n"; die "Invalid setting: $setting\n";
} }


## Normalize things like t/f 0/1 others? ## Do any magic we need
if ($setting eq 'isolation_level') {
$value =~ s/_/ /g;
}


## Try setting it ## Try setting it
$SQL = "UPDATE sync SET $setting=? WHERE name = ?"; $SQL = "UPDATE sync SET $setting=? WHERE name = ?";
Expand Down Expand Up @@ -8102,11 +8110,15 @@ sub process_simple_args {
for my $mline (@{$arg->{morph}}) { for my $mline (@{$arg->{morph}}) {
if (exists $mline->{field}) { if (exists $mline->{field}) {
next unless exists $dbcol{$mline->{field}}; next unless exists $dbcol{$mline->{field}};
next if $dbcol{$mline->{field}} ne $mline->{value}; if (exists $mline->{new_defaults}) {
for my $change (split /\s+/ => $mline->{new_defaults}) { for my $change (split /\s+/ => $mline->{new_defaults}) {
my ($f,$v) = split /\|/ => $change; my ($f,$v) = split /\|/ => $change;
next if exists $dbcol{$f}; next if exists $dbcol{$f};
$dbcol{$f} = $v; $dbcol{$f} = $v;
}
}
if (exists $mline->{dash_to_white}) {
$dbcol{$mline->{field}} =~ s/_/ /g;
} }
} }
else { else {
Expand Down Expand Up @@ -10077,7 +10089,7 @@ what to replicate from where to where. The supported parameters are:
=item C<name> =item C<name>
The name of the sync. The name of the sync. Required.
=item C<dbs> =item C<dbs>
Expand All @@ -10104,6 +10116,11 @@ Number of seconds a KID can live before being reaped.
Number of times a KID may be kicked before being reaped. Number of times a KID may be kicked before being reaped.
=item C<isolation_level>
The transation isolation level this sync should use.
Only choices are "serializable" and "repeatable read"
=item C<conflict_strategy> =item C<conflict_strategy>
The conflict resolution strategy to use in the sync. Supported values: The conflict resolution strategy to use in the sync. Supported values:
Expand Down Expand Up @@ -10449,6 +10466,11 @@ Directory to store the flatfile output inside of. Default: C<.>.
Regex to make sure we don't accidentally run where we should not. Default: None. Regex to make sure we don't accidentally run where we should not. Default: None.
=item C<isolation_level>
The transaction isolation level all sync should use. Defaults to 'serializable'.
The only other valid option is 'repeatable read'
=item C<kid_deadlock_sleep> =item C<kid_deadlock_sleep>
How long to sleep in seconds if we hit a deadlock error. Default: C<0.5>. How long to sleep in seconds if we hit a deadlock error. Default: C<0.5>.
Expand Down
5 changes: 5 additions & 0 deletions bucardo.schema
Expand Up @@ -161,6 +161,7 @@ default_conflict_strategy|bucardo_latest|Default conflict strategy for all syncs
email_debug_file||File to save a copy of all outgoing emails to email_debug_file||File to save a copy of all outgoing emails to
flatfile_dir|.|Directory to store the flatfile output inside of flatfile_dir|.|Directory to store the flatfile output inside of
host_safety_check||Regex to make sure we don't accidentally run where we should not host_safety_check||Regex to make sure we don't accidentally run where we should not
isolation_level|serializable|Default isolation level: can be serializable or repeatable read
piddir|/var/run/bucardo|Directory holding Bucardo PID files piddir|/var/run/bucardo|Directory holding Bucardo PID files
reason_file|bucardo.restart.reason.txt|File to hold reasons for stopping and starting reason_file|bucardo.restart.reason.txt|File to hold reasons for stopping and starting
semaphore_table|bucardo_status|Table to let apps know a sync is ongoing semaphore_table|bucardo_status|Table to let apps know a sync is ongoing
Expand Down Expand Up @@ -405,6 +406,7 @@ CREATE TABLE bucardo.sync (
onetimecopy SMALLINT NOT NULL DEFAULT 0, onetimecopy SMALLINT NOT NULL DEFAULT 0,
lifetime INTERVAL NULL, -- force controller and kids to restart lifetime INTERVAL NULL, -- force controller and kids to restart
maxkicks INTEGER NOT NULL DEFAULT 0, -- force controller and kids to restart maxkicks INTEGER NOT NULL DEFAULT 0, -- force controller and kids to restart
isolation_level TEXT NULL DEFAULT 'serializable',
cdate TIMESTAMPTZ NOT NULL DEFAULT now() cdate TIMESTAMPTZ NOT NULL DEFAULT now()
); );
COMMENT ON TABLE bucardo.sync IS $$Defines a single replication event from a herd to one or more target databases$$; COMMENT ON TABLE bucardo.sync IS $$Defines a single replication event from a herd to one or more target databases$$;
Expand All @@ -419,6 +421,9 @@ ALTER TABLE bucardo.dbgroup ADD CONSTRAINT dbgroup_name_sane CHECK (name ~ E'^[a
ALTER TABLE bucardo.sync ADD CONSTRAINT sync_name_sane ALTER TABLE bucardo.sync ADD CONSTRAINT sync_name_sane
CHECK (name ~ E'^[a-zA-Z]\\w*$' AND lower(name) NOT IN ('pushdelta','fullcopy','swap')); CHECK (name ~ E'^[a-zA-Z]\\w*$' AND lower(name) NOT IN ('pushdelta','fullcopy','swap'));


ALTER TABLE bucardo.sync ADD CONSTRAINT sync_isolation_level
CHECK (isolation_level IS NULL OR (LOWER(isolation_level) IN ('serializable', 'repeatable read')));

CREATE SEQUENCE bucardo.customcode_id_seq; CREATE SEQUENCE bucardo.customcode_id_seq;
CREATE TABLE bucardo.customcode ( CREATE TABLE bucardo.customcode (
id INTEGER NOT NULL DEFAULT nextval('customcode_id_seq'), id INTEGER NOT NULL DEFAULT nextval('customcode_id_seq'),
Expand Down

0 comments on commit 1342eb3

Please sign in to comment.