#!/usr/bin/perl -- -*-cperl-*-
## Script to control Bucardo
##
## Copyright 2006-2009 Greg Sabino Mullane <greg@endpoint.com>
package bucardo_ctl;
use strict;
use warnings;
use 5.008003;
use DBI;
use IO::Handle;
use Getopt::Long;
use Time::HiRes 'sleep';
use POSIX qw/ceil setsid/;
use Data::Dumper;
## Stop loading here when BUCARDO_CTL_TEST is set in the environment
## (presumably so a test harness can load this file without running it - confirm)
if ($ENV{BUCARDO_CTL_TEST}) {
    return 1;
}

our $VERSION = '3.2.7';

## Never buffer our output
STDOUT->autoflush(1);
STDERR->autoflush(1);

## Package globals shared by the subroutines below
use vars qw/$dbh $SQL $sth %sth $count $info/;

## Internal tunables
my $DEBUG            = 0;
my $DATEFORMAT       = 'Mon DD, YYYY HH24:MI:SS';  ## Handed to TO_CHAR in SQL
my $DEFAULT_DAYSBACK = 3;
my $WAITSLEEP        = 1;                          ## Seconds between notification polls
my $PROGRESS         = 1;

## No arguments at all: show the usage and leave
@ARGV or help();
## Default arguments. The .bucardorc file and the command line may override
## these; whatever is left in this hash is handed to Bucardo->new() by start().
my $bcargs = {
    ## Consumed by bucardo_ctl itself
    ctlquiet     => 0,
    ctlverbose   => 0,

    ## Database connection
    dbname       => 'bucardo',
    dbuser       => 'bucardo',
    dbpass       => '',

    verbose      => 1, ## Highly recommended to leave this on
    sendmail     => 0,
    extraname    => '',

    ## Debug output
    debugfilesep => 0,
    debugdir     => '.',
    debugname    => '',
    debugstderr  => 1,
    debugstdout  => 0,
    debugsyslog  => 1,
    debugfile    => 1,
    cleandebugs  => 0,
};
## Values are first read from a .bucardorc, either in the current dir, or the home dir.
## These will be overwritten by command-line args.
## Each useful line looks like "name = value"; lines starting with # are skipped,
## as are lines that do not match that shape.
my $file;
if (-e '.bucardorc') {
    $file = '.bucardorc';
}
## HOME may be unset (cron, some daemons): do not build a path from undef
elsif (defined $ENV{HOME} and -e "$ENV{HOME}/.bucardorc") {
    $file = "$ENV{HOME}/.bucardorc";
}
if (defined $file) {
    open my $rc, '<', $file or die qq{Could not open "$file": $!\n};
    ## Use a lexical line variable so the global $_ is left alone
    while (my $line = <$rc>) {
        next if $line =~ /^\s*#/;
        next unless $line =~ /^\s*(\w+)\s*=\s*(.+?)\s*$/;
        my ($name,$value) = ($1,$2);
        $bcargs->{$name} = $value;
    }
    close $rc or die qq{Could not close "$file": $!\n};
}
## Command-line options are stored straight into the $bcargs hash
GetOptions( ## no critic (ProhibitCallsToUndeclaredSubs)
    $bcargs,

    ## Options for bucardo_ctl itself
    'ctlverbose+',
    'ctlquiet+',
    'notimer',
    'help',
    'daysback=i',
    'sort=i',
    'showdays',
    'compress',
    'retry=i',
    'retrysleep=i',

    ## These are sent to the constructor:
    'dbport=i',
    'dbhost=s',
    'dbname=s',
    'dbuser=s',
    'dbpass=s',
    'verbose=i',
    'sendmail=i',
    'extraname=s',
    'debugfilesep',
    'debugname=s',
    'debugstderr=i',
    'debugstdout=i',
    'debugsyslog=i',
    'debugdir=s',
    'debugfile=i',
    'cleandebugs=i',
) or die "\n";

## Pull our own settings back out of the hash.
## $QUIET must be set before help() is called, as help() reads it.
my $QUIET = delete $bcargs->{ctlquiet};
if ($bcargs->{help}) {
    help();
}
my $VERBOSE    = delete $bcargs->{ctlverbose};
my $DAYSBACK   = delete $bcargs->{daysback} || $DEFAULT_DAYSBACK;
my $COMPRESS   = delete $bcargs->{compress};
## NOTE(review): showdays is removed from $bcargs here, yet pretty_time()
## tests $bcargs->{showdays} - confirm which of the two is intended
my $SHOWDAYS   = delete $bcargs->{showdays} || 1;
my $RETRY      = delete $bcargs->{retry} || 0;
my $RETRYSLEEP = delete $bcargs->{retrysleep} || 0;
my $NOTIMER    = delete $bcargs->{notimer} || 0;

## Human-readable summary of the connection, used by the status output
my $DBCONN = "User: $bcargs->{dbuser} Database: $bcargs->{dbname}";
for my $part ([dbhost => 'Host'], [dbport => 'Port']) {
    my ($key, $label) = @$part;
    my $value = $bcargs->{$key};
    if ($value and length $value) {
        $DBCONN .= " $label: $value";
    }
}
## What remains on the command line is one verb followed by any nouns
my $verb = shift(@ARGV) || '';
if (! $verb) {
    help();
}
$verb = lc $verb;
my @nouns = @ARGV;
my $nouns = join(' ', @nouns);

## Connect to the Bucardo database; host and port are optional
my $DSN = "dbi:Pg:dbname=$bcargs->{dbname}";
for my $part ([dbhost => 'host'], [dbport => 'port']) {
    my ($key, $dsnname) = @$part;
    my $value = $bcargs->{$key};
    if ($value and length $value) {
        $DSN .= ";$dsnname=$value";
    }
}
$dbh = DBI->connect(
    $DSN,
    $bcargs->{dbuser},
    $bcargs->{dbpass},
    { AutoCommit => 0, RaiseError => 1, PrintError => 0 },
);
$dbh->do("SET search_path = bucardo");
$dbh->commit();

## Grab the file locations we need from the bucardo_config table
my $REASONFILE = get_config('reason_file') or die "Invalid reason_file!\n";
my $PIDDIR     = get_config('piddir')      or die "Invalid piddir!\n";
my $pidfile    = get_config('pidfile')     or die "Invalid pidfile!\n";
my $stopfile   = get_config('stopfile')    or die "Invalid stopfile!\n";

my $PIDFILE        = "$PIDDIR/$pidfile";
my $REASONFILE_LOG = "$REASONFILE.log";
my $STOPFILE       = "$PIDDIR/$stopfile";
## Handle all the verbs that do not need a list of syncs.
## If a handler returns instead of exiting, we simply carry on below.
{
    my %action_for = (
        start         => \&start,
        stop          => \&stop,
        reload_config => \&reload_config,
        ping          => \&ping,
        add           => \&add_item,
        alter         => \&alter_item,
        remove        => \&remove_item,
        upgrade       => \&upgrade,
        message       => \&message,
        list          => \&list,
        set           => \&config,
        show          => \&config,
    );

    if ($verb eq 'status') {
        ## A bare "status" is the overview, otherwise give the details
        status_all() unless @nouns;
        status_detail();
    }
    elsif (my $action = $action_for{$verb}) {
        $action->();
    }
}
## For the rest, we expect a list of sync with an optional decimal "timeout"
## A noun made only of digits becomes $adverb (the last one given wins).
## A noun containing % is treated as a LIKE pattern against sync.name.
## Anything else must be the exact name of a known sync, or we die.
my $adverb;
my $syncs = get_syncs();
my @syncs;
SYNCMATCH: for my $sync (@nouns) {
    if ($sync =~ /^\d+$/) {
        $adverb = $sync;
        next SYNCMATCH;
    }
    if ($sync =~ /%/) {
        ## Bind the pattern rather than pasting it into the SQL, so quotes
        ## in the argument cannot break or alter the statement
        my $matches = $dbh->selectall_arrayref(
            'SELECT name FROM sync WHERE name LIKE ?', undef, $sync);
        if (@$matches) {
            push @syncs, map { $_->[0] } @$matches;
            next SYNCMATCH;
        }
        ## No rows matched: fall through and try it as a literal name
    }
    if (! exists $syncs->{$sync}) {
        die qq{Sync "$sync" does not appear to exist\n};
    }
    push @syncs, $sync;
}
vate_sync() if $verb eq 'activate' or $verb eq 'deactivate';
kick() if $verb eq 'kick';
## Unknown verb: show the usage (help exits)
help();
sub help {
    ## Write the usage summary to STDERR, unless $QUIET is set, then exit 0
    ## Arguments: none
    ## Returns: never, exits
    if (! $QUIET) { ## Mostly for the test suite
        warn <<"END_USAGE";
Usage: $0 args
start <reason> ** force existing sync(s) to quit, then start
stop <reason> ** tell everyone to stop permanently, even the MCP
list <type> ** view short info about dbs, herds, goats, or syncs
add <item_type> <item_name>
** add a new database, table, sync, herd, or database group
add database <dbname> name=internal_name port=xxx host=xxx user=xxx pass=xxx service=xxx conn=xxx sourcelimit=xxx targetlimit=xxx
add table [schema].table db=internal_db_name ping=bool standard_conflict=xxx makedelta=bool
add sync syncname source=xxx targetdb=xxx targetgroup=xxx type=xxx makedelta=xxx
add herd name
add dbgroup name db1 db2 db3 ...
kick <syncname(s)> [timeout]
** kick off one or more syncs, optionally wait for result (0 = wait until done)
ping [timeout] ** ping the MCP for a response
status [--sort=col#] [--daysback=#]
** list information about all syncs
status syncname[s] [--daysback=#]
** list detailed information about one or more syncs
reload_config ** force a running Bucardo to reload the bucardo_config table
For more details, try 'man bucardo_ctl'
END_USAGE
    }
    exit 0;
}
sub start {
    ## Attempt to start Bucardo
    ## Refuse to go on if we get a ping response within 5 seconds
    ## Arguments: none (the reason is taken from the nouns by set_reason)
    ## Returns: never, exits. Exit 1 if a running MCP answered the ping.
    $QUIET or print "Checking for existing processes\n";
    my $oldpid = ping({quiet => 1, timeout => 5, noexit => 1});
    if ($oldpid) {
        $QUIET or print "Cannot start, process $oldpid is already running\n";
        exit 1;
    }

    ## Create a new Bucardo instance and connect to its database
    require Bucardo;
    my $bc = Bucardo->new($bcargs);

    ## Both halves must be the same release
    my $pm_version = $bc->{version} || 'unknown';
    if ($VERSION ne $pm_version) {
        die "Version mismatch: bucardo_ctl is $VERSION, but Bucardo.pm is $pm_version\n";
    }

    ## Record why, then create the stop file
    set_reason();
    stop_bucardo();
    sleep 2; ## Give everyone a chance to notice it

    ## TODO: be smarter about this by scanning PIDDIR
    if (-e $STOPFILE) {
        print "Removing $STOPFILE\n" unless $QUIET;
        unlink $STOPFILE or warn qq{Could not remove "$STOPFILE": $!\n};
    }
    if (-e $PIDFILE) {
        ## Report the file we are really removing
        print "Removing $PIDFILE\n" unless $QUIET;
        unlink $PIDFILE or warn qq{Could not remove "$PIDFILE": $!\n};
    }

    $QUIET or print qq{Starting Bucardo\n};
    $dbh->disconnect();

    ## fork returns undef on failure: without this check the parent would
    ## fall into the child branch and try to become the MCP itself
    my $child = fork;
    if (! defined $child) {
        die qq{Could not fork: $!\n};
    }
    if (! $child) {
        ## Child: drop our output handles, start a new session, run the MCP
        close STDERR;
        close STDOUT;
        setsid() or die;
        $bc->start_mcp();
    }
    exit 0;
}
sub stop {
    ## Ask Bucardo to stop: record the reason, then create the stop file
    ## Arguments: none
    ## Returns: never, exits 0
    set_reason();
    if (! $QUIET) {
        print "Creating $STOPFILE ... ";
    }
    stop_bucardo();
    if (! $QUIET) {
        print "Done\n";
    }
    exit 0;
}
sub reload_config {
    ## Reload configuration settings from the DB, restart all controllers and kids
    ## Arguments: none directly, but the nouns are scanned for a number.
    ## If one is found it becomes $adverb, and we then wait for the
    ## bucardo_reload_config_finished notice before exiting.
    ## Returns: never, exits 0
    for my $noun (@nouns) {
        if ($noun =~ /^\d+$/) {
            ## The pattern has no capture group, so $1 is not set by it:
            ## use the matched noun itself
            $adverb = $noun;
            last;
        }
    }
    $QUIET or print qq{Forcing Bucardo to reload the bucardo_config table};
    my $done = 'bucardo_reload_config_finished';
    $dbh->do('NOTIFY bucardo_reload_config');
    if (defined $adverb) {
        print '...';
        $dbh->do("LISTEN $done");
    }
    $dbh->commit();

    ## Fire and forget unless asked to wait
    if (!defined $adverb) {
        print "\n";
        exit 0;
    }

    sleep 0.1;
    ## Poll for the finished notice, sleeping $WAITSLEEP seconds between rounds
    WAITIN: {
        while (my $notify = $dbh->func('pg_notifies')) {
            my ($name, $pid) = @$notify;
            last WAITIN if $name eq $done;
        }
        $dbh->commit();
        sleep($WAITSLEEP);
        redo;
    }
    print "DONE!\n";
    exit 0;
} ## end of reload_config
sub ping {
    ## See if the MCP is alive and responds to pings
    ## Default is to wait 15 seconds
    ## Arguments: optional hashref with the keys timeout, quiet, and noexit
    ## Returns: with noexit set, the PID that answered, or 0 on a timeout.
    ## Without noexit we exit instead: 0 on a response, 1 on a timeout.
    my ($arg) = @_;
    $arg ||= {};
    my $timeout = $arg->{timeout} || $ARGV[0] || 15;
    my $quiet   = $arg->{quiet} || $ARGV[1] || 0;

    ## A leading numeric noun always wins as the timeout
    if (defined $nouns[0] and $nouns[0] =~ /^\d+$/) {
        $timeout = $nouns[0];
    }
    print "Pinging MCP, timeout = $timeout\n" if $VERBOSE;

    $dbh->do('LISTEN bucardo_mcp_pong');
    $dbh->do('NOTIFY bucardo_mcp_ping');
    $dbh->commit();
    my $starttime = time;
    sleep 0.1;

    ## Poll until somebody answers or we run out of time
    while (1) {
        my $notice = $dbh->func('pg_notifies');
        if (defined $notice) {
            my ($channel, $pid) = @$notice;
            print "OK: Got response from PID $pid\n" unless $quiet;
            return $pid if $arg->{noexit};
            exit 0;
        }
        $dbh->rollback();
        sleep 0.5;
        my $totaltime = time - $starttime;
        next unless $timeout and $totaltime >= $timeout;
        print "CRITICAL: Timed out ($totaltime s), no ping response from MCP\n" unless $quiet;
        return 0 if $arg->{noexit};
        exit 1;
    }
    return;
} ## end of ping
sub kick {
## Kick off every sync listed in @syncs by sending a NOTIFY for each one
## Arguments: none; reads @nouns, @syncs, $syncs, $adverb and the option globals
## If $adverb is not defined, each sync is kicked and we move straight on.
## If $adverb is defined, we wait for the sync to report back: a value
## above zero is used as an alarm timeout in seconds, zero sets no alarm.
## Returns: never, exits with the status of the last assignment made:
## 0 = nothing went wrong, 1 = timed out, 2 = a kill notice or other error
@nouns or die qq{kick requires at least one sync name\n};
my ($exitstatus, $retries, $do_retry) = (0,0,0);
## A retry restarts this whole block, so every sync in @syncs is kicked again
RETRY: {
$dbh->rollback();
$exitstatus = 0;
SYNC: for my $sync (@syncs) {
$dbh->do(qq{NOTIFY "bucardo_kick_sync_$sync"});
## Notice names that tell us the sync as a whole finished or was killed
my $done = "bucardo_syncdone_$sync";
my $killed = "bucardo_synckill_$sync";
## No adverb means no waiting: commit to send the notice and go to the next sync
if (! defined $adverb) {
$dbh->commit();
$QUIET or print qq{Kicked sync $sync\n};
next;
}
$QUIET or print qq{Kick $sync: };
$dbh->do(qq{LISTEN "$done"});
$dbh->do(qq{LISTEN "$killed"});
## Also listen for the per-target notices: one pair for each member of
## the target group, or a single pair for the lone target database
my $s = $syncs->{$sync};
if ($s->{targetgroup}) {
for (@{$s->{targets}}) {
$dbh->do(qq{LISTEN "bucardo_syncdone_${sync}_$_"});
$dbh->do(qq{LISTEN "bucardo_synckill_${sync}_$_"});
}
}
else {
$dbh->do(qq{LISTEN "bucardo_syncdone_${sync}_$s->{targetdb}"});
$dbh->do(qq{LISTEN "bucardo_synckill_${sync}_$s->{targetdb}"});
}
## The commit is what sends the NOTIFY and activates the LISTENs
$dbh->commit();
my $time = time;
sleep 0.1;
## A timeout of zero means the alarm is never armed
my $timeout = (defined $adverb and $adverb > 0) ? $adverb : 0;
## $printstring holds what is on the current line so it can be backspaced over
my $printstring = $NOTIMER ? '' : '[0 s] ';
print $printstring unless $QUIET or $NOTIMER;
my $oldtime = 0;
local $SIG{ALRM} = sub { die 'Timed out' };
$do_retry = 0;
## The eval is left by die() for both the alarm and a forced retry
eval {
if ($timeout) {
alarm $timeout;
}
WAITIN: {
my $lastwait = '';
## Redraw the line, with the new elapsed time, when the second changes
if ($PROGRESS and time - $time != $oldtime) {
$oldtime = time - $time;
if (!$QUIET and !$NOTIMER) {
print "\b" x length($printstring);
$printstring =~ s/\d+/$oldtime/;
print $printstring;
}
}
## Drain every notice that has arrived so far
W: while (my $notify = $dbh->func('pg_notifies')) {
my ($name, $pid) = @$notify;
if ($name eq $done) {
$lastwait = 'DONE!';
}
elsif ($name eq $killed) {
$lastwait = 'KILLED!';
$exitstatus = 2;
}
## A single target finished: report it, but keep waiting
elsif ($name =~ /^bucardo_syncdone_${sync}_(.+)$/) {
my $new = sprintf "$1(%ds) ", ceil(time-$time);
print $new unless $QUIET;
$printstring .= $new;
}
## A single target was killed: report it and stop waiting after this round
elsif ($name =~ /^bucardo_synckill_${sync}_(.+)$/) {
my $new = sprintf "$1 KILLED (%ds) ", ceil(time-$time);
print $new unless $QUIET;
$printstring .= $new;
$exitstatus = 2;
$lastwait = " ";
}
}
$dbh->rollback();
## Any true $lastwait ends the wait for this sync
if ($lastwait) {
print $lastwait unless $QUIET;
## Anything other than a clean finish may be retried, up to $RETRY times
if ($lastwait ne 'DONE!' and $RETRY and ++$retries <= $RETRY) {
print "Retry #$retries\n";
$do_retry = 1;
die "Forcing eval to exit for retry attempt\n";
}
last WAITIN;
}
sleep($WAITSLEEP);
redo WAITIN;
}
alarm 0 if $timeout;
};
## Cancel the alarm here as well, for when the eval was left by die()
alarm 0 if $timeout;
## Check the retry flag first: $@ is also set in that case
if ($do_retry) {
$do_retry = 0;
redo RETRY;
}
if ($@) {
if ($@ =~ /Timed out/o) {
$exitstatus = 1;
warn "Timed out!\n";
}
else {
$exitstatus = 2;
warn "Error: $@\n";
}
next SYNC;
}
next SYNC if $QUIET;
## Finish the output line for this sync
if ($PROGRESS) {
print "\n";
}
else {
printf "(%ds)\n", ceil(time - $time);
}
} ## end each sync
} ## end RETRY
exit $exitstatus;
} ## end of kick
sub status_all {
    ## Show status of all syncs in the database
    ## Arguments: none; the sort column comes from $bcargs->{sort}
    ## Returns: never, exits 0 (dies on an invalid sort option)
    print "Days back: $DAYSBACK $DBCONN ";

    ## See if the MCP is running and what its PID is
    if (! -e $PIDFILE) {
        print "\nBucardo may not be running. No file found at $PIDFILE";
    }
    else {
        my $fh;
        if (!open $fh, '<', $PIDFILE) {
            print "\nERROR: Could not open $PIDFILE: $!";
        }
        else {
            my $pid = <$fh>;
            ## An empty file gives undef: treat it as an empty string so it
            ## is reported as bad content instead of raising warnings
            $pid = '' if ! defined $pid;
            chomp $pid;
            close $fh or warn qq{Could not close $PIDFILE: $!\n};
            if ($pid =~ /^\d+$/) {
                print "PID of Bucardo MCP: $pid";
            }
            else {
                print "\nERROR: $PIDFILE contained: $pid";
            }
        }
    }
    print "\n";

    ## A single digit, optionally signed; a negative number reverses the order
    my $orderby = $bcargs->{sort} || '1';
    if ($orderby !~ /^\+?\-?\d$/) {
        die "Invalid sort option, must be +- 1 through 9\n";
    }

    ## A package variable, so that the sortme sub below can see it
    our $syncs;
    my $max;
    ($syncs,$max) = get_detailed_syncs();

    ## Header row, then a ruler, both sized by the widest value per column
    printf qq{%-*s %-*s %-*s %-*s %-*s %-*s %-*s %-*s %-*s\n},
        $max->{name},'Name',
        $max->{type}, 'Type',
        $max->{stat}, 'State',
        $max->{pid}, 'PID',
        $max->{good}, 'Last_good',
        $max->{total}, 'Time',
        $max->{iud}, 'I/U/D',
        $max->{bad}, 'Last_bad',
        $max->{totalbad}, 'Time';
    printf qq{%s+%s+%s+%s+%s+%s+%s+%s+%s\n},
        '=' x $max->{name},
        '=' x $max->{type},
        '=' x $max->{stat},
        '=' x $max->{pid},
        '=' x $max->{good},
        '=' x $max->{total},
        '=' x $max->{iud},
        '=' x $max->{bad},
        '=' x $max->{totalbad};

    ## If fancy sorting desired, call the list ourself to sort
    ## Compares the sync names in $a and $b by the requested column;
    ## most columns fall back to the name when the values are equal
    sub sortme {
        my $sortcol = $bcargs->{sort} || 1;
        +1 == $sortcol and return $a cmp $b;
        -1 == $sortcol and return $b cmp $a;
        my ($uno,$dos) = ($syncs->{$a}, $syncs->{$b});
        ## Synctype
        +2 == $sortcol and return ($uno->{synctype} cmp $dos->{synctype} or $a cmp $b);
        -2 == $sortcol and return ($dos->{synctype} cmp $uno->{synctype} or $a cmp $b);
        ## Status
        +3 == $sortcol and return ($uno->{state} cmp $dos->{state} or $a cmp $b);
        -3 == $sortcol and return ($dos->{state} cmp $uno->{state} or $a cmp $b);
        ## PID
        +4 == $sortcol and return $uno->{PID} <=> $dos->{PID};
        -4 == $sortcol and return $dos->{PID} <=> $uno->{PID};
        ## Last good
        +5 == $sortcol and return ($uno->{lastgoodsecs} <=> $dos->{lastgoodsecs} or $a cmp $b);
        -5 == $sortcol and return ($dos->{lastgoodsecs} <=> $uno->{lastgoodsecs} or $a cmp $b);
        ## Good time
        +6 == $sortcol and return ($uno->{lastgoodtime} <=> $dos->{lastgoodtime} or $a cmp $b);
        -6 == $sortcol and return ($dos->{lastgoodtime} <=> $uno->{lastgoodtime} or $a cmp $b);
        ## I/U/D: compare the sum of every number found in the string
        if ($sortcol == 7 or $sortcol == -7) {
            my ($total1,$total2) = (0,0);
            while ($uno->{iud} =~ /(\d+)/g) {
                $total1 += $1;
            }
            while ($dos->{iud} =~ /(\d+)/g) {
                $total2 += $1;
            }
            7 == $sortcol and return ($total1 <=> $total2 or $a cmp $b);
            return ($total2 <=> $total1 or $a cmp $b);
        }
        ## Last bad
        +8 == $sortcol and return ($uno->{lastbadsecs} <=> $dos->{lastbadsecs} or $a cmp $b);
        -8 == $sortcol and return ($dos->{lastbadsecs} <=> $uno->{lastbadsecs} or $a cmp $b);
        ## Bad time
        +9 == $sortcol and return ($uno->{lastbadtime} <=> $dos->{lastbadtime} or $a cmp $b);
        -9 == $sortcol and return ($dos->{lastbadtime} <=> $uno->{lastbadtime} or $a cmp $b);
        return $a cmp $b;
    }

    ## One row per sync
    for my $sync (sort sortme keys %$syncs) {
        my $s = $syncs->{$sync};
        my $X = '|';
        printf qq{%-*s$X%-*s$X%-*s$X%-*s$X%-*s$X%-*s$X%-*s$X%-*s$X%-*s\n},
            $max->{name},$sync,
            $max->{type},$s->{typetext},
            $max->{stat}, $s->{state},
            $max->{pid}, $s->{PID} || '',
            $max->{good}, $s->{lastgood},
            $max->{total}, $s->{timegood},
            $max->{iud}, $s->{iud},
            $max->{bad}, $s->{lastbad},
            $max->{totalbad}, $s->{timebad};
    }
    exit 0;
} ## end of status_all
sub status_detail {
    ## Print a detailed report for each sync named in the nouns
    ## Arguments: none
    ## Returns: never, exits 0. Dies if a non-numeric noun is not a known sync.
    my ($detail_for) = get_detailed_syncs({syncs => \@nouns});

    ## Keep the nouns that are real syncs, and complain about the rest.
    ## Numbers are quietly skipped for ease of "kick name #" toggling.
    my @wanted;
    for my $noun (sort @nouns) {
        warn "Verify sync: $noun\n" if $DEBUG;
        if (exists $detail_for->{$noun}) {
            push @wanted, $noun;
            next;
        }
        die "No such sync: $noun\n" if $noun !~ /^\d+$/;
    }

    print "Days back: $DAYSBACK $DBCONN\n";

    ## One report per sync, ordered by name without regard to case
    for my $sync (sort { lc $a cmp lc $b } @wanted) {
        print '=' x 70, "\n";
        my $s = $detail_for->{$sync};

        ## A missing checktime is written as 'none'
        $s->{checktime} ||= 'none';

        ## Anything false here becomes an empty string
        $s->{$_} ||= '' for qw/PID PIDFILE CREATED/;

        ## This must be worked out before the flags below are rewritten
        my $morepid = ($s->{stayalive} or $s->{kidsalive}) ? " (PID = $s->{PID})" : '';

        ## Turn the boolean columns into 'yes' or 'no '
        for my $flag (qw/makedelta analyze_after_copy stayalive kidsalive ping rebuild_index do_listen usecustomselect/) {
            $s->{$flag} = $s->{$flag} ? 'yes' : 'no ';
        }

        ## Either a single target database, or a group and its members
        my $target = $s->{targetdb}
            ? "Target database: $s->{targetdb}"
            : "Target group: $s->{targetgroup} (" . join(',', @{$s->{targets}}) . ')';

        ## Add some breathing room to the compact strings
        $s->{iud}   =~ s{/}{ / }g;
        $s->{state} =~ s{>}{ > };

        ## Extra lines about the latest good run, if there was one
        my $moregood = '';
        if ($s->{lastgood} ne 'unknown') {
            my $good = $s->{latest_good};
            $moregood = " (time to run: $s->{timegood})"
                . "\nLast good time: $good->{last_ended_date}"
                . " Target: $good->{targetdb}";
        }

        ## Extra lines about the latest bad run, including the reason it died
        my $morebad = '';
        if ($s->{lastbad} ne 'unknown') {
            my $bad = $s->{latest_bad};
            my $why = $bad->{whydie};
            $why =~ s/\s+$//;
            $why =~ s/^/ /mg;
            $why =~ s/^\s+//;
            $morebad = " (time to run: $s->{timebad})"
                . "\nLast bad time: $bad->{last_aborted_date}"
                . " Target: $bad->{targetdb}"
                . "\nLatest bad reason: $why";
        }

        print <<"END_DETAIL";
Sync name: $sync
Current state: $s->{state}$morepid
Type: $s->{synctype}
Source database: $s->{source}
$target
Last good: $s->{lastgood}$moregood
Ins/Upd/Del: $s->{iud}
Last bad: $s->{lastbad}$morebad
PID file: $s->{PIDFILE}
PID file created: $s->{CREATED}
Status: $s->{status}
Limitdbs: $s->{limitdbs}
Priority: $s->{priority}
Checktime: $s->{checktime}
Overdue time: $s->{overdue}
Expired time: $s->{expired}
Stayalive: $s->{stayalive} Kidsalive: $s->{kidsalive}
Rebuild index: $s->{rebuild_index} Do_listen: $s->{do_listen}
Ping: $s->{ping} Makedelta: $s->{makedelta}
END_DETAIL

        ## A few more items that are only printed for fullcopy syncs
        next if $s->{synctype} ne 'fullcopy';
        print qq{Custom select: $s->{usecustomselect} $s->{copyextra}\n};
        print qq{Post-copy analyze: $s->{analyze_after_copy}\n};
        print qq{Delete method: $s->{deletemethod}\n};
    }
    exit 0;
} ## end of status_detail
sub get_config {
    ## Look up a single value in the bucardo_config table
    ## Arguments: the name of the setting (matched without regard to case)
    ## Returns: the value column of the first matching row
    ## Dies if no row matches
    my ($setting) = @_;
    $SQL = 'SELECT value FROM bucardo_config WHERE lower(setting) = ?';
    $sth = $dbh->prepare_cached($SQL);
    $count = $sth->execute(lc $setting);
    if ($count >= 1) {
        my $rows = $sth->fetchall_arrayref();
        return $rows->[0][0];
    }
    $sth->finish();
    die "Invalid bucardo_config setting: $setting\n";
}
sub set_reason {
    ## Store the reason given on the command line (the joined nouns)
    ## The reason file is overwritten, and the reason log is appended to
    ## Arguments: none
    ## Returns: 1. Exits with status 1 if no reason was given.
    if (length($nouns) == 0) {
        warn qq{Please provide a reason. For example:\n$0 $verb "Adding new table -Greg"\n};
        exit 1;
    }

    ## The current reason: replaces whatever was there before
    open my $reason_fh, '>', $REASONFILE or die qq{Could not open "$REASONFILE": $!\n};
    print {$reason_fh} scalar(localtime) . " | $nouns\n";
    close $reason_fh or warn qq{Could not close $REASONFILE: $!\n};

    ## The running history: also records the verb
    open my $log_fh, '>>', $REASONFILE_LOG or die qq{Could not open "$REASONFILE_LOG": $!\n};
    print {$log_fh} scalar(localtime) . " | $verb | $nouns\n";
    close $log_fh or warn qq{Could not close $REASONFILE_LOG: $!\n};

    return 1;
}
sub stop_bucardo {
    ## Create (or overwrite) the stop file, noting who made it and when
    ## Arguments: none
    ## Returns: nothing. Dies if the file cannot be created.
    open my $fh, '>', $STOPFILE or die qq{Could not create "$STOPFILE": $!\n};
    my $stamp = scalar localtime;
    print {$fh} "Stopped by $0 on $stamp\n";
    close $fh or warn qq{Could not close "$STOPFILE": $!\n};
    return;
}
sub get_detailed_syncs {
## Gather the information from get_syncs() and add run details from the q tables
## Arguments: optional hashref. If its 'syncs' key holds an arrayref of
## names, only syncs with one of those names receive the extra details.
## Returns: a two item list: the hashref of syncs (keyed by name), and a
## hashref holding the widest value seen for each display column
my $arg = shift || {};
my $syncs = get_syncs();
## Now see exactly what's going on with it at this moment (according to q)
## First, we need to hard-code our time backwards
## The date from $DAYSBACK days ago is placed into the queries below as a literal
$SQL = "SELECT date (now() - '$DAYSBACK days'::interval)";
my $oldtime = $dbh->selectall_arrayref($SQL)->[0][0];
$dbh->do("SET constraint_exclusion = true");
## We want information about the last time it ran successfully
## This looks in both q and freezer.master_q for the newest row that ended and was not aborted
$SQL = qq{
SELECT *,
TO_CHAR(ended,'$DATEFORMAT') AS last_ended_date,
TO_CHAR(aborted,'$DATEFORMAT') AS last_aborted_date,
round(extract(epoch FROM ended-started)) AS total_time_ended,
round(extract(epoch FROM aborted-started)) AS total_time_aborted,
round(extract(epoch FROM now()-ended)) AS last_ended_secs,
round(extract(epoch FROM now()-aborted)) AS last_aborted_secs
FROM
(SELECT * FROM q WHERE sync = \$1 AND cdate >= '$oldtime'
UNION ALL
SELECT * FROM freezer.master_q WHERE sync = \$1 AND cdate >= '$oldtime') AS foo
WHERE ended is NOT NULL AND aborted IS NULL
ORDER BY ended DESC LIMIT 1;
};
$sth{latest_good} = $dbh->prepare($SQL);
## Reuse the same query to find the latest aborted run.
## The substitution is case-sensitive and changes only the first match, so
## "aborted IS NULL" becomes "aborted IS NOT NULL" and nothing else changes.
$SQL =~ s/IS NULL/IS NOT NULL/;
$sth{latest_bad} = $dbh->prepare($SQL);
## Is it running right now?
## q1 is the number of seconds since cdate, q2 the number since started
$SQL = qq{
SELECT *,
round(extract(epoch FROM now()-cdate)) AS q1,
round(extract(epoch FROM now()-started)) AS q2
FROM q
WHERE sync = \$1
AND ended IS NULL
AND aborted IS NULL
ORDER BY started DESC LIMIT 1
};
$sth{active_now} = $dbh->prepare($SQL);
## Starting widths for each column; these only grow as wider values are seen
my %max = (
name => 4,
type => 5,
stat => 5,
pid => 3,
q1 => 6,
q2 => 6,
iud => 5,
good => 9,
total => 5,
bad => 8,
totalbad => 4,
);
for my $sync (sort keys %$syncs) {
## When a list of names was given, skip any sync that is not on it
if ($arg->{syncs}) {
next unless grep { $_ eq $sync } @{$arg->{syncs}}; ## no critic (ProhibitBooleanGrep)
}
$DEBUG and warn "Reading sync $sync...\n";
my $s = $syncs->{$sync};
## Normal types are too long
## So show only the first letter of the synctype, in uppercase
$s->{typetext} = ' ' . uc(substr $s->{synctype},0,1);
## Set some basic lengths
$max{name} = length($sync) if length($sync) > $max{name};
$s->{PID} = 0 if ! defined $s->{PID};
$max{pid} = length($s->{PID}) if length($s->{PID}) > $max{pid};
## Grab information from the q tables
## latest_good and latest_bad are a single row (or undef); active_now is an arrayref of rows
$sth{latest_good}->execute($sync);
$s->{latest_good} = $sth{latest_good}->fetchall_arrayref({})->[0];
$sth{latest_bad}->execute($sync);
$s->{latest_bad} = $sth{latest_bad}->fetchall_arrayref({})->[0];
$sth{active_now}->execute($sync);
$s->{active_now} = $sth{active_now}->fetchall_arrayref({});
## Pretty up the times
## Start with defaults, in case there is no good or bad run to report
$s->{lastgoodsecs} = $s->{lastgoodtime} = 0;
$s->{lastbadsecs} = $s->{lastbadtime} = 0;
$s->{lastgood} = $s->{lastbad} = 'unknown';
$s->{timegood} = $s->{timebad} = '';
$s->{iud} = '';
$s->{is_overdue} = $s->{is_expired} = '?';
if (defined $s->{latest_good}) {
my $g = $s->{latest_good};
$s->{iud} = "$g->{inserts}/$g->{updates}/$g->{deletes}";
$max{iud} = length($s->{iud}) if length($s->{iud}) > $max{iud};
$s->{timegood} = pretty_time($g->{total_time_ended});
$max{total} = length($s->{timegood}) if length($s->{timegood}) > $max{total};
$s->{lastgoodtime} = $g->{total_time_ended};
$s->{lastgood} = pretty_time($g->{last_ended_secs});
$max{good} = length($s->{lastgood}) if length($s->{lastgood}) > $max{good};
$s->{lastgoodsecs} = $g->{last_ended_secs};
## Overdue or expired when the last good run ended longer ago than the limit
$s->{is_overdue} = ($g->{last_ended_secs} > $s->{overdue_secs}) ? 'yes' : 'no';
$s->{is_expired} = ($g->{last_ended_secs} > $s->{expired_secs}) ? 'yes' : 'no';
}
## A limit below one second always counts as 'no', overriding the above
$s->{is_expired} = 'no' if $s->{expired_secs} < 1;
$s->{is_overdue} = 'no' if $s->{overdue_secs} < 1;
if (defined $s->{latest_bad}) {
$s->{timebad} = pretty_time($s->{latest_bad}{total_time_aborted});
$max{totalbad} = length($s->{timebad}) if length($s->{timebad}) > $max{totalbad};
$s->{lastbadtime} = $s->{latest_bad}{total_time_aborted};
$s->{lastbad} = pretty_time($s->{latest_bad}{last_aborted_secs});
$max{bad} = length($s->{lastbad}) if length($s->{lastbad}) > $max{bad};
$s->{lastbadsecs} = $s->{latest_bad}{last_aborted_secs};
}
## Check for current activity
## Only the first row is looked at: the loop always ends with 'last'
$s->{timeq1} = $s->{timeq2} = '';
for my $row (@{$s->{active_now}}) {
$s->{timeq} = $s->{timeq1} = pretty_time($row->{q1});
## A defined q2 means the row has a start time, so show that instead
if (defined $row->{q2}) {
$s->{timeq} = $s->{timeq2} = pretty_time($row->{q2});
}
## Make sure the State column is wide enough for this entry
my $len = length('WAIT:') + length($s->{timeq});
if (length $s->{timeq2}) {
$len += (1 + length($s->{targetdb}));
}
$max{stat} = $len if $len > $max{stat};
last;
}
## The state is RUN (started), WAIT (queued only), or else idle/off based on the status
$s->{state} = length($s->{timeq2}) ? "RUN:$s->{timeq}>$s->{targetdb}" :
length($s->{timeq1}) ? "WAIT:$s->{timeq}" :
($s->{status} eq 'active') ? 'idle' : 'off';
## Is anything overdue or expired?
## Expired wins over overdue; a ? is added when the expired state is still unknown
if ($s->{is_expired} eq 'yes') {
$s->{typetext} .= ' E!'
}
elsif ($s->{is_overdue} eq 'yes') {
$s->{typetext} .= ' O!';
}
elsif ($s->{is_expired} ne 'no') {
$s->{typetext} .= ' ?';
}
}
return $syncs, \%max;
} ## end of get_detailed_syncs
sub list {
    ## Hand off to the right list_* sub, based on the first noun
    ## Arguments: none; removes the first entry of @nouns
    ## Dies with a usage hint if no noun was given, or if control gets
    ## past every listing sub
    my $msg = "Need to specify what to list: db, goat, herd, or sync\n";
    @nouns or die $msg;
    my $thing = shift @nouns;

    ## Each entry: pattern to look for, single-letter shortcut, and the handler
    my @listers = (
        [ qr/db/i,   'd', \&list_dbs   ],
        [ qr/goat/i, 'g', \&list_goats ],
        [ qr/herd/i, 'h', \&list_herds ],
        [ qr/sync/i, 's', \&list_syncs ],
    );
    for my $lister (@listers) {
        my ($pattern, $letter, $handler) = @$lister;
        if ($thing =~ $pattern or $thing eq $letter) {
            $handler->();
        }
    }
    die $msg;
} ## end of list
sub list_dbs {
    ## Show information about all or some subset of the 'db' table
    ## Arguments: none; any remaining nouns are regex filters on the name
    ## Returns: never, exits
    my $WHERE = '';
    if (@nouns) {
        my @clauses = map { 'lower(name) ~ ' . $dbh->quote(lc $_) } @nouns;
        $WHERE = 'WHERE ' . join(' OR ', @clauses);
    }

    $SQL = "SELECT * FROM db $WHERE ORDER BY name";
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute();
    if ($count < 1) {
        $sth->finish();
        my $matching = $WHERE ? ' matching' : '';
        printf "There are no%s entries in the 'db' table.\n", $matching;
        exit;
    }

    $info = $sth->fetchall_arrayref({});
    for my $db (@$info) {
        print "Database: $db->{name} Status: $db->{status}\n";
        ## Limits are only worth a line when one of them is set
        unless ($db->{sourcelimit} == 0 and $db->{targetlimit} == 0) {
            print "Limits: source:$db->{sourcelimit} / target:$db->{targetlimit}\n";
        }
        print "Conn: psql -h $db->{dbhost} -p $db->{dbport} -U $db->{dbuser} -d $db->{dbname}\n";
        print "\n";
    }
    exit;
} ## end of list_dbs
sub list_goats {
    ## Show information about all or some subset of the 'goat' table
    ## Keep to three lines or less if possible
    ## Arguments: none; any remaining nouns are regex filters on the tablename
    ## Returns: never, exits

    ## If any nouns left, treat as wildcards
    my $WHERE = '';
    for my $term (@nouns) {
        $WHERE .= sprintf qq{%s lower(tablename) ~ %s},
            $WHERE ? ' OR' : 'WHERE',
            $dbh->quote(lc $term);
    }
    $SQL = "SELECT * FROM goat $WHERE ORDER BY schemaname, tablename";
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute();
    if ($count < 1) {
        $sth->finish();
        printf "There are no%s entries in the 'goat' table.\n",
            $WHERE ? ' matching' : '';
        exit;
    }
    $info = $sth->fetchall_arrayref({});
    for my $row (@$info) {
        ## Pass the names as arguments rather than placing them inside the
        ## format, so a % in a schema or table name is printed as is
        printf "Table %s: %s.%s%s\n",
            $row->{id},
            $row->{schemaname},
            $row->{tablename},
            $row->{ghost} ? ' GHOST TABLE' : '';
        if ($row->{pkey}) {
            print " PK: $row->{pkey} ($row->{pkeytype})\n";
        }
        if ($row->{customselect}) {
            print " Custom select: $row->{customselect}\n";
        }
        printf " DB: %s | Ping: %s | Makedelta: %s | Has delta: %s\n",
            $row->{db},
            $row->{ping} ? 'Yes' : 'No',
            $row->{makedelta} ? 'Yes' : 'No',
            $row->{hasdelta} ? 'Yes' : 'No';
    }
    exit;
} ## end of list_goats
sub list_herds {
    ## Show information about all or some subset of the 'herd' table
    ## Arguments: none; any remaining nouns are regex filters on the name
    ## Returns: never, exits

    ## Map each herd name to a list of its "schema.table" members
    $SQL = 'SELECT herd, schemaname, tablename FROM herdmap h JOIN goat g ON (g.id = h.goat)';
    $sth = $dbh->prepare($SQL);
    $sth->execute();
    my $map = $sth->fetchall_arrayref({});
    my $herdmap = {};
    for my $row (@$map) {
        push @{$herdmap->{$row->{herd}}}, "$row->{schemaname}.$row->{tablename}";
    }

    ## If any nouns left, treat as wildcards
    my $WHERE = '';
    for my $term (@nouns) {
        $WHERE .= sprintf qq{%s lower(name) ~ %s},
            $WHERE ? ' OR' : 'WHERE',
            $dbh->quote(lc $term);
    }
    $SQL = "SELECT * FROM herd $WHERE ORDER BY name";
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute();
    if ($count < 1) {
        $sth->finish();
        printf "There are no%s entries in the 'herd' table.\n",
            $WHERE ? ' matching' : '';
        exit;
    }
    $info = $sth->fetchall_arrayref({});

    ## A herd without any goats has no entry in the map. Reading such an
    ## entry as an array is fatal under strict, so fall back to an empty list.
    my $goats_of = sub {
        my $herdname = shift;
        return $herdmap->{$herdname} || [];
    };

    ## Work out the column widths first
    my ($namelen,$goatlen) = (5,1);
    for my $row (@$info) {
        my $num = scalar @{ $goats_of->($row->{name}) };
        $namelen = length $row->{name} if length $row->{name} > $namelen;
        $goatlen = length $num if length $num > $goatlen;
    }
    for my $row (@$info) {
        printf "Herd: %-*s Goats: %*d\n",
            $namelen, $row->{name},
            $goatlen, scalar @{ $goats_of->($row->{name}) };
    }

    ## If requesting (or matching) a single herd, show the goats in detail
    if (@nouns and $count == 1) {
        for my $name (@{ $goats_of->($info->[0]{name}) }) {
            print " $name\n";
        }
    }
    exit;
} ## end of list_herds
sub list_syncs {
    ## Show information about all or some subset of the 'sync' table
    ## Arguments: none; any remaining nouns are regex filters on the name
    ## Returns: never, exits
    my $WHERE = '';
    if (@nouns) {
        my @clauses = map { 'lower(name) ~ ' . $dbh->quote(lc $_) } @nouns;
        $WHERE = 'WHERE ' . join(' OR ', @clauses);
    }

    $SQL = "SELECT * FROM sync $WHERE ORDER BY name";
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute();
    if ($count < 1) {
        $sth->finish();
        my $matching = $WHERE ? ' matching' : '';
        printf "There are no%s entries in the 'sync' table.\n", $matching;
        exit;
    }
    $info = $sth->fetchall_arrayref({});

    ## The name column is as wide as the longest name, but never under 5
    my $namelen = 5;
    for my $row (@$info) {
        my $len = length $row->{name};
        $namelen = $len if $len > $namelen;
    }

    for my $row (@$info) {
        my $target = $row->{targetdb} ? $row->{targetdb} : $row->{targetgroup};
        ## Only mention the status when it is something other than active
        my $status = $row->{status} eq 'active' ? '' : "$row->{status}";
        printf "Sync: %-*s %s %s=>%s %s\n",
            $namelen,
            $row->{name},
            $row->{synctype},
            $row->{source},
            $target,
            $status;
    }
    exit;
} ## end of list_syncs
sub pretty_time {
    ## Turn a number of seconds into something like "1h 2m 3s"
    ## Arguments: the number of seconds
    ## Returns: the formatted string, or '?' if the argument is undefined,
    ## not a whole number, or negative
    ## Units that come out as zero are left off, except for the seconds
    my $secs = shift;
    return '?' if ! defined $secs or $secs !~ /^\-?\d+$/ or $secs < 0;
    my ($D,$H,$M) = (0,0,0);

    ## Each test uses >= so that an exact multiple rolls over to the larger
    ## unit: 60 becomes "1m 0s" rather than "60s", 3600 becomes "1h 0s"
    ## NOTE(review): days are only shown when $bcargs->{showdays} is true;
    ## confirm that key is still set by the time this sub is called
    if ($bcargs->{showdays}) {
        if ($secs >= 60*60*24) {
            $D = int $secs/(60*60*24);
            $secs -= $D*60*60*24;
        }
    }
    if ($secs >= 60*60) {
        $H = int $secs/(60*60);
        $secs -= $H*60*60;
    }
    if ($secs >= 60) {
        $M = int $secs/60;
        $secs -= $M*60;
    }
    $secs = int $secs;
    my $answer = sprintf '%s%s%s%ds',
        $D ? "${D}d " : '',
        $H ? "${H}h " : '',
        $M ? "${M}m " : '',
        $secs;

    ## Detailed listings get compressed
    ## Spaces are removed if $COMPRESS is true, or if it is undefined and
    ## there are no nouns
    if ((defined $COMPRESS and $COMPRESS) or (!defined $COMPRESS and !@nouns)) {
        $answer =~ s/ //g;
    }
    return $answer;
}
sub get_syncs {
    ## Read every row of the sync table, then add what the pid directory tells us
    ## Arguments: none
    ## Returns: a hashref of syncs, keyed by name. On top of the columns
    ## and the computed *_secs / *_time values, an entry may have:
    ##   targets - arrayref of the databases in its targetgroup
    ##   PIDFILE, CREATED - path and change time of its pid file
    ##   PID, PIDPING - the pid in that file, and the result of kill 0 on it
    ##   NOPID - set to 1 if the pid file did not contain a number
    ## Side effect: pid files named for a sync that does not exist are removed

    ## Build the list of databases inside each database group
    my %dbgroup;
    $SQL = "SELECT dbgroup, db FROM dbmap ORDER BY priority, db";
    for my $row (@{$dbh->selectall_arrayref($SQL)}) {
        push @{$dbgroup{$row->[0]}}, $row->[1];
    }

    $SQL = qq{
SELECT *,
COALESCE(EXTRACT(epoch FROM checktime),0) AS checksecs,
now()-overdue AS overdue_time,
now()-expired AS expired_time,
extract(epoch FROM overdue) AS overdue_secs,
extract(epoch FROM expired) AS expired_secs
FROM bucardo.sync
ORDER BY priority DESC, name DESC
};
    $sth = $dbh->prepare($SQL);
    $sth->execute();
    my $sync = $sth->fetchall_hashref("name");

    ## Expand any targetgroups in use
    for (keys %$sync) {
        my $s = $sync->{$_};
        if (defined $s->{targetgroup}) {
            $s->{targets} = $dbgroup{$s->{targetgroup}};
        }
    }

    ## Check what exists in the pid directory
    opendir my $sdir, $PIDDIR or die qq{Could not opendir "$PIDDIR": $!\n};
    my $pidfile;
    while (defined ($pidfile = readdir($sdir))) {
        ## Skip the dot entries, the stop file, and the MCP's own pid file
        next if $pidfile =~ /^\.\.?$/
            or "$PIDDIR/$pidfile" eq $STOPFILE
            or "$PIDDIR/$pidfile" eq $PIDFILE;
        if ($pidfile !~ /^bucardo_sync_(.+)\.pid$/) {
            warn "Skipping unknown file in pid directory: $pidfile\n";
            next;
        }
        my $syncname = $1; ## no critic (ProhibitCaptureWithoutTest)

        ## Is this a valid syncname?
        if (! exists $sync->{$syncname}) {
            warn qq{Invalid pid file found: $PIDDIR/$pidfile - removing it\n};
            unlink "$PIDDIR/$pidfile";
            next;
        }

        ## -C is in days relative to script start: turn it into a timestamp
        my $cdate = localtime ($^T - (-C "$PIDDIR/$pidfile")*24*60*60);
        $sync->{$syncname}{PIDFILE} = "$PIDDIR/$pidfile";
        $sync->{$syncname}{CREATED} = $cdate;

        ## Does it contain a pid?
        open my $fh, '<', "$PIDDIR/$pidfile" or die qq{Could not open "$PIDDIR/$pidfile": $!\n};
        my $pid = <$fh>;
        ## An empty file gives undef: check for that before the chomp
        if (! defined $pid) { $pid = ''; }
        chomp $pid;
        close $fh or warn qq{Could not close $PIDDIR/$pidfile: $!\n};
        if ($pid !~ /^\d+$/) {
            $sync->{$syncname}{NOPID} = 1;
        }
        else {
            $sync->{$syncname}{PID} = $pid;
            ## Signal 0 only checks that the process can be signalled
            $sync->{$syncname}{PIDPING} = kill 0, $pid;
        }
    }
    closedir $sdir or warn qq{Could not closedir "$PIDDIR": $!\n};

    return $sync;
} ## end of get_syncs
sub vate_sync {
    ## Activate or deactivate each sync in @syncs, as chosen by the verb
    ## Arguments: none
    ## An adverb of exactly '0' makes us wait for each sync to confirm
    ## Returns: never, exits 0
    my $name = lc $verb;
    my $ucname = ucfirst $name;
    if (! @nouns) {
        die qq{${name}_sync requires at least one sync name\n};
    }

    my $wait = 0;
    if (defined $adverb and $adverb eq '0') {
        $wait = 1;
    }

    for my $sync (@syncs) {
        print qq{$ucname sync $sync} unless $QUIET;
        my $done = "bucardo_${name}d_sync_$sync";
        $dbh->do(qq{NOTIFY "bucardo_${name}_sync_$sync"});
        if ($wait) {
            print "...";
            $dbh->do(qq{LISTEN "$done"});
        }
        $dbh->commit();

        ## Not waiting: finish the line and go to the next sync
        unless ($wait) {
            print "\n";
            next;
        }

        sleep 0.1;
        ## Poll until the confirmation notice for this sync shows up
        WAITIN: while (1) {
            while (my $notice = $dbh->func('pg_notifies')) {
                my ($channel, $pid) = @$notice;
                last WAITIN if $channel eq $done;
            }
            $dbh->commit();
            sleep($WAITSLEEP);
        }
        print "OK\n";
    } ## end each sync
    exit 0;
}
sub add_item {

    ## Entry point for the "add" command
    ## Takes the item type from the front of @nouns and hands control
    ## to the matching add_* subroutine, passing the type as typed (lowercased)
    ## Exits 1 if no type was given, or if a handler returns instead of exiting

    my $self = shift;

    unless (@nouns) {
        warn "Usage: add <item_type> <item_name>\n";
        exit 1;
    }

    ## First word must be a type we know about
    my $type = lc shift @nouns;

    ## Each known type (and its aliases) maps to a handler
    my %handler_for = (
        db       => \&add_database,
        database => \&add_database,
        table    => \&add_table,
        goat     => \&add_table,
        sync     => \&add_sync,
        herd     => \&add_herd,
        dbgroup  => \&add_dbgroup,
    );

    if (exists $handler_for{$type}) {
        $handler_for{$type}->($type);
    }
    else {
        warn "Cannot add: unknown type\n";
    }

    exit 1;

} ## end of add_item
sub alter_item {

    ## Entry point for the "alter" command
    ## Takes the item type from the front of @nouns; only "sync" is handled
    ## Exits 1 if no type was given, the type is unknown, or alter_sync returns

    my $self = shift;

    unless (@nouns) {
        warn "Usage: alter <item_type> <item_name>\n";
        exit 1;
    }

    ## First word must be a type we know about
    my $type = lc shift @nouns;

    if ('sync' ne $type) {
        warn "Cannot alter: unknown type\n";
        exit 1;
    }

    alter_sync($type);

    exit 1;

} ## end of alter_item
sub remove_item {

    ## Entry point for the "remove" command
    ## Takes the item type from the front of @nouns and hands control
    ## to the matching remove_* subroutine, passing the lowercased type
    ## Exits 1 if no type was given, the type is unknown, or a handler returns

    my $self = shift;

    if (!@nouns) {
        warn "Usage: remove <item_type> <item_name>\n";
        exit 1;
    }

    ## First word must be a type we know about
    my $type = shift @nouns;
    $type = lc $type;

    if ($type eq 'db' or $type eq 'database') {
        remove_database($type);
    }
    elsif ($type eq 'table' or $type eq 'goat') {
        remove_table($type);
    }
    elsif ($type eq 'sync') {
        remove_sync($type);
    }
    elsif ($type eq 'herd') {
        remove_herd($type);
    }
    else {
        ## This used to say "Cannot add", copied over from add_item
        warn "Cannot remove: unknown type\n";
    }

    exit 1;

} ## end of remove_item
sub clog {

    ## Emit a message on STDERR via warn, terminated by exactly one added newline
    ## Arguments: the message; one trailing newline (if any) is removed first

    my ($text) = @_;

    chomp $text;

    warn $text . "\n";

} ## end of clog
sub schema_exists {

    ## Check whether a schema is listed in pg_namespace
    ## Arguments: schema name
    ## Returns: 1 if the query matched at least one row, otherwise 0

    my $nspname = shift;

    my $query = 'SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ?';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($nspname);
    $handle->finish();

    return 0 if $rows < 1;
    return 1;

} ## end of schema_exists
sub relation_exists {

    ## Look up a relation (any row in pg_class) by schema and name
    ## Arguments: schema name, relation name
    ## Returns: the oid of the relation if exactly one row matched, otherwise 0

    my ($nspname, $relname) = @_;

    my $query = 'SELECT c.oid FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n '
        . 'WHERE n.oid=c.relnamespace AND n.nspname = ? AND c.relname = ?';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($nspname, $relname);

    if ($rows != 1) {
        $handle->finish();
        return 0;
    }

    return $handle->fetchall_arrayref()->[0][0];

} ## end of relation_exists
sub constraint_exists {

    ## Check whether a named constraint is attached to a given table
    ## Arguments: schema name, table name, constraint name
    ## Returns: 1 if the query matched at least one row, otherwise 0

    my ($nspname, $relname, $conname) = @_;

    my $query = 'SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n, pg_catalog.pg_constraint o '
        . 'WHERE n.oid=c.relnamespace AND c.oid=o.conrelid AND n.nspname = ? AND c.relname = ? AND o.conname = ?';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($nspname, $relname, $conname);
    $handle->finish();

    return 0 if $rows < 1;
    return 1;

} ## end of constraint_exists
sub column_exists {

    ## Check whether a table has a column (pg_attribute row) of the given name
    ## Arguments: schema name, table name, column name
    ## Returns: 1 if the query matched at least one row, otherwise 0

    my ($nspname, $relname, $attname) = @_;

    my $query = 'SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n, '
        . 'pg_catalog.pg_attribute a WHERE n.oid=c.relnamespace AND n.nspname = ? AND c.relname = ? '
        . 'AND a.attname = ? AND a.attrelid = c.oid';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($nspname, $relname, $attname);
    $handle->finish();

    return 0 if $rows < 1;
    return 1;

} ## end of column_exists
sub trigger_exists {

    ## Check whether any trigger of the given name is listed in pg_trigger
    ## Note that the lookup is by name only: no table or schema is considered
    ## Arguments: trigger name
    ## Returns: 1 if the query matched at least one row, otherwise 0

    my ($tgname) = @_;

    my $query = 'SELECT 1 FROM pg_catalog.pg_trigger WHERE tgname = ?';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($tgname);
    $handle->finish();

    return 0 if $rows < 1;
    return 1;

} ## end of trigger_exists
sub column_default {

    ## Fetch the text of a column's DEFAULT expression, as given by pg_get_expr
    ## Arguments: schema name, table name, column name
    ## Returns: the expression from the first matching row,
    ## or an empty string if no pg_attrdef row matched

    my ($nspname, $relname, $attname) = @_;

    my $query = 'SELECT pg_get_expr(adbin,adrelid) FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n, '
        . 'pg_catalog.pg_attribute a, pg_attrdef d '
        . 'WHERE n.oid=c.relnamespace AND n.nspname = ? AND c.relname = ? '
        . 'AND a.attname = ? AND a.attrelid = c.oid AND d.adnum = a.attnum AND d.adrelid = a.attrelid';
    my $handle = $dbh->prepare_cached($query);
    my $rows = $handle->execute($nspname, $relname, $attname);

    ## DBI hands back the string '0E0' when no rows matched
    unless ($rows eq '0E0') {
        return $handle->fetchall_arrayref()->[0][0];
    }

    $handle->finish();
    return '';

} ## end of column_default
sub column_value {

    ## Fetch a single value from a table
    ## Arguments: schema name, table name, column name, and a WHERE clause
    ## All four are placed into the SQL as-is, so callers must pass trusted text
    ## Returns: the first column of the first row (undef if no rows matched)

    my ($nspname, $relname, $colname, $condition) = @_;

    my $query = sprintf 'SELECT %s FROM %s.%s WHERE %s',
        $colname, $nspname, $relname, $condition;

    return $dbh->selectall_arrayref($query)->[0][0];

} ## end of column_value
sub table_definition {

    ## Pull the complete table definition from the bucardo.schema file
    ## The file is looked for in the current directory
    ## Arguments: the table name, exactly as it follows "CREATE TABLE " in the file
    ## Returns: an arrayref of sequence names found in nextval('...') calls,
    ## and the textual table definition (empty string if the table was not found)

    my $name = shift;

    my $def = '';
    my $file = 'bucardo.schema';
    open my $fh, '<', $file or die qq{Could not open "$file": $!\n};
    while (my $line = <$fh>) {
        if (!$def) {
            ## Quote the name and demand a word boundary after it, so that
            ## asking for "foo" does not pick up "CREATE TABLE foo_bar"
            if ($line =~ /^CREATE TABLE \Q$name\E\b/) {
                $def .= $line;
            }
        }
        else {
            $def .= $line;
            ## The definition ends at the first line starting with ");"
            last if $line =~ /^\);/;
        }
    }
    close $fh or die qq{Could not close "$file": $!\n};

    ## Gather up every sequence the definition refers to
    my @seq;
    while ($def =~ /nextval\('(.+?)'/g) {
        push @seq => $1;
    }

    return \@seq, $def;

} ## end of table_definition
sub upgrade_and_log {

    ## Run a SQL statement and record it in the upgrade_log table
    ## Arguments: the SQL to run, and an optional summary of it
    ## (the summary falls back to the SQL itself when missing or false)
    ## Dies, naming the line we were called from, if the SQL fails
    ## Leaves the INSERT statement in the global $SQL and $sth

    my ($action, $short) = @_;
    $short ||= $action;

    eval { $dbh->do($action) };
    if (my $problem = $@) {
        ## Ask for the caller here, outside the eval, to get the real call site
        my (undef, undef, $line) = caller;
        die "From line $line, action $action\n$problem\n";
    }

    $SQL = "INSERT INTO upgrade_log(action,version,summary) VALUES (?,?,?)";
    $sth = $dbh->prepare($SQL);
    $sth->execute($action, $VERSION, $short);

} ## end of upgrade_and_log
sub function_exists {

    ## Look up a function by schema, name, and argument types
    ## Arguments: schema name, function name (lowercased before lookup), and the
    ## argument list in the form produced by oidvectortypes(), e.g. 'text, integer'
    ## Returns: md5 of the function source (prosrc) from the first matching row,
    ## or an empty string if there is no such function
    ## Leaves the query in the global $SQL, $sth, and $count

    my ($schema,$name,$args) = @_;

    $name = lc $name;

    ## The schema argument used to be ignored, which let a function of the same
    ## name in some other schema be mistaken for ours
    $SQL = "SELECT md5(p.prosrc) FROM pg_catalog.pg_proc p, pg_catalog.pg_namespace n ".
        "WHERE n.oid = p.pronamespace AND n.nspname = ? ".
        "AND p.proname = ? AND oidvectortypes(p.proargtypes) = ?";
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute($schema,$name,$args);

    ## DBI hands back the string '0E0' when no rows matched
    if ($count eq '0E0') {
        $sth->finish();
        return '';
    }

    return $sth->fetchall_arrayref()->[0][0];

} ## end of function_exists
sub upgrade {

    ## Make upgrades to an existing schema to match the current version
    ## This is generally not reversible, so we throw plenty of warnings
    ## Works through a set of hard-coded lists (rules, constraints, sequences,
    ## tables, columns, indexes, row values, functions), then through the
    ## config rows, functions, triggers, and indexes found in the bucardo.schema
    ## file, which must be in the current directory
    ## Apart from the creation of the upgrade_log table, commit is only called
    ## after the user answers the final prompt with something containing a "Y"
    ## Always exits rather than returning

    my $self = shift;

    my $schema = 'bucardo';

    $dbh->do("SET escape_string_warning = 'OFF'");

    ## Number of changes made so far
    my $changes = 0;

    ## Make sure the upgrade_log table is in place
    if (!relation_exists($schema, 'upgrade_log')) {
        my (undef, $tabledef) = table_definition('upgrade_log');
        upgrade_and_log($tabledef,"CREATE TABLE upgrade_log");
        $dbh->commit();
    }

    my @new_sequences = (
        'audit_pid_id_seq',
    );

    my @old_sequences = (
        'dbgroup_id_seq',
    );

    my @new_tables = (
        ['bucardo', 'bucardo_log_message'],
        ['bucardo', 'bucardo_rate'],
    );

    my @new_columns = (
        ['bucardo', 'audit_pid',     'id',              q{INTEGER NOT NULL DEFAULT nextval('audit_pid_id_seq')}],
        ['bucardo', 'audit_pid',     'parentid',        q{INTEGER NULL}],
        ['bucardo', 'audit_pid',     'familyid',        q{INTEGER NULL}],
        ['bucardo', 'audit_pid',     'source',          q{TEXT NULL}],
        ['bucardo', 'audit_pid',     'target',          q{TEXT NULL}],
        ['bucardo', 'audit_pid',     'master_backend',  q{INT NOT NULL DEFAULT pg_backend_pid()}],
        ['bucardo', 'audit_pid',     'source_backend',  q{INT NULL}],
        ['bucardo', 'audit_pid',     'target_backend',  q{INT NULL}],
        ['bucardo', 'db',            'dbservice',       q{TEXT NULL}],
        ['bucardo', 'goat',          'qpkey',           q{TEXT NULL}],
        ['bucardo', 'sync',          'strict_checking', q{BOOLEAN NOT NULL DEFAULT 'true'}],
        ['bucardo', 'sync',          'track_rates',     q{BOOLEAN NOT NULL DEFAULT 'false'}],
        ['freezer', 'old_audit_pid', 'id',              q{INTEGER}],
        ['freezer', 'old_audit_pid', 'parentid',        q{INTEGER}],
        ['freezer', 'old_audit_pid', 'familyid',        q{INTEGER}],
        ['freezer', 'old_audit_pid', 'source',          q{TEXT}   ],
        ['freezer', 'old_audit_pid', 'target',          q{TEXT}   ],
        ['freezer', 'old_audit_pid', 'master_backend',  q{INT}    ],
        ['freezer', 'old_audit_pid', 'source_backend',  q{INT}    ],
        ['freezer', 'old_audit_pid', 'target_backend',  q{INT}    ],
    );

    ## Supported changes: 'NO DEFAULT', 'RENAME newname', 'DEFAULT expression'
    my @altered_columns = (
        ['bucardo', 'goat', 'schemaname', 'NO DEFAULT'],
    );

    my @old_columns = (
        ['bucardo', 'sync', 'disable_rules'],
        ['bucardo', 'sync', 'disable_triggers'],
    );

    ## Table, column, where clause, force flag (currently unused), wanted value
    my @row_values = (
        ['bucardo_config','about',q{setting = 'log_showtime'}, 1,
         'Show timestamp in the log output?  0=off  1=seconds since epoch  2=scalar gmtime  3=scalar localtime'],
    );

    my @old_constraints = (
        ['bucardo', 'goat', 'goat_pkeytype_check'],
        ['bucardo', 'sync', 'sync_replica_allornone'],
        ['bucardo', 'sync', 'sync_disable_triggers_method'],
        ['bucardo', 'sync', 'sync_disable_rules_method'],
    );

    my @new_constraints = (
    );

    my @old_functions = (
        ['create_child_q', 'text'],
    );

    my @old_indexes = (
        ['bucardo', 'sync', 'sync_source_targetdb'],
        ['bucardo', 'sync', 'sync_source_targetgroup'],
    );

    my @new_indexes = (
        ['bucardo', 'sync', 'sync_source_targetdb_type',    'UNIQUE source,targetdb,synctype'],
        ['bucardo', 'sync', 'sync_source_targetgroup_type', 'UNIQUE source,targetgroup,synctype'],
    );

    my @drop_all_rules = (
        ['freezer','master_q'],
    );

    ## Drop all existing rules from this table:
    for my $row (@drop_all_rules) {
        my ($schema,$table) = @$row;
        print "Checking $schema and $table\n";
        my $oid = relation_exists($schema,$table);
        if (!$oid) {
            warn "Could not find table $schema.$table to check!\n";
            next;
        }
        $SQL = "SELECT rulename FROM pg_rewrite WHERE ev_class = ? ORDER BY rulename";
        $sth = $dbh->prepare($SQL);
        $count = $sth->execute($oid);
        if ($count < 1) {
            $sth->finish();
            next;
        }
        ## The list of rules is built in full before the loop body runs,
        ## so it does not matter that upgrade_and_log replaces the global $sth
        for my $rule (map { $_->[0] } @{$sth->fetchall_arrayref()}) {
            upgrade_and_log(qq{DROP RULE "$rule" ON $schema.$table});
            clog "Dropped rule $rule on table $schema.$table";
            $changes++;
        }
    }

    for my $con (@old_constraints) {
        my ($schema, $table, $constraint) = @$con;
        if (!constraint_exists($schema, $table, $constraint)) {
            clog "  Constraint $constraint does not exist";
        }
        else {
            upgrade_and_log(qq{ALTER TABLE $schema.$table DROP CONSTRAINT "$constraint"});
            clog "Dropped constraint $constraint ON $schema.$table";
            $changes++;
        }
    }

    for my $sequence (@new_sequences) {
        if (relation_exists($schema, $sequence)) {
            clog "  Sequence already exists: $sequence";
        }
        else {
            upgrade_and_log("CREATE SEQUENCE $schema.$sequence");
            clog "Created sequence: $sequence";
            $changes++;
        }
    }

    for my $sequence (@old_sequences) {
        if (!relation_exists($schema, $sequence)) {
            clog "  Sequence does not exist: $sequence";
        }
        else {
            upgrade_and_log("DROP SEQUENCE $schema.$sequence");
            clog "Dropped sequence: $sequence";
            $changes++;
        }
    }

    for my $row (@new_tables) {
        my ($schema,$table) = @$row;
        if (relation_exists($schema, $table)) {
            clog "  Table already exists: $table\n";
        }
        else {
            ## Any sequences the table needs must exist before it does
            my ($seqlist, $tabledef) = table_definition($table);
            for my $sequence (@$seqlist) {
                if (!relation_exists($schema, $sequence)) {
                    upgrade_and_log("CREATE SEQUENCE $schema.$sequence");
                    clog "Created sequence: $sequence";
                }
            }
            upgrade_and_log($tabledef, "CREATE TABLE $table");
            clog "Created table: $table\n";
            $changes++;
        }
    }

    for my $row (@new_columns) {
        my ($schema,$table,$column,$def) = @$row;
        if (column_exists($schema, $table, $column)) {
            clog "  Column already exists: $schema.$table.$column";
        }
        else {
            $def =~ s/\s+/ /g;
            $dbh->do("ALTER TABLE $schema.$table ADD COLUMN $column $def");
            clog "Created column: $schema.$table.$column $def";
            $changes++;
        }
    }

    for my $row (@altered_columns) {
        my ($schema,$table,$column,$change) = @$row;
        next if ! column_exists($schema, $table, $column);
        if ($change eq 'NO DEFAULT') {
            my $def = column_default($schema, $table, $column);
            if (!$def) {
                clog "  Column $schema.$table.$column already has no DEFAULT";
            }
            else {
                upgrade_and_log("ALTER TABLE $schema.$table ALTER COLUMN $column DROP DEFAULT");
                clog "Removed DEFAULT ($def) from $schema.$table.$column";
                $changes++;
            }
        }
        elsif ($change =~ /^RENAME\s+(\w+)/) {
            my $newname = $1;
            upgrade_and_log("ALTER TABLE $schema.$table RENAME COLUMN $column TO $newname");
            clog("Renamed $schema.$table.$column to $newname");
            $changes++;
        }
        elsif ($change =~ /^DEFAULT\s+(.+)/) {
            my $newdefault = $1;
            my $olddefault = column_default($schema, $table, $column);
            if ($newdefault eq $olddefault) {
                clog "  Column $schema.$table.$column already has a DEFAULT of $newdefault";
            }
            else {
                upgrade_and_log("ALTER TABLE $schema.$table ALTER COLUMN $column SET DEFAULT $newdefault");
                clog("Changed DEFAULT on $schema.$table.$column to $newdefault");
                $changes++;
            }
        }
        else {
            die qq{Do not know how to handle altered column spec of "$change"};
        }
    }

    for my $row (@old_columns) {
        my ($schema,$table,$column) = @$row;
        if (!column_exists($schema, $table, $column)) {
            clog "  Column already removed: $schema.$table.$column";
        }
        else {
            $dbh->do("ALTER TABLE $schema.$table DROP COLUMN $column");
            clog "Dropped column: $schema.$table.$column";
            $changes++;
        }
    }

    for my $row (@new_constraints) {
        my ($schema,$table,$name,$def) = @$row;
        if (constraint_exists($schema, $table, $name)) {
            clog "  Constraint $name on $schema.$table already exists";
        }
        else {
            $dbh->do("ALTER TABLE $schema.$table ADD CONSTRAINT $name $def");
            clog "Added constraint $name to $schema.$table";
            $changes++;
        }
    }

    for my $row (@old_indexes) {
        my ($schema,$table,$name) = @$row;
        if (!relation_exists($schema, $name)) {
            clog "  Index already removed: $schema.$table.$name";
        }
        else {
            ## Qualify the name: we just confirmed the index lives in this schema,
            ## and it may not be reachable through the current search_path
            $dbh->do("DROP INDEX $schema.$name");
            clog "Dropped index $name";
            $changes++;
        }
    }

    for my $row (@new_indexes) {
        my ($schema,$table,$name,$def) = @$row;
        if (relation_exists($schema, $name)) {
            clog "  Index already exists: $schema.$table.$name";
        }
        else {
            ## A leading UNIQUE in the definition is moved into the CREATE clause
            my $unique = $def =~ s/^UNIQUE\s*// ? 1 : 0;
            my $COM = sprintf 'CREATE %sINDEX %s ON %s.%s (%s)',
                $unique ? 'UNIQUE ' : '',
                $name,
                $schema,
                $table,
                $def;
            $dbh->do($COM);
            clog "Created index $name on $schema.$table";
            $changes++;
        }
    }

    ## Check for any new config items
    $SQL = "SELECT value FROM bucardo_config WHERE lower(setting) = ?";
    my $cfgsth = $dbh->prepare($SQL);
    $SQL = 'INSERT INTO bucardo_config(setting,value,about) VALUES (?,?,?)';
    my $newcfg = $dbh->prepare($SQL);
    my $file = 'bucardo.schema';
    open my $fh, '<', $file or die qq{Could not open "$file": $!\n};
    ## True while we are between a "WITH DELIMITER" line and a line starting with a backslash
    my $inside = 0;
    while (<$fh>) {
        if (!$inside) {
            if (/^WITH DELIMITER/) {
                $inside = 1;
            }
            next;
        }
        if (/^\\/) {
            $inside = 0;
            next;
        }
        ## Nothing to scoop from an empty line
        next if /^\s*$/;
        ## Scoop: strip the newline first so it does not end up stored in "about"
        chomp;
        my ($setting,$value,$about) = split /\|/ => $_;
        $count = $cfgsth->execute($setting);
        $cfgsth->finish();
        if ($count eq '0E0') {
            clog "Added new bucardo_config setting: $setting";
            $changes++;
            $newcfg->execute($setting,$value,$about);
        }
    }
    close $fh or die qq{Could not close file "$file": $!\n};

    for my $row (@row_values) {
        my ($table,$column,$where,$force,$value) = @$row;
        my $val = column_value($schema,$table,$column,$where);
        if (!defined $val) {
            die "Failed to find $table.$column where $where!\n";
        }
        if ($val eq $value) {
            clog "  Value of $table.$column where $where is already correct";
        }
        else {
            $SQL = sprintf "UPDATE $schema.$table SET $column=%s WHERE $where",
                $dbh->quote($value);
            upgrade_and_log($SQL);
            clog "New value set for $schema.$table.$column WHERE $where";
            $changes++;
        }
    }

    for my $row (@old_functions) {
        my ($name, $args) = @$row;
        if (function_exists('bucardo', $name, $args)) {
            clog "Dropping deprecated function $name($args)";
            upgrade_and_log(qq{DROP FUNCTION bucardo."$name"($args)});
            $changes++;
        }
        else {
            clog "  Deprecated function $name($args) does not exist";
        }
    }

    ## Now we check all the functions, triggers, and indexes within bucardo.schema
    $file = 'bucardo.schema';
    open $fh, '<', $file or die qq{Could not open "$file": $!\n};
    my (@flist, @tlist, @ilist);
    my ($fname,$args,$fbody) = ('','','');
    my ($tname,$tbody) = ('','');
    while (<$fh>) {
        ## In the middle of a function: it ends at a line starting with "$bc$;"
        if ($fbody) {
            if (/^(\$bc\$;)/) {
                $fbody .= $1;
                push @flist, [$fname, $args, $fbody];
                $fbody = $fname = $args = '';
            }
            else {
                $fbody .= $_;
            }
            next;
        }
        ## In the middle of a trigger: it ends at the first line with a semicolon
        if ($tbody) {
            $tbody .= $_;
            if (/;/) {
                push @tlist, [$tname, $tbody];
                $tbody = $tname = '';
            }
            next;
        }
        if (/^CREATE (?:OR REPLACE )?FUNCTION\s+(\S+)/) {
            $fname = $1;
            $fbody .= $_;
            $fname =~ s/bucardo\.//;
            $fname =~ s/\((.*)\)// or die "No args found for function: $_\n";
            $args = $1;
            ## Put a space after each comma, the same way oidvectortypes() does
            $args =~ s/,(\S)/, $1/g;
            next;
        }
        if (/^CREATE TRIGGER (\S+)/) {
            $tname = $1;
            $tbody .= $_;
            next;
        }
        if (/^CREATE INDEX (\S+)/) {
            push @ilist, [$1, $_];
            next;
        }
    }
    close $fh or die qq{Could not close file "$file": $!\n};

    ## Let the database compute the md5, so it can be compared to function_exists
    $SQL = "SELECT md5(?)";
    my $md5sth = $dbh->prepare($SQL);
    for my $row (@flist) {
        my ($name,$args,$body) = @$row;
        my $oldbody = function_exists($schema,$name,$args);
        if (!$oldbody) {
            $body =~ s/FUNCTION /FUNCTION $schema./;
            upgrade_and_log($body,"CREATE FUNCTION $schema.$name($args)");
            clog "Added function $schema.$name($args)";
            $changes++;
            next;
        }
        ## Only the text between the $bc$ markers is compared
        my $realbody = $body;
        $realbody =~ s/.*?\$bc\$(.+)\$bc\$;/$1/sm;
        $md5sth->execute($realbody);
        my $newbody = $md5sth->fetchall_arrayref()->[0][0];
        if ($oldbody eq $newbody) {
            clog "  Function unchanged: $schema.$name($args)";
            next;
        }
        $body =~ s/^CREATE FUNCTION/CREATE OR REPLACE FUNCTION/;
        $body =~ s/FUNCTION /FUNCTION $schema./;
        (my $short = $body) =~ s/^(.+?)\n.*/$1/s;
        ## Try a simple replace first: if that fails, drop the function and recreate it
        $dbh->do("SAVEPOINT bucardo_upgrade");
        eval { upgrade_and_log($body,$short); };
        if ($@) {
            $dbh->do("ROLLBACK TO bucardo_upgrade");
            (my $dropbody = $short) =~ s/CREATE OR REPLACE/DROP/;
            $dropbody .= ' CASCADE';
            upgrade_and_log($dropbody);
            upgrade_and_log($body,$short);
        }
        else {
            $dbh->do("RELEASE bucardo_upgrade");
        }
        clog "Updated function: $schema.$name($args)";
        $changes++;
    }

    ## Check for any added triggers
    for my $row (@tlist) {
        my ($name,$body) = @$row;
        if (trigger_exists($name)) {
            clog "  Trigger $name already exists";
        }
        else {
            upgrade_and_log($body);
            clog "Created trigger $name";
            $changes++;
        }
    }

    ## Check for any added indexes
    for my $row (@ilist) {
        my ($name,$body) = @$row;
        if (relation_exists('bucardo',$name)) {
            clog "  Index $name already exists";
        }
        else {
            upgrade_and_log($body);
            clog "Created index $name";
            $changes++;
        }
    }

    ## The freezer.q_staging table is no longer needed, but we must empty it before dropping
    if (relation_exists('freezer','q_staging')) {
        upgrade_and_log("INSERT INTO freezer.master_q SELECT * FROM freezer.q_staging");
        upgrade_and_log("DROP TABLE freezer.q_staging");
        clog 'Dropped deprecated table freezer.q_staging';
        $changes++;
    }

    if ($changes) {
        printf "Okay to commit $changes %s? ", $changes==1 ? 'change' : 'changes';
        ## No answer at all (e.g. STDIN is closed) counts as a "no"
        my $answer = <STDIN>;
        exit if ! defined $answer or $answer !~ /Y/i;
        $dbh->commit();
        print "Changes have been committed\n";
    }

    exit 1;

} ## end of upgrade
sub add_database {

    ## Add a new row to the bucardo.db table
    ## Usage: add database <dbname> name=internal_name port=xxx host=xxx user=xxx pass=xxx
    ##        service=xxx conn=xxx sourcelimit=xxx targetlimit=xxx
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The database name and the options are read from @nouns
    ## Exits 0 once the row is committed, or 1 if no database name was given
    ## Dies on a malformed or unknown option, or if the insert fails

    my $type = shift;

    ## The first word is the name of the database we connect to (column dbname)
    my $item_name = shift @nouns || '';
    if (!length $item_name) {
        warn "Usage: add $type <name>\n";
        exit 1;
    }

    ## The rest is the options. If not there, we fill in the defaults
    ## Options are separated by pipes if any pipe is present, else by whitespace
    my $info = join ' ' => @nouns;
    my $delim = $info =~ /\|/ ? "\\s*\\|\\s*" : '\\s+';

    ## Command line name, default value, database column name
    my %arg = (
        name        => [$item_name, ''],
        user        => ['bucardo',  'dbuser'],
        port        => ['',         'dbport'],
        host        => ['',         'dbhost'],
        pass        => ['',         'dbpass'],
        status      => ['',         ''],
        conn        => ['',         'dbconn'],
        service     => ['',         'dbservice'],
        sourcelimit => ['',         ''],
        targetlimit => ['',         ''],
    );

    for my $arg (split /$delim/ => $info) {
        ## Break on the first equals sign only, so a value may contain one too
        if ($arg !~ /^([^=]+)=(.+)/) {
            die qq{Invalid argument\n};
        }
        my ($name,$value) = ($1,$2);
        if (!exists $arg{$name}) {
            die "Unknown option '$name'\n";
        }
        $arg{$name}[0] = $value;
    }

    $arg{dbname} = [$item_name,''];

    ## Lots of checks go here...

    ## Build the column list: anything with an empty value is left to the table defaults
    my ($cols,$phs) = ('','');
    my @vals;
    for my $col (sort keys %arg) {
        next if ! length $arg{$col}[0];
        my $name = $arg{$col}[1] || $col;
        $cols .= "$name,";
        $phs .= '?,';
        push @vals, $arg{$col}[0];
    }
    $cols =~ s/,$//;
    $phs =~ s/,$//;

    ## Add this to the database
    $SQL = "INSERT INTO bucardo.db ($cols) VALUES ($phs)";
    $DEBUG and warn "SQL: $SQL\n";
    $DEBUG and warn Dumper \@vals;
    $sth = $dbh->prepare($SQL);
    eval {
        $count = $sth->execute(@vals);
    };
    if ($@) {
        die "Fail: $@\n";
    }

    $dbh->commit();

    warn "Database added: $arg{name}[0]\n";

    exit 0;

} ## end of add_database
sub add_table {

    ## Add one or more rows to the bucardo.goat table, and optionally to a herd
    ## Usage: add table [schema].table db=internal_db_name ping=bool standard_conflict=xxx makedelta=bool
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The table names and the options are read from @nouns
    ## Exits 0 once everything is committed, or 1 on a bad or missing table name
    ## Dies on a bad option, a missing db, an unknown herd, or a failed insert

    my $type = shift;

    ## TODO: Make this smarter
    my $DEFAULT_SCHEMA = 'public';

    ## Any single words are table names: anything with an equals sign is an option
    my (@newnouns,@tables);
    for my $noun (@nouns) {
        if ($noun =~ /=/) {
            push @newnouns => $noun;
            next;
        }
        if ($noun =~ /^(\w*?)?\.?(\w+)$/) {
            push @tables => length $1 ? [$1,$2] : [$DEFAULT_SCHEMA,$2];
        }
        else {
            warn "Invalid table name: $noun\n";
            exit 1;
        }
    }
    @nouns = @newnouns;

    if (! @tables) {
        warn "Usage: add $type <name>\n";
        exit 1;
    }

    ## The rest is the options. If not there, we fill in the defaults
    ## Options are separated by pipes if any pipe is present, else by whitespace
    my $info = join ' ' => @nouns;
    my $delim = $info =~ /\|/ ? "\\s*\\|\\s*" : '\\s+';

    ## Command line name, default value, database column name
    my %arg = (
        db                => ['', ''],
        ping              => ['', ''],
        standard_conflict => ['', ''],
        herd              => ['', ''],
        makedelta         => ['', ''],
    );

    for my $arg (split /$delim/ => $info) {
        ## Break on the first equals sign only, so a value may contain one too
        if ($arg !~ /^([^=]+)=(.+)/) {
            die qq{Invalid argument: ($arg)\n};
        }
        my ($name,$value) = ($1,$2);
        if (!exists $arg{$name}) {
            die "Unknown option '$name'\n";
        }
        $arg{$name}[0] = $value;
    }

    if (! length $arg{db}[0]) {
        die "Please specify a database with db=<name>\n";
    }

    ## Lots of checks go here...

    ## The herd is not a column of the goat table, so pull it out of the options
    my $herd;
    if ($arg{herd}->[0]) {
        $herd = $arg{herd}->[0];
        delete $arg{herd};

        ## Does this herd exist?
        ## We must look at the value of count(*): the query always returns one row
        $SQL = 'SELECT count(*) FROM herd WHERE name = ?';
        my $found = $dbh->selectall_arrayref($SQL, undef, $herd)->[0][0];
        if (! $found) {
            die "Cannot add to non-existent herd: $herd\n";
        }
    }

    ## Build the column list: anything with an empty value is left to the table defaults
    my ($cols,$phs) = ('','');
    my @vals;
    for my $col (sort keys %arg) {
        next if ! length $arg{$col}[0];
        my $name = $arg{$col}[1] || $col;
        $cols .= "$name,";
        $phs .= '?,';
        ## Every placeholder needs a value, even a false one such as ping=0
        push @vals, $arg{$col}[0];
    }
    $cols =~ s/,$//;
    $phs =~ s/,$//;

    ## Add these to the database
    $SQL = "INSERT INTO bucardo.goat ($cols,schemaname,tablename) VALUES ($phs,?,?) RETURNING id";
    if ($DEBUG) {
        warn "SQL: $SQL\n";
        warn Dumper \@tables;
    }
    $sth = $dbh->prepare($SQL);

    ## The ids of the newly created goats
    my @id;
    for my $entry (@tables) {
        my ($schema,$table) = @$entry;
        eval {
            if ($DEBUG >= 2) {
                warn "SQL: $SQL\n";
                my $args = join ',' => @vals;
                my $ss = defined $schema ? $schema : '<undef>';
                warn "Args: $args, $ss, $table\n";
            }
            $count = $sth->execute(@vals,$schema,$table);
            push @id => $sth->fetch()->[0];
        };
        if ($@) {
            die "Table add failed: $@\n";
        }
        $DEBUG and warn "Insert count: $count\n";
    }

    ## Add them to the herd if it was specified
    if (defined $herd) {
        $SQL = 'INSERT INTO herdmap (goat,herd,priority) VALUES (?,?,?)';
        $sth = $dbh->prepare($SQL);
        for my $goatid (@id) {
            $sth->execute($goatid,$herd,0);
        }
    }

    $dbh->commit();

    my $tablelist = sprintf "%s added: ", @tables > 1 ? 'Tables' : 'Table';
    for my $entry (@tables) {
        $tablelist .= sprintf "%s%s, ", $entry->[0] ? "$entry->[0]." : '', $entry->[1];
    }
    $tablelist =~ s/, $//;
    warn "$tablelist\n";

    exit 0;

} ## end of add_table
sub add_sync {

    ## Add a new row to the bucardo.sync table
    ## Usage: add sync syncname options
    ## Known options: source, targetdb, targetgroup, type, makedelta, usecustomselect
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The sync name and the options are read from @nouns
    ## Exits 0 once the row is committed, or 1 if no sync name was given
    ## Dies on a malformed or unknown option, or if the insert fails

    my $type = shift;

    ## The first word is the sync name
    my $item_name = shift @nouns || '';
    if (!length $item_name) {
        warn "Usage: add $type <name>\n";
        exit 1;
    }

    ## The rest is the options. If not there, we fill in the defaults
    ## Options are separated by pipes if any pipe is present, else by whitespace
    my $info = join ' ' => @nouns;
    my $delim = $info =~ /\|/ ? "\\s*\\|\\s*" : '\\s+';

    ## Command line name, default value, database column name
    my %arg = (
        source          => ['', ''],
        targetdb        => ['', ''],
        targetgroup     => ['', ''],
        type            => ['', 'synctype'],
        makedelta       => ['', ''],
        usecustomselect => ['', ''],
    );

    for my $arg (split /$delim/ => $info) {
        ## Break on the first equals sign only, so a value may contain one too
        if ($arg !~ /^([^=]+)=(.+)/) {
            die qq{Invalid argument\n};
        }
        my ($name,$value) = ($1,$2);
        if (!exists $arg{$name}) {
            die "Unknown option '$name'\n";
        }
        $arg{$name}[0] = $value;
    }

    $arg{name} = [$item_name];

    ## Lots of checks go here...

    ## Build the column list: anything with an empty value is left to the table defaults
    my ($cols,$phs) = ('','');
    my @vals;
    for my $col (sort keys %arg) {
        next if ! length $arg{$col}[0];
        my $name = $arg{$col}[1] || $col;
        $cols .= "$name,";
        $phs .= '?,';
        push @vals, $arg{$col}[0];
    }
    $cols =~ s/,$//;
    $phs =~ s/,$//;

    ## Syncs of type fullcopy are created with stayalive, kidsalive, and ping all off
    if ($arg{type}[0] eq 'fullcopy') {
        $cols .= ',stayalive,kidsalive,ping';
        $phs .= q{,'f','f','f'};
    }

    ## Add this to the database
    $SQL = "INSERT INTO bucardo.sync ($cols) VALUES ($phs)";
    if ($DEBUG) {
        warn "SQL: $SQL\n";
        warn Dumper \@vals;
    }
    $sth = $dbh->prepare($SQL);
    eval {
        $count = $sth->execute(@vals);
    };
    if ($@) {
        die "Failed to add sync: $@\n";
    }

    $dbh->commit();

    warn "Sync added: $item_name\n";

    exit 0;

} ## end of add_sync
sub alter_sync {

    ## Modify an existing sync
    ## Usage: alter sync syncname options
    ## targetgroup=foo - add/change the targetgroup
    ## targetdb=foo - add/change the targetdb
    ## If a target already exists of other type, it is set to null
    ## If both options are given, only the targetgroup is acted upon
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The sync name and the options are read from @nouns
    ## Exits 0 after a commit or when there was nothing to change,
    ## and 1 if no sync name was given or the sync does not exist
    ## Dies on a malformed or unknown option, or if the update fails

    my $type = shift;

    ## The first word is the sync name
    my $item_name = shift @nouns || '';
    if (!length $item_name) {
        warn "Usage: alter $type <name>\n";
        exit 1;
    }

    ## The rest is the options.
    ## Options are separated by pipes if any pipe is present, else by whitespace
    my $info = join ' ' => @nouns;
    my $delim = $info =~ /\|/ ? "\\s*\\|\\s*" : '\\s+';

    ## Command line name, value
    my %arg = (
        targetdb    => '',
        targetgroup => '',
    );

    for my $arg (split /$delim/ => $info) {
        ## Break on the first equals sign only, so a value may contain one too
        if ($arg !~ /^([^=]+)=(.+)/) {
            die qq{Invalid argument\n};
        }
        my ($name,$value) = ($1,$2);
        if (!exists $arg{$name}) {
            die "Unknown option '$name'\n";
        }
        $arg{$name} = $value;
    }

    ## Make sure this sync exists and gather current information from it
    $SQL = 'SELECT * FROM bucardo.sync WHERE name = ?';
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute($item_name);
    if ($count != 1) {
        warn "No such sync: $item_name\n";
        exit 1;
    }
    my $sync = $sth->fetchall_arrayref({})->[0];

    ## Columns to change, mapped to their new values (undef means NULL)
    my %set;

    ## Do we have a targetgroup?
    if (length $arg{targetgroup}) {
        my $group = $arg{targetgroup};
        if (defined $sync->{targetgroup} and $group eq $sync->{targetgroup}) {
            warn qq{targetgroup "$group" already exists: no change made\n};
        }
        else {
            $set{'targetgroup'} = $group;
            if (defined $sync->{targetdb}) {
                warn qq{Removing targetdb "$sync->{targetdb}"\n};
                $set{'targetdb'} = undef;
            }
        }
    }
    elsif (length $arg{targetdb}) {
        my $db = $arg{targetdb};
        if (defined $sync->{targetdb} and $db eq $sync->{targetdb}) {
            warn qq{targetdb "$db" already exists: no change made\n};
        }
        else {
            $set{'targetdb'} = $db;
            if (defined $sync->{targetgroup}) {
                warn qq{Removing targetgroup "$sync->{targetgroup}"\n};
                $set{'targetgroup'} = undef;
            }
        }
    }

    if (! keys %set) {
        warn "No change made\n";
        exit 0;
    }

    $SQL = 'UPDATE sync SET ';
    my @args;
    for my $c (sort keys %set) {
        $SQL .= "$c = ?, ";
        push @args, $set{$c};
    }
    $SQL =~ s/, $//;
    $SQL .= ' WHERE name = ?';

    $sth = $dbh->prepare($SQL);
    eval {
        $count = $sth->execute(@args,$item_name);
    };
    ## Do not commit and claim success if the update blew up
    if ($@) {
        my $problem = $@;
        $dbh->rollback();
        die "Failed to alter sync: $problem\n";
    }

    $dbh->commit();

    warn "Sync updated: $item_name ($SQL)\n";

    # warn about not forgetting to reload the sync if active?

    exit 0;

} ## end of alter_sync
sub remove_sync {

    ## Delete a single row from the bucardo.sync table
    ## Usage: remove sync syncname
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The sync name is taken from the front of @nouns
    ## Exits 0 after a successful delete and commit, otherwise exits 1

    my $type = shift;

    ## The first word is the sync name
    my $syncname = shift @nouns || '';
    unless (length $syncname) {
        warn "Usage: remove $type <name>";
        exit 1;
    }

    ## Does this sync exist?
    $SQL = 'SELECT * FROM bucardo.sync WHERE name = ?';
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute($syncname);
    unless ($count == 1) {
        warn "No such sync: $syncname\n"; ## XXX Suggest alternatives?
        exit 1;
    }

    ## Is it active?
    ## Is it in use?
    ## Verify removal
    ## Remove vs inactivate

    $SQL = 'DELETE FROM bucardo.sync WHERE name = ?';
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute($syncname);
    unless ($count == 1) {
        warn "Could not delete from sync table!\n";
        $dbh->rollback();
        exit 1;
    }

    $dbh->commit();
    warn "Sync removed: $syncname\n";
    exit 0;

} ## end of remove_sync
sub add_herd {

    ## Add a new row to the bucardo.herd table, then map any listed goats to it
    ## Usage: add herd name (goat1, goat2, ...)
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The herd name and the goats are read from @nouns
    ## Any word of the form x=y is treated as an option, and none are known yet
    ## Exits 0 once everything is committed, or 1 if no herd name was given
    ## Dies on an option, or if any insert fails

    my $type = shift;

    my $herdname = shift @nouns || '';
    unless (length $herdname) {
        warn "Usage: add $type <name> goats";
        exit 1;
    }

    ## The rest is the options. If not there, we fill in the defaults
    ## Words are separated by pipes if any pipe is present, else by whitespace
    my $info = join ' ' => @nouns;
    my $delim = $info =~ /\|/ ? "\\s*\\|\\s*" : '\\s+';

    ## Command line name, default value, database column name
    my %arg = ();

    my @goatlist;
    WORD: for my $word (split /$delim/ => $info) {
        if ($word =~ /(.+)=(.+)/o) {
            my ($optname, $optvalue) = ($1, $2);
            exists $arg{$optname} or die "Unknown option '$optname'\n";
            $arg{$optname}[0] = $optvalue;
            next WORD;
        }
        ## Not an option, so it must be a goat
        push @goatlist, $word;
    }

    $arg{name} = [$herdname];

    ## Lots of checks go here...

    ## Only the options that were given a value make it into the INSERT
    my @used = grep { length $arg{$_}[0] } sort keys %arg;
    my $cols = join ',' => map { $arg{$_}[1] || $_ } @used;
    my $phs  = join ',' => ('?') x @used;
    my @vals = map { $arg{$_}[0] } @used;

    ## Add this to the database
    $SQL = "INSERT INTO bucardo.herd ($cols) VALUES ($phs)";
    $sth = $dbh->prepare($SQL);
    eval {
        $count = $sth->execute(@vals);
    };
    die "Failed to add herd: $@\n" if $@;

    warn "Herd added: $herdname\n";

    if (@goatlist) {
        ## Goats are looked up by their table name alone
        $SQL = "INSERT INTO bucardo.herdmap(herd,goat) VALUES (?,(SELECT id FROM bucardo.goat WHERE tablename=?))";
        $sth = $dbh->prepare($SQL);
        for my $goat (@goatlist) {
            eval {
                $sth->execute($herdname, $goat);
            };
            die "Failed to add goat to herdmap: $@\n" if $@;
            print qq{Added goat "$goat" to new herd\n};
        }
    }

    $dbh->commit();

    exit 0;

} ## end of add_herd
sub add_dbgroup {

    ## Create a database group if needed, and put databases into it
    ## Usage: add dbgroup name db1 db2 db3 ...
    ## A database may be followed by =N to set its priority within the group:
    ## otherwise, the priority is 1
    ## Arguments: the item type as given by the user (only used in the usage message)
    ## The group name and the databases are read from @nouns
    ## Exits 0 once committed; exits 1 on a missing group name or unknown database

    my $type = shift;

    my $group = shift @nouns || '';
    unless (length $group) {
        warn "Usage: add $type <name>";
        exit 1;
    }

    ## Create the group if it does not exist
    $SQL = 'SELECT 1 FROM bucardo.dbgroup WHERE name = ?';
    $sth = $dbh->prepare($SQL);
    $count = $sth->execute($group);
    $sth->finish();
    unless ($count == 1) {
        $SQL = 'INSERT INTO dbgroup(name) VALUES (?)';
        $sth = $dbh->prepare($SQL);
        $sth->execute($group);
    }

    ## Add any dbs to this group, if they are not already there
    if (@nouns) {

        ## Prepare everything up front, leaving the last statement in $SQL
        my ($gotdb, $getpri, $addrow, $changepri) = map {
            $SQL = $_;
            $dbh->prepare($SQL);
        } (
            'SELECT 1 FROM bucardo.db WHERE name = ?',
            'SELECT priority FROM dbmap WHERE db=? AND dbgroup=?',
            'INSERT INTO dbmap(db,dbgroup,priority) VALUES (?,?,?)',
            'UPDATE dbmap SET priority = ? WHERE db=? AND dbgroup=?',
        );

        for my $db (@nouns) {

            ## Strip off any priority given as db=N
            my $newpri = $db =~ s/\=(\d+)// ? $1 : 1;

            $count = $gotdb->execute($db);
            $gotdb->finish();
            unless ($count == 1) {
                warn "No such database: $db\n";
                exit 1;
            }

            $count = $getpri->execute($db,$group);
            if ($count != 1) {
                ## Does not exist
                $addrow->execute($db,$group,$newpri);
                next;
            }

            ## Already in the group: only touch it if the priority differs
            my $oldpri = $getpri->fetchall_arrayref()->[0][0];
            $changepri->execute($newpri,$db,$group) if $oldpri != $newpri;
        }
    }

    $dbh->commit();

    warn "Group updated\n";

    exit 0;

} ## end of add_dbgroup
sub config {

    ## View or change a value inside the bucardo_config table
    ## Which one depends on the global $verb: 'show' displays, anything else sets
    ## For show, @nouns holds 'all' or a list of patterns matched against setting names
    ## For set, @nouns holds a list of setting=value pairs
    ## Dies with a usage message if @nouns is empty or a pair is malformed,
    ## and if asked to set a setting that is not in the table
    ## Always exits rather than returning

    my $setusage = "Usage: $0 set setting=value [setting=value ...]\n";

    if (!@nouns) {
        $verb eq 'set' and die $setusage;
        die "Usage: $0 show <all|setting1> [setting2 ...]\n";
    }

    $SQL = 'SELECT * FROM bucardo.bucardo_config';
    $sth = $dbh->prepare($SQL);
    $sth->execute();
    my $config = $sth->fetchall_hashref('setting');

    if ($verb eq 'show') {

        my $all = $nouns[0] =~ /\ball\b/i ? 1 : 0;

        ## Work out once which settings are wanted
        ## Each noun is used as a case-insensitive regular expression
        my @wanted = sort grep {
            my $s = $_;
            $all or grep { $s =~ /$_/i } @nouns;
        } keys %$config;

        ## Line up the equals signs, with a minimum width of 3
        my $maxsize = 3;
        for my $s (@wanted) {
            $maxsize = length $s if length $s > $maxsize;
        }

        for my $s (@wanted) {
            printf "%-*s = %s\n", $maxsize, $s, $config->{$s}{value};
        }

        exit;
    }

    $SQL = 'UPDATE bucardo.bucardo_config SET value = ? WHERE setting = ?';
    $sth = $dbh->prepare($SQL);

    for my $noun (@nouns) {
        $noun =~ /(\w+)=(.+)/ or die $setusage;
        ## Setting names are lowercased before lookup
        my ($set,$val) = (lc $1,$2);
        if (! exists $config->{$set}) {
            die qq{Unknown setting "$set"\n};
        }
        $sth->execute($val,$set);
        print qq{Set "$set" to "$val"\n};
    }

    $dbh->commit();

    exit 0;

} ## end of config
sub message {

    ## Add a message to the Bucardo logs, via the bucardo_log_message table
    ## Note: If no MCP processes are listening, the message will hang out until an MCP processes it
    ## The text of the message is taken from the global $nouns
    ## Dies with a usage message if that is empty; otherwise commits and exits

    unless (length($nouns)) {
        die qq{Usage: bucardo_ctl message "Some message to send to the logs"\n};
    }

    $SQL = "INSERT INTO bucardo_log_message(msg) VALUES (?)";
    $sth = $dbh->prepare($SQL);
    $sth->execute($nouns);
    $dbh->commit();

    if ($VERBOSE) {
        print "Message added\n";
    }

    exit;

} ## end of message
__END__
=head1 NAME
bucardo_ctl - utility script for controlling the Bucardo program
=head1 VERSION
This document describes bucardo_ctl version 3.2.7
=head1 SYNOPSIS
./bucardo_ctl start "Starting up - Greg"
./bucardo_ctl stop "Bringing down for debugging - Raul E."
./bucardo_ctl list dbs
./bucardo_ctl add sync testsync source=herd1 type=pushdelta targetdb=B
./bucardo_ctl add database newdb name=internal_name port=5432 host=myserver user=postgres
./bucardo_ctl add table some_schema.some_table db=internal_db_name ping=bool standard_conflict=source
./bucardo_ctl add herd newherd
./bucardo_ctl add dbgroup name db1 db2 db3 ...
./bucardo_ctl alter sync syncname targetgroup=foo targetdb=foo
./bucardo_ctl ping
./bucardo_ctl status
./bucardo_ctl status sync1 sync2
./bucardo_ctl kick sync1 sync2
./bucardo_ctl kick sync1 0 --retry=10
./bucardo_ctl reload_config
=head1 DESCRIPTION
The bucardo_ctl script is used to control the operation of a Bucardo process. Primarily it is
used for stopping and starting Bucardo, viewing current information, and for kicking off named
syncs. The script needs to know how to connect to the Bucardo database: this information can
be passed as arguments or hard-coded into the script itself. If you have multiple Bucardos
running, it is recommended that you create aliases for each database.
=head1 COMMANDS
=over 4
=item B<start>
Usage: ./bucardo_ctl start "Reason --name"
Restarts Bucardo cleanly by first issuing the equivalent of a stop to ask any existing Bucardo
processes to exit, and then starting a new Bucardo MCP process. A short reason and name should
be provided - these are logged in the reason_file file and sent in the email sent when Bucardo
has been started up.
Before attempting to kill any old processes, a ping command with a timeout of 5 seconds is issued.
If this returns successfully (indicating an active MCP process already running), the script will
exit with a return value of 2.
=item B<stop>
Usage: ./bucardo_ctl stop "Reason --name"
Forces Bucardo to quit by creating a stop file which all MCP, CTL, and KID processes should
detect and cause them to exit. Note that active syncs will not exit right away, as they
will not look for the stop file until they have finished their current run. Typically,
you should scan the list of processes after running this program to make sure that all Bucardo
processes have stopped. One should also provide a reason for issuing the stop - usually
this is a short explanation and your name. This is logged in the reason_file file and
is also used by Bucardo when it exits and sends out mail about its death.
=item B<list>
Usage: ./bucardo_ctl list <type> <regex>
Lists summary information about databases, goats, herds, or syncs. Adding anything after the
type will look up all matching entries.
=item B<add>
Usage: add <item_type> <item_name>
Usage: add database <dbname> name=internal_name port=xxx host=xxx user=xxx pass=xxx service=xxx conn=xxx sourcelimit=xxx targetlimit=xxx
Usage: add table [schema].table db=internal_db_name ping=bool standard_conflict=xxx makedelta=bool
Usage: add sync syncname options
Usage: add herd name
Usage: add dbgroup name db1 db2 db3 ...
Tells Bucardo about new objects it should know about. These commands can
replace direct manipulation of the tables in the bucardo schema for the
supported object types (you'll still need to add things like the mappings between objects on your own).
=item B<alter>
Usage: alter sync syncname [targetgroup=foo | targetdb=foo]
Alters an existing sync, allowing modification to the target database or target database group.
=item B<kick>
Usage: ./bucardo_ctl kick <syncname(s)> [timeout]
Tells one or more named syncs to fire as soon as possible. Note that this simply sends a request that
the sync fire: it may not start right away if the same sync is already running, or if the source or
target database has exceeded the number of allowed Bucardo connections. If the final argument is a
number, it is treated as a timeout. If this number is zero, the bucardo_ctl command will not return
until the sync has finished. For any other number, bucardo_ctl will wait at most that number of seconds.
If any sync has not finished before the timeout, a false value is returned. In all other cases, a
true value is returned.
If a timeout is given, the total completion time in seconds is also displayed. If the sync is going to
multiple targets, the time that each target takes from the start of the kick is also shown as each
target finishes.
=item B<reload_config>
Forces Bucardo to reload the bucardo_config file, and then restart all processes to ensure that the new
information is loaded.
=item B<show>
Usage: ./bucardo_ctl show <all|setting1> [setting2..]
Shows the current values in the bucardo_config table. Use the keyword 'all' to see all the settings, or
specify one or more search terms.
=item B<set>
Usage: ./bucardo_ctl set setting1=value [setting2=value]
Sets one or more items inside the bucardo_config table. Setting names are case-insensitive.
=item B<ping>
Sends a ping notice to the MCP process to see if it will respond. By default, it will wait 15 seconds. A
numeric argument will change this timeout. Using a 0 as the timeout indicates waiting forever. If a response
was returned, the program will exit with a value of 0. If it times out, the value will be 1.
=item B<status>
Usage: ./bucardo_ctl status [syncname(s)] [--sort=#] [--daysback=#] [--showdays]
Shows the current status of all known syncs in a tabular format. If given one or more syncnames,
shows detailed information for each one.
When showing all syncs, the columns are:
=over 8
=item 1. B<Name>
The name of the sync
=item 2. B<Type>
The type of the sync. C<F> = fullcopy, C<S> = swap, C<P> = pushdelta. In addition, if a sync is overdue, an C<O!> will
appear, and if it is expired, an C<E!> will appear.
=item 3. B<State>
The current status of this sync. If no sync is running, C<idle> will appear. If a sync has been requested, but has not
started yet, C<WAIT> will appear, along with how long since the sync was requested. If a sync is
currently running, C<RUN> will appear, followed by the amount of time the sync has been running, followed by which
target the sync is running against. Note that syncs running to more than one database at a time will only show
the one most recently started.
=item 4. B<PID>
The PID of the current sync's controller (CTL). Note that if this is not a persistent sync and the state is C<idle>,
this is merely a historical record and does not represent an active process.
=item 5. B<Last_good>
How long since this sync last ran successfully. Remember that this is affected by the --daysback parameter.
=item 6. B<Time>
The amount of time the last successful sync took to run.
=item 7. B<I/U/D>
The number of inserts, updates, and deletes performed by the last successful sync.
=item 8. B<Last_bad>
How long since this sync failed to run successfully. Strongly affected by the --daysback parameter.
=item 9. B<Time>
The amount of time the last failed sync took before it was aborted.
=back
=item B<activate> syncname [syncname2 syncname3 ...] [timeout]
Activates one or more named syncs. If given a timeout argument, it will wait until it has received
confirmation from Bucardo that each sync has been successfully activated.
=item B<deactivate> syncname [syncname2 syncname3 ...] [timeout]
Deactivates one or more named syncs. If given a timeout argument, it will wait until it has received
confirmation from Bucardo that the sync has been successfully deactivated.
=item B<upgrade>
Performs an upgrade of Bucardo itself, by making sure the connected Bucardo database has a copy of the
latest schema. This should always be run after upgrading Bucardo, and will prompt you before making
any permanent changes.
=item B<message>
Adds a message to the running Bucardo logs. This message will appear prefixed with "MESSAGE: ". If
Bucardo is not running, the message will go to the logs the next time Bucardo is running and someone
adds another message.
=back
=head1 OPTIONS
It is usually easier to set most of these options at the top of the script, or make an alias for them,
as they will not change very often if at all.
=over 4
=item B<--dbport=number>
=item B<--dbhost=string>
=item B<--dbname=string>
=item B<--dbuser=string>
=item B<--dbpass=string>
The port, host, and name of the Bucardo database, the user to connect as, and the password to use.
=item B<--verbose>
Tells Bucardo (not bucardo_ctl) to run in verbose mode. Default is on. Turning this off is not recommended.
=item B<--ctlverbose>
Makes bucardo_ctl run verbosely. Default is off.
=item B<--ctlquiet>
Tells bucardo_ctl to be as quiet as possible. Default is off.
=item B<--help>
Shows a brief summary of usage for bucardo_ctl.
=back
=head2 Kick arguments
The following arguments are only used with the 'kick' command:
=over 4
=item B<--retry=#>
The number of times to retry a sync if it fails. Defaults to 0.
=item B<--retrysleep=#>
How long to sleep, in seconds, between each retry attempt.
=item B<--notimer>
By default, kicks with a timeout argument give a running real-time summary of time elapsed by
using the backspace character. This may not be wanted if running a kick, for example,
via a cronjob, so turning --notimer on will simply print the entire message without backspaces.
=back
=head2 Status arguments
The following arguments are only used with the 'status' command:
=over 4
=item B<--daysback=#>
Sets how many days backwards to search the old 'q' logs for information. Defaults to 3 days.
=item B<--showdays>
Specifies whether or not to list the time interval with days, or simply show the hours. For example,
"3d 12h 6m 3s" vs. "84h 6m 3s"
=item B<--compress>
Specifies whether or not to compress the time interval by removing spaces. Mostly used to limit
the width of the 'status' display.
=item B<--sort=#>
Requests sorting of the 'status' output by one of the nine columns. Use a negative number to reverse
the sort order.
=back
=head2 Startup arguments
The following arguments are only applicable when using the "start" command:
=over 4
=item B<--sendmail>
Tells Bucardo whether or not to send mail on interesting events: startup, shutdown, and errors. Default is off.
Only applicable when using ./bucardo_ctl start.
=item B<--extraname=string>
A short string that will be appended to the version string as output by the Bucardo process names. Mostly
useful for debugging.
=item B<--debugfilesep>
Forces creation of separate log files for each Bucardo process of the form "log.bucardo.X.Y",
where X is the type of process (MCP, CTL, or KID), and Y is the process ID.
=item B<--debugstderr>
Sends all log messages to standard error. On by default.
=item B<--debugstdout>
Sends all log messages to standard out. Off by default.
=item B<--debugsyslog>
Sends all log messages to the syslog daemon. On by default. The facility used is controlled by
the row "syslog_facility" in the bucardo_config table, and defaults to "LOG_LOCAL1".
=item B<--debugfile>
If set, writes detailed debugging information to one or more files.
=item B<--debugdir=directory name>
Directory where the debug files should go.
=item B<--debugname=string>
Appends the given string to the end of the default debug file name, "log.bucardo". A dot is added
before the name as well, so a debugname of "rootdb" would produce a log file named "log.bucardo.rootdb".
=item B<--cleandebugs>
Forces removal of all old debug files before running.
=back
=head1 FILES
In addition to command-line configurations, you can put any options inside of a file. The file F<.bucardorc> in
the current directory will be used if found. If not found, then the file F<~/.bucardorc> will be used. The format
of the file is option = value, one per line. Any line starting with a '#' will be skipped. Any values loaded
from a .bucardorc file will be overwritten by command-line options.
=head1 ENVIRONMENT VARIABLES
The bucardo_ctl script uses I<$ENV{HOME}> to look for a F<.bucardorc> file.
=head1 BUGS
The 'status' command does not yet return current information, and the start time in particular should be
taken with a grain of salt.
Bug reports and feature requests are always welcome: please visit http://bucardo.org or email bucardo-general@bucardo.org.
=head1 SEE ALSO
Bucardo
=head1 COPYRIGHT
Copyright 2006-2009 Greg Sabino Mullane <greg@endpoint.com>
This program is free to use, subject to the limitations in the LICENSE file.
=cut