#!perl
# -*-mode:cperl; indent-tabs-mode: nil; cperl-indent-level: 4-*-
## The main Bucardo program
##
## This script should only be called via the 'bucardo' program
##
## Copyright 2006-2016 Greg Sabino Mullane <greg@endpoint.com>
##
## Please visit http://bucardo.org for more information
package Bucardo;
use 5.008003;
use strict;
use warnings;
use utf8;
use open qw( :std :utf8 );
our $VERSION = '5.4.0';
use DBI 1.51; ## How Perl talks to databases
use DBD::Pg 2.0 qw( :async ); ## How Perl talks to Postgres databases
use DBIx::Safe '1.2.4'; ## Filter out what DB calls customcode may use
use sigtrap qw( die normal-signals ); ## Call die() on HUP, INT, PIPE, or TERM
use Config qw( %Config ); ## Used to map signal names
use File::Spec qw( ); ## For portable file operations
use Data::Dumper qw( Dumper ); ## Used to dump information in email alerts
use POSIX qw( strftime strtod ); ## For grabbing the local timezone, and forcing to NV
use Sys::Hostname qw( hostname ); ## Used for host safety check, and debugging/mail sending
use IO::Handle qw( autoflush ); ## Used to prevent stdout/stderr buffering
use Sys::Syslog qw( openlog syslog ); ## In case we are logging via syslog()
use Net::SMTP qw( ); ## Used to send out email alerts
use boolean qw( true false ); ## Used to send truthiness to MongoDB
use List::Util qw( first ); ## Better than grep
use MIME::Base64 qw( encode_base64
decode_base64 ); ## For making text versions of bytea primary keys
use Time::HiRes qw( sleep gettimeofday
tv_interval ); ## For better resolution than the built-in sleep
## and for timing of events
## Formatting of Data::Dumper() calls:
$Data::Dumper::Varname = 'BUCARDO';
$Data::Dumper::Indent = 1;
## Common variables we don't want to declare over and over:
use vars qw($SQL %SQL $sth %sth $count $info);
## Logging verbosity control
## See also the 'log_level_number' inside the config hash
use constant {
LOG_WARN => 0, ## Always shown
LOG_TERSE => 1, ## Bare minimum
LOG_NORMAL => 2, ## Normal messages
LOG_VERBOSE => 3, ## Many more details
LOG_DEBUG => 4, ## Firehose: rarely needed
LOG_DEBUG2 => 5, ## Painful level of detail
};
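## For illustration only (the message text below is hypothetical): a call such as
##   $self->glog('Finished processing deltas', LOG_VERBOSE);
## is only emitted when the configured log_level_number is 3 (verbose) or higher.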
## Map system signal numbers to standard names
## This allows us to say kill $signumber{HUP} => $pid
my $i = 0;
my %signumber;
for (split(' ', $Config{sig_name})) {
$signumber{$_} = $i++;
}
## Prevent buffering of output:
*STDOUT->autoflush(1);
*STDERR->autoflush(1);
## Configuration of DBIx::Safe
## Specify exactly what database handles are allowed to do within custom code
## Here, 'strict' means 'inside the main transaction that Bucardo uses to make changes'
my $strict_allow = 'SELECT INSERT UPDATE DELETE quote quote_identifier';
my $nostrict_allow = "$strict_allow COMMIT ROLLBACK NOTIFY SET pg_savepoint pg_release pg_rollback_to";
my %dbix = (
source => {
strict => {
allow_command => $strict_allow,
allow_attribute => '',
allow_regex => '', ## Must be qr{} if not empty
deny_regex => '',
},
notstrict => {
allow_command => $nostrict_allow,
allow_attribute => 'RaiseError PrintError',
allow_regex => [qr{CREATE TEMP TABLE},qr{CREATE(?: UNIQUE)? INDEX}],
deny_regex => '',
},
},
target => {
strict => {
allow_command => $strict_allow,
allow_attribute => '',
allow_regex => '', ## Must be qr{} if not empty
deny_regex => '',
},
notstrict => {
allow_command => $nostrict_allow,
allow_attribute => 'RaiseError PrintError',
allow_regex => [qr{CREATE TEMP TABLE}],
deny_regex => '',
},
}
);
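## Rough sketch of the effect, from a customcode's point of view (the handle and
## table names below are illustrative, not the actual variable names used later):
##   $safedbh->do('UPDATE mytable SET x = 1');  ## passes: UPDATE is in $strict_allow
##   $safedbh->commit();                        ## rejected in strict mode, allowed in notstrict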
## Grab our full and shortened host name:
## Used for the host_safety_check as well as for emails
my $hostname = hostname;
my $shorthost = $hostname;
$shorthost =~ s/^(.+?)\..*/$1/;
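## For example, a hostname of "db1.example.com" leaves $shorthost as just "db1"
## (the hostname above is purely illustrative)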
## Items pulled from bucardo_config and shared everywhere:
our %config;
our %config_about;
## Set a default in case we call glog before we load the configs:
$config{log_level_number} = LOG_NORMAL;
## Sequence columns we care about and how to change them via ALTER:
my @sequence_columns = (
['last_value' => ''],
['start_value' => 'START WITH'],
['increment_by' => 'INCREMENT BY'],
['max_value' => 'MAXVALUE'],
['min_value' => 'MINVALUE'],
['is_cycled' => 'BOOL CYCLE'],
['is_called' => ''],
);
my $sequence_columns = join ',' => map { $_->[0] } @sequence_columns;
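## $sequence_columns is now the comma-joined list of the first elements above, i.e.
## "last_value,start_value,increment_by,max_value,min_value,is_cycled,is_called"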
## Default statement chunk size in case config does not have it
my $default_statement_chunk_size = 10_000;
## Output messages per language
our %msg = (
'en' => {
'time-day' => q{day},
'time-days' => q{days},
'time-hour' => q{hour},
'time-hours' => q{hours},
'time-minute' => q{minute},
'time-minutes' => q{minutes},
'time-month' => q{month},
'time-months' => q{months},
'time-second' => q{second},
'time-seconds' => q{seconds},
'time-week' => q{week},
'time-weeks' => q{weeks},
'time-year' => q{year},
'time-years' => q{years},
},
'fr' => {
'time-day' => q{jour},
'time-days' => q{jours},
'time-hour' => q{heure},
'time-hours' => q{heures},
'time-minute' => q{minute},
'time-minutes' => q{minutes},
'time-month' => q{mois},
'time-months' => q{mois},
'time-second' => q{seconde},
'time-seconds' => q{secondes},
'time-week' => q{semaine},
'time-weeks' => q{semaines},
'time-year' => q{année},
'time-years' => q{années},
},
'de' => {
'time-day' => q{Tag},
'time-days' => q{Tage},
'time-hour' => q{Stunde},
'time-hours' => q{Stunden},
'time-minute' => q{Minute},
'time-minutes' => q{Minuten},
'time-month' => q{Monat},
'time-months' => q{Monate},
'time-second' => q{Sekunde},
'time-seconds' => q{Sekunden},
'time-week' => q{Woche},
'time-weeks' => q{Wochen},
'time-year' => q{Jahr},
'time-years' => q{Jahre},
},
'es' => {
'time-day' => q{día},
'time-days' => q{días},
'time-hour' => q{hora},
'time-hours' => q{horas},
'time-minute' => q{minuto},
'time-minutes' => q{minutos},
'time-month' => q{mes},
'time-months' => q{meses},
'time-second' => q{segundo},
'time-seconds' => q{segundos},
'time-week' => q{semana},
'time-weeks' => q{semanas},
'time-year' => q{año},
'time-years' => q{años},
},
);
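## Lookups are simply by language code and key, e.g. $msg{'fr'}{'time-days'} is 'jours'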
## use critic
## Figure out which language to use for output
our $lang = $ENV{LC_ALL} || $ENV{LC_MESSAGES} || $ENV{LANG} || 'en';
$lang = substr($lang,0,2);
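## For example, LC_ALL=fr_FR.UTF-8 results in $lang being 'fr' (value shown is illustrative)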
##
## Everything else is subroutines
##
sub new {
## Create a new Bucardo object and return it
## Takes a hashref of options as the only argument
my $class = shift;
my $params = shift || {};
## The hash for this object, with default values:
my $self = {
created => scalar localtime,
mcppid => $$,
verbose => 1,
quickstart => 0,
logdest => ['.'],
warning_file => '',
logseparate => 0,
logextension => '',
logclean => 0,
dryrun => 0,
sendmail => 1,
extraname => '',
logprefix => 'BC!',
version => $VERSION,
listening => {},
pidmap => {},
exit_on_nosync => 0,
sqlprefix => "/* Bucardo $VERSION */",
};
## Add any passed-in parameters to our hash:
for (keys %$params) {
$self->{$_} = $params->{$_};
}
## Transform our hash into a genuine 'Bucardo' object:
bless $self, $class;
## Remove any previous log files if requested
if ($self->{logclean} && (my @dirs = grep {
$_ !~ /^(?:std(?:out|err)|none|syslog)/
} @{ $self->{logdest} }) ) {
## If the dir does not exist, silently proceed
for my $dir (@dirs) {
opendir my $dh, $dir or next;
## We look for any files that start with 'log.bucardo' plus another dot
for my $file (grep { /^log\.bucardo\./ } readdir $dh) {
my $fullfile = File::Spec->catfile( $dir => $file );
unlink $fullfile or warn qq{Could not remove "$fullfile": $!\n};
}
closedir $dh or warn qq{Could not closedir "$dir": $!\n};
}
}
## Zombie stopper
$SIG{CHLD} = 'IGNORE';
## Basically, dryrun does a rollback instead of a commit at the final sync step
## This is not 100% safe, if (for example) you have custom code that reaches
## outside the database to do things.
if (exists $ENV{BUCARDO_DRYRUN}) {
$self->{dryrun} = 1;
}
if ($self->{dryrun}) {
$self->glog(q{** DRYRUN - Syncs will not be committed! **}, LOG_WARN);
}
## This gets appended to the process description ($0)
if ($self->{extraname}) {
$self->{extraname} = " ($self->{extraname})";
}
## Connect to the main Bucardo database
$self->{masterdbh} = $self->connect_database();
## Load in the configuration information
$self->reload_config_database();
## Figure out if we are writing emails to a file
$self->{sendmail_file} = $ENV{BUCARDO_EMAIL_DEBUG_FILE} || $config{email_debug_file} || '';
## Where to store our PID:
$self->{pid_file} = File::Spec->catfile( $config{piddir} => 'bucardo.mcp.pid' );
## The file to ask all processes to stop:
$self->{stop_file} = File::Spec->catfile( $config{piddir} => $config{stopfile} );
## Send all log lines starting with "Warning" to a separate file
$self->{warning_file} ||= $config{warning_file};
## Make sure we are running where we are supposed to be
## This prevents items in bucardo.db that reference production
## systems from getting run on QA!
## ...or at least makes sure people have to work a lot harder
## to shoot themselves in the foot.
if (length $config{host_safety_check}) {
my $safe = $config{host_safety_check};
my $osafe = $safe;
my $ok = 0;
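## The setting accepts three forms (the hostnames below are illustrative only):
##   ~^db\d+\.example\.com$                 -- leading ~ : treat the rest as a regex
##   =alpha.example.com,beta.example.com    -- leading = : comma-separated list of exact names
##   gamma.example.com                      -- anything else must match the hostname exactly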
## Regular expression
if ($safe =~ s/^~//) {
$ok = 1 if $hostname =~ qr{$safe};
}
## Set of choices
elsif ($safe =~ s/^=//) {
for my $string (split /,/ => $safe) {
if ($hostname eq $string) {
$ok=1;
last;
}
}
}
## Simple string
elsif ($safe eq $hostname) {
$ok = 1;
}
if (! $ok) {
warn qq{Cannot start: configured to only run on "$osafe". This is "$hostname"\n};
warn qq{ This is usually done to prevent a configured Bucardo from running\n};
warn qq{ on the wrong host. Please verify the 'db' settings by doing:\n};
warn qq{bucardo list dbs\n};
warn qq{ Once you are sure the bucardo.db table has the correct values,\n};
warn qq{ you can adjust the 'host_safety_check' value\n};
exit 2;
}
}
return $self;
} ## end of new
sub start_mcp {
## Start the Bucardo daemon. Called by bucardo after setsid()
## Arguments: one
## 1. Arrayref of command-line options.
## Returns: never (exit 0 or exit 1)
my ($self, $opts) = @_;
## Store the original invocation string, then modify it
my $old0 = $0;
## May not work on all platforms, of course, but we're gonna try
$0 = "Bucardo Master Control Program v$VERSION.$self->{extraname}";
## Prefix all lines in the log file with this TLA (until overridden by a forked child)
$self->{logprefix} = 'MCP';
## If the standard pid file [from new()] already exists, cowardly refuse to run
if (-e $self->{pid_file}) {
## Grab the PID from the file if we can for better output
my $extra = '';
## Failing to open is not fatal here, just means no PID shown
my $oldpid;
if (open my $fh, '<', $self->{pid_file}) {
if (<$fh> =~ /(\d+)/) {
$oldpid = $1;
$extra = " (PID=$oldpid)";
}
close $fh or warn qq{Could not close "$self->{pid_file}": $!\n};
}
## Output to the logfile, to STDERR, then exit
if (! defined $oldpid or $oldpid != $$) {
my $msg = qq{File "$self->{pid_file}" already exists$extra: cannot run until it is removed};
$self->glog($msg, LOG_WARN);
warn $msg;
exit 1;
}
}
## We also refuse to run if the global stop file exists
if (-e $self->{stop_file}) {
my $msg = qq{Cannot run while this file exists: "$self->{stop_file}"};
$self->glog($msg, LOG_WARN);
warn $msg;
## Failure to open this file is not fatal
if (open my $fh, '<', $self->{stop_file}) {
## Read in up to 10 lines from the stopfile and output them
while (<$fh>) {
$msg = "Line $.: $_";
$self->glog($msg, LOG_WARN);
warn $msg;
last if $. > 10;
}
close $fh or warn qq{Could not close "$self->{stop_file}": $!\n};
}
exit 1;
}
## We are clear to start. Output a quick hello and version to the logfile
$self->glog("Starting Bucardo version $VERSION", LOG_WARN);
$self->glog("Log level: $config{log_level}", LOG_WARN);
## Close unused file handles.
unless (grep { $_ eq 'stderr' } @{ $self->{logdest} }) {
close STDERR or warn "Could not close STDERR\n";
}
unless (grep { $_ eq 'stdout' } @{ $self->{logdest} }) {
close STDOUT or warn "Could not close STDOUT\n";
}
## Create a new (but very temporary) PID file
## We will overwrite later with a new PID once we do the initial fork
$self->create_mcp_pid_file($old0);
## Send an email message with details about this invocation
if ($self->{sendmail} or $self->{sendmail_file}) {
## Create a pretty Dumped version of the current $self object, with the password elided
## Squirrel away the old password
my $oldpass = $self->{dbpass};
## Set to something else
$self->{dbpass} = '<not shown>';
## Dump the entire object with Data::Dumper (with custom config variables)
my $dump = Dumper $self;
## Put the password back in place
$self->{dbpass} = $oldpass;
## Prepare to send an email letting people know we have started up
my $body = qq{
Master Control Program $$ was started on $hostname
Args: $old0
Version: $VERSION
};
my $subject = qq{Bucardo $VERSION started on $shorthost};
## If someone left a message in the reason file, append it, then delete the file
my $reason = get_reason('delete');
if ($reason) {
$body .= "Reason: $reason\n";
$subject .= " ($reason)";
}
## Strip leading whitespace from the body (from the qq{} above)
$body =~ s/^\s+//gsm;
## Send out the email (if sendmail or sendmail_file is enabled)
$self->send_mail({ body => "$body\n\n$dump", subject => $subject });
}
## Drop the existing database connection, fork, and get a new one
## This self-fork helps ensure our survival
my $disconnect_ok = 0;
eval {
## This connection was set in new()
$self->{masterdbh}->disconnect();
$disconnect_ok = 1;
};
$disconnect_ok or $self->glog("Warning! Disconnect failed $@", LOG_WARN);
my $seeya = fork;
if (! defined $seeya) {
die q{Could not fork mcp!};
}
## Immediately close the child process (one side of the fork)
if ($seeya) {
exit 0;
}
## Now that we've forked, overwrite the PID file with our new value
$self->create_mcp_pid_file($old0);
## Reconnect to the master database
($self->{mcp_backend}, $self->{masterdbh}) = $self->connect_database();
my $masterdbh = $self->{masterdbh};
## Let any listeners know we have gotten this far
## (We do this nice and early for impatient watchdog programs)
$self->db_notify($masterdbh, 'boot', 1);
## Store the function to use to generate clock timestamps
## We greatly prefer clock_timestamp,
## but fallback to timeofday() for 8.1 and older
$self->{mcp_clock_timestamp} =
$masterdbh->{pg_server_version} >= 80200
? 'clock_timestamp()'
: 'timeofday()::timestamptz';
## Start outputting some interesting things to the log
$self->show_db_version_and_time($masterdbh, $self->{mcp_backend}, 'Master DB ');
$self->glog("PID: $$", LOG_WARN);
$self->glog('Postgres library version: ' . $masterdbh->{pg_lib_version}, LOG_WARN);
$self->glog("bucardo: $old0", LOG_WARN);
$self->glog('Bucardo.pm: ' . $INC{'Bucardo.pm'}, LOG_WARN);
$self->glog((sprintf 'OS: %s Perl: %s %vd', $^O, $^X, $^V), LOG_WARN);
## Get an integer version of the DBD::Pg version, for later comparisons
if ($DBD::Pg::VERSION !~ /(\d+)\.(\d+)\.(\d+)/) {
die "Could not parse the DBD::Pg version: was $DBD::Pg::VERSION\n";
}
$self->{dbdpgversion} = int (sprintf '%02d%02d%02d', $1,$2,$3);
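## For example, a DBD::Pg version of 2.19.3 becomes the integer 21903,
## which makes later "is the driver new enough?" comparisons trivial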
$self->glog((sprintf 'DBI version: %s DBD::Pg version: %s (%d) DBIx::Safe version: %s',
$DBI::VERSION,
$DBD::Pg::VERSION,
$self->{dbdpgversion},
$DBIx::Safe::VERSION),
LOG_WARN);
## Attempt to print the git hash to help with debugging if running a dev version
if (-d '.git') {
my $COM = 'git log -1';
my $log = '';
eval { $log = qx{$COM}; };
if ($log =~ /^commit ([a-f0-9]{40}).+Date:\s+(.+?)$/ms) {
$self->glog("Last git commit sha and date: $1 $2", LOG_NORMAL);
}
}
## Store some PIDs for later debugging use
$self->{pidmap}{$$} = 'MCP';
$self->{pidmap}{$self->{mcp_backend}} = 'Bucardo DB';
## Get the maximum key length of the "self" hash for pretty formatting
my $maxlen = 5;
for (keys %$self) {
$maxlen = length($_) if length($_) > $maxlen;
}
## Print each object, aligned, and show 'undef' for undefined values
## Yes, this prints things like HASH(0x8fbfc84), but we're okay with that
$Data::Dumper::Indent = 0;
$Data::Dumper::Terse = 1;
my $objdump = "Bucardo object:\n";
for my $key (sort keys %$self) {
my $value = $key eq 'dbpass' ? '<not shown>' : $self->{$key};
$objdump .= sprintf " %-*s => %s\n", $maxlen, $key,
(defined $value) ?
(ref $value eq 'ARRAY') ? Dumper($value)
: qq{'$value'} : 'undef';
}
$Data::Dumper::Indent = 1;
$Data::Dumper::Terse = 0;
$self->glog($objdump, LOG_TERSE);
## Dump all configuration variables to the log
$self->log_config();
## Any other files we find in the piddir directory should be considered old
## Thus, we can remove them
my $piddir = $config{piddir};
opendir my $dh, $piddir or die qq{Could not opendir "$piddir": $!\n};
## Nothing else should really be in here, but we will limit with a regex anyway
my @pidfiles = grep { /^bucardo.*\.pid$/ } readdir $dh;
closedir $dh or warn qq{Could not closedir "$piddir" $!\n};
## Loop through and remove each file found, making a note in the log
for my $pidfile (sort @pidfiles) {
my $fullfile = File::Spec->catfile( $piddir => $pidfile );
## Do not erase our own file
next if $fullfile eq $self->{pid_file};
## Everything else can get removed
if (-e $fullfile) {
if (unlink $fullfile) {
$self->glog("Warning: removed old pid file $fullfile", LOG_VERBOSE);
}
else {
## This will cause problems, but we will drive on
$self->glog("Warning: failed to remove pid file $fullfile", LOG_TERSE);
}
}
}
## We use a USR2 signal to indicate that the logs should be reopened
local $SIG{USR2} = sub {
$self->glog("Received USR2 from pid $$, who is a $self->{logprefix}", LOG_DEBUG);
## Go through and reopen anything that needs reopening
## For now, that is only plain text files
for my $logdest (sort keys %{$self->{logcodes}}) {
my $loginfo = $self->{logcodes}{$logdest};
next if $loginfo->{type} ne 'textfile';
my $filename = $loginfo->{filename};
## Reopen the same (named) file with a new filehandle
my $newfh;
if (! open $newfh, '>>', $filename) {
$self->glog("Warning! Unable to open new filehandle for $filename", LOG_WARN);
next;
}
## Turn off buffering on this handle
$newfh->autoflush(1);
## Overwrite the old sub and point to the new filehandle
my $oldfh = $loginfo->{filehandle};
$self->glog("Switching to new filehandle for log file $filename", LOG_NORMAL);
$loginfo->{code} = sub { print {$newfh} @_, $/ };
$self->glog("Completed reopen of file $filename", LOG_NORMAL);
## Close the old filehandle, then remove it from our records
close $oldfh or warn "Could not close old filehandle for $filename: $!\n";
$loginfo->{filehandle} = $newfh;
}
}; ## end of handling USR2 signals
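## (Usage note only: log rotation tooling can rotate the files and then send USR2
## to the PID stored in the MCP pid file to have Bucardo reopen them; the actual
## signal handling is entirely defined above.)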
## From this point forward, we want to die gracefully
## We setup our own subroutine to catch any die signals
local $SIG{__DIE__} = sub {
## Arguments: one
## 1. The error message
## Returns: never (exit 1 or exec new process)
my $msg = shift;
my $line = (caller)[2];
$self->glog("Warning: Killed (line $line): $msg", LOG_WARN);
## Was this a database problem?
## We can carefully handle certain classes of errors
if ($msg =~ /DBI|DBD/) {
## How many bad databases we found
my $bad = 0;
for my $db (sort keys %{ $self->{sdb} }) { ## need a better name!
if (! exists $self->{sdb}{$db}{dbh} ) {
$self->glog("Database $db has no database handle", LOG_NORMAL);
$bad++;
}
elsif (! $self->{sdb}{$db}{dbh}->ping()) {
$self->glog("Database $db failed ping check", LOG_NORMAL);
$msg = 'Ping failed';
$bad++;
}
}
if ($bad) {
my $changes = $self->check_sync_health();
if ($changes) {
## If we already made a MCP label, go there
## Else fallthrough and assume our bucardo.sync changes stick!
if ($self->{mcp_loop_started}) {
$self->glog('Going to restart the MCP loop, as syncs have changed', LOG_VERBOSE);
die 'We are going to redo the MCP loop'; ## goes to end of mcp main eval
}
}
}
}
## The error message determines if we try to resurrect ourselves or not
my $respawn = (
$msg =~ /DBI connect/ ## From DBI
or $msg =~ /Ping failed/ ## Set below
) ? 1 : 0;
## Sometimes we don't want to respawn at all (e.g. during some tests)
if (! $config{mcp_dbproblem_sleep}) {
$self->glog('Database problem, but will not attempt a respawn due to mcp_dbproblem_sleep=0', LOG_TERSE);
$respawn = 0;
}
## Create some output for the mail message
my $diesubject = "Bucardo MCP $$ was killed";
my $diebody = "MCP $$ was killed: $msg";
## Most times we *do* want to respawn
if ($respawn) {
$self->glog("Database problem, will respawn after a short sleep: $config{mcp_dbproblem_sleep}", LOG_TERSE);
$diebody .= " (will attempt respawn in $config{mcp_dbproblem_sleep} seconds)";
$diesubject .= ' (respawning)';
}
## Callers can prevent an email being sent by setting this before they die
if (! $self->{clean_exit}) {
$self->send_mail({ body => $diebody, subject => $diesubject });
}
## Kill kids, remove pidfile, update tables, etc.
$self->cleanup_mcp("Killed: $msg");
## If we are not respawning, simply exit right now
exit 1 if ! $respawn;
## We will attempt a restart, but sleep a while first to avoid constant restarts
$self->glog("Sleep time: $config{mcp_dbproblem_sleep}", LOG_TERSE);
sleep($config{mcp_dbproblem_sleep});
## Do a quick check for a stopfile
## Bail if the stopfile exists
if (-e $self->{stop_file}) {
$self->glog(qq{Found stopfile "$self->{stop_file}": exiting}, LOG_WARN);
my $message = 'Found stopfile';
## Grab the reason, if it exists, so we can propagate it onward
my $mcpreason = get_reason(0);
if ($mcpreason) {
$message .= ": $mcpreason";
}
## Stop controllers, disconnect, remove PID file, etc.
$self->cleanup_mcp("$message\n");
$self->glog('Exiting', LOG_WARN);
exit 0;
}
## We assume this is bucardo, and that we are in same directory as when called
my $RUNME = $old0;
## Check to see if $RUNME is executable as is, before we assume we're in the same directory
if (! -x $RUNME) {
$RUNME = "./$RUNME" if index ($RUNME,'.') != 0;
}
my $mcpreason = 'Attempting automatic respawn after MCP death';
$self->glog("Respawn attempt: $RUNME @{ $opts } start '$mcpreason'", LOG_TERSE);
## Replace ourselves with a new process running this command
{ exec $RUNME, @{ $opts }, 'start', $mcpreason };
$self->glog("Could not exec $RUNME: $!", LOG_WARN);
}; ## end SIG{__DIE__} handler sub
## This resets listeners, kills kids, and loads/activates syncs
my $active_syncs = $self->reload_mcp();
if (!$active_syncs && $self->{exit_on_nosync}) {
## No syncs means no reason for us to hang around, so we exit
$self->glog('No active syncs were found, so we are exiting', LOG_WARN);
$self->db_notify($masterdbh, 'nosyncs', 1);
$self->cleanup_mcp('No active syncs');
exit 1;
}
## Report which syncs are active
$self->glog("Active syncs: $active_syncs", LOG_TERSE);
## We want to reload everything if someone HUPs us
local $SIG{HUP} = sub {
$self->reload_mcp();
};
## We need KIDs to tell us their PID so we can deregister them
$self->{kidpidlist} = {};
## Let any listeners know we have gotten this far
$self->db_notify($masterdbh, 'started', 1);
## For optimization later on, we need to know which syncs are 'fullcopy'
for my $syncname (keys %{ $self->{sync} }) {
my $s = $self->{sync}{$syncname};
## Skip inactive or paused syncs
next if !$s->{mcp_active} or $s->{paused};
## Walk through each database and check the roles, discarding inactive dbs
my %rolecount;
for my $db (values %{ $s->{db} }) {
next if $db->{status} ne 'active';
$rolecount{$db->{role}}++;
}
## Default to being fullcopy
$s->{fullcopy} = 1;
## We cannot be a fullcopy sync if:
if ($rolecount{'target'} ## there are any target dbs
or ($rolecount{'source'} || 0) > 1 ## there is more than one source db
or ! $rolecount{'fullcopy'}) { ## there are no fullcopy dbs
$s->{fullcopy} = 0;
}
}
## Because a sync may have gotten a notice while we were down,
## we auto-kick all eligible syncs
## We also need to see if we can prevent the VAC daemon from running,
## if there are no databases with bucardo schemas
$self->{needsvac} = 0;
for my $syncname (keys %{ $self->{sync} }) {
my $s = $self->{sync}{$syncname};
## Default to starting in a non-kicked mode
$s->{kick_on_startup} = 0;
## Skip inactive or paused syncs
next if !$s->{mcp_active} or $s->{paused};
## Skip fullcopy syncs
next if $s->{fullcopy};
## Right now, the vac daemon is only useful for source Postgres databases
## Of course, it is not needed for fullcopy syncs
for my $db (values %{ $s->{db} }) {
if ($db->{status} eq 'active'
and $db->{dbtype} eq 'postgres'
and $db->{role} eq 'source') {
## We need to increment it for any matches in sdb, regardless of which sync initially set it!
$self->{sdb}{ $db->{name} }{needsvac} = 2;
$self->{needsvac} = 1;
}
}
## Skip if autokick is false
next if ! $s->{autokick};
## Kick it!
$s->{kick_on_startup} = 1;
}
## Start the main loop
{
my $value = $self->mcp_main();
redo if $value;
}
return; ## no critic
} ## end of start_mcp
sub create_mcp_pid_file {
## Create a file containing the PID of the current MCP,
## plus a few other details
## Arguments: one
## 1. Message (usually just the original invocation line)
## Returns: undef
my $self = shift;
my $message = shift || '';
open my $pidfh, '>', $self->{pid_file}
or die qq{Cannot write to $self->{pid_file}: $!\n};
## Inside our newly created PID file, print out PID on the first line
## - print how the script was originally invoked on the second line (old $0),
## - print the current time on the third line
my $now = scalar localtime;
print {$pidfh} "$$\n$message\n$now\n";
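## The resulting file therefore looks something like this (values illustrative):
##   12345
##   /usr/bin/bucardo start
##   Mon Mar  1 12:00:00 2021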
close $pidfh or warn qq{Could not close "$self->{pid_file}": $!\n};
return;
} ## end of create_mcp_pid_file
sub mcp_main {
## The main MCP process
## Arguments: none
## Returns: undef (but almost always just exits with 0 or 1)
my $self = shift;
my $maindbh = $self->{masterdbh};
my $sync = $self->{sync};
## Used to gather up and handle any notices received via the listen/notify system
my $notice;
## Used to keep track of the last time we pinged the databases
my $lastpingcheck = 0;
## Keep track of how long since we checked on the VAC daemon
my $lastvaccheck = 0;
$self->glog('Entering main loop', LOG_TERSE);
$self->{mcp_loop_started} = 1;
MCP: {
## We eval the whole loop so we can cleanly redo it if needed
eval {
## Bail if the stopfile exists
if (-e $self->{stop_file}) {
$self->glog(qq{Found stopfile "$self->{stop_file}": exiting}, LOG_WARN);
my $msg = 'Found stopfile';
## Grab the reason, if it exists, so we can propagate it onward
my $mcpreason = get_reason(0);
if ($mcpreason) {
$msg .= ": $mcpreason";
}
## Stop controllers, disconnect, remove PID file, etc.
$self->cleanup_mcp("$msg\n");
$self->glog('Exiting', LOG_WARN);
exit 0;
}
## Startup the VAC daemon as needed
## May be off via user configuration, or because of no valid databases
if ($config{bucardo_vac} and $self->{needsvac}) {
## Check on it occasionally (different than the running time)
if (time() - $lastvaccheck >= $config{mcp_vactime}) {
## Is it alive? If not, spawn
my $pidfile = "$config{piddir}/bucardo.vac.pid";
if (! -e $pidfile) {
$self->fork_vac();
}
$lastvaccheck = time();
} ## end of time to check vac
} ## end if bucardo_vac
## Every once in a while, make sure our database connections are still there
if (time() - $lastpingcheck >= $config{mcp_pingtime}) {
## This message must have "Ping failed" to match the $respawn above
$maindbh->ping or die qq{Ping failed for main database!\n};
## Check each (pingable) remote database in undefined order
for my $dbname (keys %{ $self->{sdb} }) {
my $d = $self->{sdb}{$dbname};
next if $d->{dbtype} =~ /flat|mongo|redis/o;
my $try_reconnect = 0;
if ($d->{status} eq 'stalled') {
$self->glog("Trying to connect to stalled database $dbname", LOG_VERBOSE);
$try_reconnect = 1;
}
elsif (! $d->{dbh}->ping) {
$self->glog("Ping failed for database $dbname, trying to reconnect", LOG_NORMAL);
}
if ($try_reconnect) {
## Sleep a hair so we don't reloop constantly
sleep 0.5;
undef $d->{backend};
{
local $SIG{__DIE__} = 'IGNORE';
eval {
($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
};
}
if (defined $d->{backend}) {
$self->show_db_version_and_time($d->{dbh}, $d->{backend}, qq{Database "$dbname" });
$d->{status} = 'active'; ## In case it was stalled
}
else {
$self->glog("Unable to reconnect to database $dbname!", LOG_WARN);
## We may want to throw an exception if this keeps happening
## We may also want to adjust lastpingcheck so we check more often
}
}
}
## Reset our internal counter to 'now'
$lastpingcheck = time();
} ## end of checking database connections
## Add in any messages from the main database and reset the notice hash
## Ignore things we may have sent ourselves
$notice = $self->db_get_notices($maindbh, $self->{mcp_backend});
## Add in any messages from each remote database
for my $dbname (keys %{ $self->{sdb} }) {
my $d = $self->{sdb}{$dbname};
next if $d->{dbtype} ne 'postgres';
next if $d->{status} eq 'stalled';
my $nlist = $self->db_get_notices($d->{dbh});
$d->{dbh}->rollback();
for my $name (keys %{ $nlist } ) {
if (! exists $notice->{$name}) {
$notice->{$name} = $nlist->{$name};
}
else {
for my $pid (keys %{ $nlist->{$name}{pid} }) {
$notice->{$name}{pid}{$pid}++;
}
}
}
}
## Handle each notice one by one
for my $name (sort keys %{ $notice }) {
my $npid = $notice->{$name}{firstpid};
## Request to stop everything
if ('mcp_fullstop' eq $name) {
$self->glog("Received full stop notice from PID $npid, leaving", LOG_TERSE);
$self->cleanup_mcp("Received stop NOTICE from PID $npid");
exit 0;
}
## Request that a named sync get kicked
elsif ($name =~ /^kick_sync_(.+)/o) {
my $syncname = $1;
## Prepare to send some sort of log message
my $msg = '';
## We will not kick if this sync does not exist or it is inactive
if (! exists $self->{sync}{$syncname}) {
$msg = qq{Warning: Unknown sync to be kicked: "$syncname"\n};
}
elsif (! $self->{sync}{$syncname}{mcp_active}) {
$msg = qq{Cannot kick inactive sync "$syncname"};
}
elsif ($self->{sync}{$syncname}{paused}) {
$msg = qq{Cannot kick paused sync "$syncname"};
}
## We also won't kick if this was created by a kid
## This can happen as our triggerkicks may be set to 'always'
elsif (exists $self->{kidpidlist}{$npid}) {
$self->glog(qq{Not kicking sync "$syncname" as it came from KID $npid}, LOG_DEBUG);
}
else {
## Kick it!
$sync->{$syncname}{kick_on_startup} = 1;
}
if ($msg) {
$self->glog($msg, $msg =~ /Unknown/ ? LOG_TERSE : LOG_VERBOSE);
## As we don't want people to wait around for a syncdone...
$self->db_notify($maindbh, "syncerror_$syncname", 1);
}
}
## A sync has finished
elsif ($name =~ /^syncdone_(.+)/o) {
my $syncdone = $1;
$self->glog("Sync $syncdone has finished", LOG_DEBUG);
## Echo out to anyone listening
$self->db_notify($maindbh, $name, 1);
## If this was a onetimecopy sync, flip it off
$sync->{$syncdone}{onetimecopy} = 0;
}
## A sync has been killed
elsif ($name =~ /^synckill_(.+)/o) {
my $syncdone = $1;
$self->glog("Sync $syncdone has been killed", LOG_DEBUG);
## Echo out to anyone listening
$self->db_notify($maindbh, $name, 1);
## Check on the health of our databases, in case that was the reason the sync was killed
$self->check_sync_health();
}
## Request to pause a sync
elsif ($name =~ /^pause_sync_(.+)/o) {
my $syncname = $1;
my $msg;
## We will not pause if this sync does not exist or it is inactive
if (! exists $self->{sync}{$syncname}) {
$msg = qq{Warning: Unknown sync to be paused: "$syncname"\n};
}
elsif (! $self->{sync}{$syncname}{mcp_active}) {
$msg = qq{Cannot pause inactive sync "$syncname"};
}
else {
## Mark it as paused, stop the kids and controller
$sync->{$syncname}{paused} = 1;
my $stopsync = "stopsync_$syncname";
$self->db_notify($maindbh, "kid_$stopsync");
$self->db_notify($maindbh, "ctl_$stopsync");
$maindbh->commit();
$self->glog(qq{Set sync "$syncname" as paused}, LOG_VERBOSE);
}
if (defined $msg) {
$self->glog($msg, LOG_TERSE);
}
}
## Request to resume a sync
elsif ($name =~ /^resume_sync_(.+)/o) {
my $syncname = $1;
my $msg;
## We will not resume if this sync does not exist or it is inactive
if (! exists $self->{sync}{$syncname}) {
$msg = qq{Warning: Unknown sync to be resumed: "$syncname"\n};
}
elsif (! $self->{sync}{$syncname}{mcp_active}) {
$msg = qq{Cannot resume inactive sync "$syncname"};
}
else {
## Mark it as resumed
my $s = $sync->{$syncname};
$s->{paused} = 0;
## Since we may have accumulated deltas while paused, set to autokick if needed
if (!$s->{fullcopy} and $s->{autokick}) {
$s->{kick_on_startup} = 1;
}
$self->glog(qq{Set sync "$syncname" as resumed}, LOG_VERBOSE);
## MCP will restart the CTL on next loop around
}
if (defined $msg) {
$self->glog($msg, LOG_TERSE);
}
}
## Request to reload the configuration file
elsif ('reload_config' eq $name) {
$self->glog('Reloading configuration table', LOG_TERSE);
$self->reload_config_database();
## Output all values to the log file again
$self->log_config();
## We need to reload ourself as well
## XXX Not needed for some items! e.g. mcp_pingtime
$self->reload_mcp();
## Let anyone listening know we are done
$self->db_notify($maindbh, 'reload_config_finished', 1);
}
## Request to reload the MCP
elsif ('mcp_reload' eq $name) {
$self->glog('Reloading MCP', LOG_TERSE);
$self->reload_mcp();
## Let anyone listening know we are done
$self->db_notify($maindbh, 'reloaded_mcp', 1);
}
## Request for a ping via listen/notify
elsif ('mcp_ping' eq $name) {
$self->glog("Got a ping from PID $npid, issuing pong", LOG_DEBUG);
$self->db_notify($maindbh, 'mcp_pong', 1);
}
## Request that we parse and empty the log message table
elsif ('log_message' eq $name) {
$self->glog('Checking for log messages', LOG_DEBUG);
$SQL = 'SELECT msg,cdate FROM bucardo_log_message ORDER BY cdate';
my $sth = $maindbh->prepare_cached($SQL);
$count = $sth->execute();
if ($count ne '0E0') {
for my $row (@{$sth->fetchall_arrayref()}) {
$self->glog("MESSAGE ($row->[1]): $row->[0]", LOG_TERSE);
}
$maindbh->do('DELETE FROM bucardo_log_message');
$maindbh->commit();
}
else {
$sth->finish();
}
}
## Request that a named sync get reloaded
elsif ($name =~ /^reload_sync_(.+)/o) {
my $syncname = $1;
my $succeeded = 0;
## Skip if the sync does not exist or is inactive
if (! exists $sync->{$syncname}) {
$self->glog(qq{Invalid sync reload: "$syncname"}, LOG_TERSE);
}
elsif (!$sync->{$syncname}{mcp_active}) {
$self->glog(qq{Cannot reload: sync "$syncname" is not active}, LOG_TERSE);
}
else {
## reload overrides a pause
if ($sync->{$syncname}{paused}) {
$self->glog(qq{Resuming paused sync "$syncname"}, LOG_TERSE);
$sync->{$syncname}{paused} = 0;
}
$self->glog(qq{Deactivating sync "$syncname"}, LOG_TERSE);
$self->deactivate_sync($sync->{$syncname});
## Reread from the database
$SQL = q{SELECT *, }
. q{COALESCE(EXTRACT(epoch FROM checktime),0) AS checksecs, }
. q{COALESCE(EXTRACT(epoch FROM lifetime),0) AS lifetimesecs }
. q{FROM bucardo.sync WHERE name = ?};
my $sth = $maindbh->prepare($SQL);
$count = $sth->execute($syncname);
if ($count eq '0E0') {
$sth->finish();
$self->glog(qq{Warning! Cannot reload sync "$syncname": no longer in the database!}, LOG_WARN);
$maindbh->commit();
next; ## Handle the next notice
}
## XXX: Actually do a full disconnect and redo all the items in here
my $info = $sth->fetchall_arrayref({})->[0];
$maindbh->commit();
## Only certain things can be changed "on the fly"
for my $val (qw/checksecs stayalive deletemethod status autokick
analyze_after_copy vacuum_after_copy targetgroup targetdb
onetimecopy lifetimesecs maxkicks rebuild_index
conflict_strategy/) {
$sync->{$syncname}{$val} = $self->{sync}{$syncname}{$val} = $info->{$val};
}
## XXX: Todo: Fix those double assignments
## Empty all of our custom code arrays
for my $key (grep { /^code_/ } sort keys %{ $self->{sync}{$syncname} }) {
$sync->{$syncname}{$key} = $self->{sync}{$syncname}{$key} = [];
}
sleep 2; ## XXX TODO: Actually wait somehow, perhaps fork
$self->glog("Reactivating sync $syncname", LOG_TERSE);
$sync->{$syncname}{mcp_active} = 0;
if (! $self->activate_sync($sync->{$syncname})) {
$self->glog(qq{Warning! Reactivation of sync "$syncname" failed}, LOG_WARN);
}
else {
## Let anyone listening know the sync is now ready
$self->db_notify($maindbh, "reloaded_sync_$syncname", 1);
$succeeded = 1;
}
$maindbh->commit();
$self->glog("Succeeded: $succeeded", LOG_WARN);
}
$self->db_notify($maindbh, "reload_error_sync_$syncname", 1)
if ($succeeded != 1);
}
## Request that a named sync get activated
elsif ($name =~ /^activate_sync_(.+)/o) {
my $syncname = $1;
if (! exists $sync->{$syncname}) {
$self->glog(qq{Invalid sync activation: "$syncname"}, LOG_TERSE);
}
elsif ($sync->{$syncname}{mcp_active}) {
$self->glog(qq{Sync "$syncname" is already activated}, LOG_TERSE);
$self->db_notify($maindbh, "activated_sync_$syncname", 1);
}
elsif ($self->activate_sync($sync->{$syncname})) {
$sync->{$syncname}{mcp_active} = 1;
## Just in case:
$sync->{$syncname}{paused} = 0;
$maindbh->do(
'UPDATE sync SET status = ? WHERE name = ?',
undef, 'active', $syncname
);
}
}
## Request that a named sync get deactivated
elsif ($name =~ /^deactivate_sync_(.+)/o) {
my $syncname = $1;
if (! exists $sync->{$syncname}) {
$self->glog(qq{Invalid sync "$syncname"}, LOG_TERSE);
}
elsif (! $sync->{$syncname}{mcp_active}) {
$self->glog(qq{Sync "$syncname" is already deactivated}, LOG_TERSE);
$self->db_notify($maindbh, "deactivated_sync_$syncname", 1);
}
elsif ($self->deactivate_sync($sync->{$syncname})) {
$sync->{$syncname}{mcp_active} = 0;
$maindbh->do(
'UPDATE sync SET status = ? WHERE name = ?',
undef, 'inactive', $syncname
);
}
}
## Serialization/deadlock problems: the child is going to sleep
elsif ($name =~ /^syncsleep_(.+)/o) {
my $syncname = $1;
$self->glog("Sync $syncname could not serialize, will sleep", LOG_DEBUG);
## Echo out to anyone listening
$self->db_notify($maindbh, $name, 1);
}
## A kid reporting in. We just store the PID
elsif ('kid_pid_start' eq $name) {
for my $lpid (keys %{ $notice->{$name}{pid} }) {
$self->{kidpidlist}{$lpid} = 1;
}
}
## A kid leaving. We remove the stored PID.
elsif ('kid_pid_stop' eq $name) {
for my $lpid (keys %{ $notice->{$name}{pid} }) {
delete $self->{kidpidlist}{$lpid};
}
}
## Someone giving us a hint that a database may be down
elsif ($name =~ /dead_db_(.+)/) {
my $dbname = $1;
$self->glog(qq{Got a hint that database "$dbname" may be down. Let's check it out!}, LOG_NORMAL);
my $changes = $self->check_sync_health($dbname);
}
## Should not happen, but let's at least log it
else {
$self->glog("Warning: received unknown message $name from $npid!", LOG_TERSE);
}
} ## end each notice
$maindbh->commit();
## Just in case this changed behind our back:
$sync = $self->{sync};
## Startup controllers for all eligible syncs
SYNC: for my $syncname (keys %$sync) {
my $s = $sync->{$syncname};
## Skip if this sync has not been activated
next if ! $s->{mcp_active};
## Skip if this one is paused
next if $s->{paused};
## Skip if this one is stalled
next if $s->{status} eq 'stalled';
## If this is not a stayalive, AND is not being kicked, skip it
next if ! $s->{stayalive} and ! $s->{kick_on_startup};
## If this is a fullcopy sync, skip unless it is being kicked
next if $s->{fullcopy} and ! $s->{kick_on_startup};
## If this is a previous stayalive, see if it is active, kick if needed
if ($s->{stayalive} and $s->{controller}) {
$count = kill 0 => $s->{controller};
## If kill 0 returns nothing, the controller is gone, so create a new one
if (! $count) {
$self->glog("Could not find controller $s->{controller}, will create a new one. Kicked is $s->{kick_on_startup}", LOG_TERSE);
$s->{controller} = 0;
}
else { ## Presume it is alive and listening to us, restart and kick as needed
if ($s->{kick_on_startup}) {
## See if controller needs to be killed, because of time limit or job count limit
my $restart_reason = '';
## We can kill and restart a controller after a certain number of kicks
if ($s->{maxkicks} > 0 and $s->{ctl_kick_counts} >= $s->{maxkicks}) {
$restart_reason = "Total kicks ($s->{ctl_kick_counts}) >= limit ($s->{maxkicks})";
}
## We can kill and restart a controller after a certain amount of time
elsif ($s->{lifetimesecs} > 0) {
my $thistime = time();
my $timediff = $thistime - $s->{start_time};
if ($thistime - $s->{start_time} > $s->{lifetimesecs}) {
$restart_reason = "Time is $timediff, limit is $s->{lifetimesecs} ($s->{lifetime})";
}
}
if ($restart_reason) {
## Kill and restart controller
$self->glog("Restarting controller for sync $syncname. $restart_reason", LOG_TERSE);
kill $signumber{USR1} => $s->{controller};
## Create a new controller
$self->fork_controller($s, $syncname);
}
else {
## Perform the kick
my $notify = "ctl_kick_$syncname";
$self->db_notify($maindbh, $notify);
$self->glog(qq{Sent a kick to controller $s->{controller} for sync "$syncname"}, LOG_VERBOSE);
}
## Reset so we don't kick the next round
$s->{kick_on_startup} = 0;
## Track how many times we've kicked
$s->{ctl_kick_counts}++;
}
next SYNC;
}
}
## At this point, we are either:
## 1. Not a stayalive
## 2. A stayalive that has not been run yet
## 3. A stayalive that has been run but is not responding
## Make sure there is nothing out there already running
my $syncname = $s->{name};
my $pidfile = "$config{piddir}/bucardo.ctl.sync.$syncname.pid";
if ($s->{mcp_changed}) {
$self->glog(qq{Checking for existing controllers for sync "$syncname"}, LOG_VERBOSE);
}
if (-e $pidfile and ! $s->{mcp_problemchild}) {
$self->glog("File exists staylive=$s->{stayalive} controller=$s->{controller}", LOG_TERSE);
my $pid;
if (!open $pid, '<', $pidfile) {
$self->glog(qq{Warning: Could not open file "$pidfile": $!}, LOG_WARN);
$s->{mcp_problemchild} = 1;
next SYNC;
}
my $oldpid = <$pid>;
chomp $oldpid;
close $pid or warn qq{Could not close "$pidfile": $!\n};
## We don't need to know about this every time
if ($s->{mcp_changed}) {
$self->glog(qq{Found previous controller $oldpid from "$pidfile"}, LOG_TERSE);
}
if ($oldpid !~ /^\d+$/) {
$self->glog(qq{Warning: Invalid pid found inside of file "$pidfile" ($oldpid)}, LOG_WARN);
$s->{mcp_changed} = 0;
$s->{mcp_problemchild} = 2;
next SYNC;
}
## Is it still alive?
$count = kill 0 => $oldpid;
if ($count==1) {
if ($s->{mcp_changed}) {
$self->glog(qq{Skipping sync "$syncname", seems to be already handled by $oldpid}, LOG_VERBOSE);
## Make sure this kid is still running
$count = kill 0 => $oldpid;
if (!$count) {
$self->glog(qq{Warning! PID $oldpid was not found. Removing PID file}, LOG_WARN);
unlink $pidfile or $self->glog("Warning! Failed to unlink $pidfile", LOG_WARN);
$s->{mcp_problemchild} = 3;
next SYNC;
}
$s->{mcp_changed} = 0;
}
if (! $s->{stayalive}) {
$self->glog(qq{Non stayalive sync "$syncname" still active - sending it a notify}, LOG_NORMAL);
}
my $notify = "ctl_kick_$syncname";
$self->db_notify($maindbh, $notify);
$s->{kick_on_startup} = 0;
next SYNC;
}
$self->glog("No active pid $oldpid found. Killing just in case, and removing file", LOG_TERSE);
$self->kill_bucardo_pid($oldpid => 'normal');
unlink $pidfile or $self->glog("Warning! Failed to unlink $pidfile", LOG_WARN);
$s->{mcp_changed} = 1;
} ## end if pidfile found for this sync
## We may have found an error in the pid file detection the first time through
$s->{mcp_problemchild} = 0;
## Fork off the controller, then clean up the $s hash
$self->{masterdbh}->commit();
$self->fork_controller($s, $syncname);
$s->{kick_on_startup} = 0;
$s->{mcp_changed} = 1;
} ## end each sync
sleep $config{mcp_loop_sleep};
redo MCP;
}; # end of eval
## We may want to redo if the error was not *that* fatal
if ($@ =~ /redo/) {
$self->glog('Going to restart the main MCP loop', LOG_VERBOSE);
redo MCP;
}
} ## end of MCP loop
return;
} ## end of mcp_main
sub check_sync_health {
## Check every database used by a sync
## Typically called on demand when we know something is wrong
## Marks any unreachable databases, and their syncs, as stalled
## Arguments: zero or one
## 1. Optional name of database to hone in on
## Returns: number of bad databases detected
my $self = shift;
my $dbnamematch = shift || '';
$self->glog('Starting check_sync_health', LOG_NORMAL);
## How many bad databases did we find?
my $bad_dbs = 0;
## No need to check databases more than once, as they can span across syncs
my $db_checked = {};
## Do this at the sync level, rather than 'sdb', as we don't
## want to check non-active syncs at all
SYNC: for my $syncname (sort keys %{ $self->{sync} }) {
my $sync = $self->{sync}{$syncname};
if ($sync->{status} ne 'active') {
$self->glog("Skipping $sync->{status} sync $syncname", LOG_NORMAL);
next SYNC;
}
## Walk through each database used by this sync
DB: for my $dbname (sort keys %{ $sync->{db} }) {
## Only check each database (by name) once
next if $db_checked->{$dbname}++;
## If limiting to a single database, only check that one
next if $dbnamematch and $dbnamematch ne $dbname;
$self->glog("Checking database $dbname for sync $syncname", LOG_DEBUG);
my $dbinfo = $sync->{db}{$dbname};
## We only bother checking ones that are currently active
if ($dbinfo->{status} ne 'active') {
$self->glog("Skipping $dbinfo->{status} database $dbname for sync $syncname", LOG_NORMAL);
next DB;
}
## Is this database valid or not?
my $isbad = 0;
my $dbh = $dbinfo->{dbh};
if (! ref $dbh) {
$self->glog("Database handle for database $dbname does not look valid", LOG_NORMAL);
if ($dbinfo->{dbtype} eq 'postgres') {
$isbad = 1;
}
else {
## TODO: Account for other non dbh types
next DB;
}
}
elsif (! $dbh->ping()) {
$isbad = 1;
$self->glog("Database $dbname failed ping", LOG_NORMAL);
}
## If not marked as bad, assume good and move on
next DB unless $isbad;
## Retry connection afresh: wrap in eval as one of these is likely to fail!
undef $dbinfo->{dbh};
eval {
($dbinfo->{backend}, $dbinfo->{dbh}) = $self->connect_database($dbname);
$self->show_db_version_and_time($dbinfo->{dbh}, $dbinfo->{backend}, qq{Database "$dbname" });
};
## If we cannot connect, mark the db (and the sync) as stalled
if (! defined $dbinfo->{dbh}) {
$self->glog("Database $dbname is unreachable, marking as stalled", LOG_NORMAL);
$dbinfo->{status} = 'stalled';
$bad_dbs++;
if ($sync->{status} ne 'stalled') {
$self->glog("Marked sync $syncname as stalled", LOG_NORMAL);
$sync->{status} = 'stalled';
$SQL = 'UPDATE bucardo.sync SET status = ? WHERE name = ?';
eval {
my $sth = $self->{masterdbh}->prepare($SQL);
$sth->execute('stalled',$syncname);
};
if ($@) {
$self->glog("Failed to set sync $syncname as stalled: $@", LOG_WARN);
$self->{masterdbh}->rollback();
}
}
$SQL = 'UPDATE bucardo.db SET status = ? WHERE name = ?';
my $sth = $self->{masterdbh}->prepare($SQL);
eval {
$sth->execute('stalled',$dbname);
$self->{masterdbh}->commit();
};
if ($@) {
$self->glog("Failed to set db $dbname as stalled: $@", LOG_WARN);
$self->{masterdbh}->rollback();
}
}
} ## end each database in this sync
} ## end each sync
## If any databases were marked as bad, go ahead and stall other syncs that are using them
## (todo)
return $bad_dbs;
} ## end of check_sync_health
sub restore_syncs {
## Try to restore stalled syncs by checking its stalled databases
## Arguments: none
## Returns: number of syncs restored
my $self = shift;
$self->glog('Starting restore_syncs', LOG_DEBUG);
## How many syncs did we restore?
my $restored_syncs = 0;
## No need to check databases more than once, as they can span across syncs
my $db_checked = {};
## If a sync is stalled, check its databases
SYNC: for my $syncname (sort keys %{ $self->{sync} }) {
my $sync = $self->{sync}{$syncname};
next SYNC if $sync->{status} ne 'stalled';
$self->glog("Checking stalled sync $syncname", LOG_DEBUG);
## Number of databases restored for this sync only
my $restored_dbs = 0;
## Walk through each database used by this sync
DB: for my $dbname (sort keys %{ $sync->{db} }) {
## Only check each database (by name) once
next if $db_checked->{$dbname}++;
$self->glog("Checking database $dbname for sync $syncname", LOG_DEBUG);
my $dbinfo = $sync->{db}{$dbname};
## All we need to worry about are stalled ones
next DB if $dbinfo->{status} ne 'stalled';
## Just in case, remove the database handle
undef $dbinfo->{dbh};
eval {
($dbinfo->{backend}, $dbinfo->{dbh}) = $self->connect_database($dbname);
$self->show_db_version_and_time($dbinfo->{dbh}, $dbinfo->{backend}, qq{Database "$dbname" });
};
if (defined $dbinfo->{dbh}) {
$dbinfo->{status} = 'active';
$SQL = 'UPDATE bucardo.db SET status = ? WHERE name = ?';
my $sth = $self->{masterdbh}->prepare($SQL);
$sth->execute('active',$dbname);
$self->{masterdbh}->commit();
$restored_dbs++;
$self->glog("Sucessfully restored database $dbname: no longer stalled", LOG_NORMAL);
}
} ## end each database
## If any databases were restored, restore the sync too
if ($restored_dbs) {
$sync->{status} = 'active';
$SQL = 'UPDATE bucardo.sync SET status = ? WHERE name = ?';
my $sth = $self->{masterdbh}->prepare($SQL);
$sth->execute('active',$syncname);
$self->{masterdbh}->commit();
$restored_syncs++;
$self->glog("Sucessfully restored sync $syncname: no longer stalled", LOG_NORMAL);
}
} ## end each sync
return $restored_syncs;
} ## end of restore_syncs
sub start_controller {
## For a particular sync, does all the listening and creation of KIDs
## aka the CTL process
## Why not just spawn KIDs? Someday the CTL may have multiple kids again...
## Arguments: one
## 1. Hashref of sync information
## Returns: never
our ($self,$sync) = @_;
$self->{ctlpid} = $$;
$self->{syncname} = $sync->{name};
## Prefix all log lines with this TLA (was MCP)
$self->{logprefix} = 'CTL';
## Extract some of the more common items into local vars
my ($syncname,$kidsalive,$dbinfo, $kicked,) = @$sync{qw(
name kidsalive dbs kick_on_startup)};
## Set our process name
$0 = qq{Bucardo Controller.$self->{extraname} Sync "$syncname" for relgroup "$sync->{herd}" to dbs "$sync->{dbs}"};
## Upgrade any specific sync configs to global configs
if (exists $config{sync}{$syncname}) {
while (my ($setting, $value) = each %{$config{sync}{$syncname}}) {
$config{$setting} = $value;
$self->glog("Set sync-level config setting $setting: $value", LOG_TERSE);
}
}
## Store our PID into a file
## Save the complete returned name for later cleanup
$self->{ctlpidfile} = $self->store_pid( "bucardo.ctl.sync.$syncname.pid" );
## Start normal log output for this controller: basic facts
my $msg = qq{New controller for sync "$syncname". Relgroup is "$sync->{herd}", dbs is "$sync->{dbs}". PID=$$};
$self->glog($msg, LOG_TERSE);
## Log some startup information, and squirrel some away for later emailing
my $mailmsg = "$msg\n";
$msg = qq{ stayalive: $sync->{stayalive} checksecs: $sync->{checksecs} kicked: $kicked};
$self->glog($msg, LOG_NORMAL);
$mailmsg .= "$msg\n";
$msg = sprintf q{ kidsalive: %s onetimecopy: %s lifetimesecs: %s (%s) maxkicks: %s},
$kidsalive,
$sync->{onetimecopy},
$sync->{lifetimesecs},
$sync->{lifetime} || 'NULL',
$sync->{maxkicks};
$self->glog($msg, LOG_NORMAL);
$mailmsg .= "$msg\n";
## Allow the MCP to signal us (request to exit)
local $SIG{USR1} = sub {
## Do not change this message: looked for in the controller DIE sub
die "MCP request\n";
};
## From this point forward, we want to die gracefully
local $SIG{__DIE__} = sub {
## Arguments: one
## 1. Error message
## Returns: never (exit 0)
my ($diemsg) = @_;
## Store the line that did the actual exception
my $line = (caller)[2];
## Don't issue a warning if this was simply a MCP request
my $warn = $diemsg =~ /MCP request/ ? '' : 'Warning! ';
$self->glog(qq{${warn}Controller for "$syncname" was killed at line $line: $diemsg}, LOG_WARN);
## We send an email if it's enabled
if ($self->{sendmail} or $self->{sendmail_file}) {
## Never email passwords
my $oldpass = $self->{dbpass};
$self->{dbpass} = '???';
## Create a text version of our $self to email out
my $dump = Dumper $self;
my $body = qq{
Controller $$ has been killed at line $line
Host: $hostname
Sync name: $syncname
Relgroup: $sync->{herd}
Databases: $sync->{dbs}
Error: $diemsg
Parent process: $self->{mcppid}
Stats page: $config{stats_script_url}?sync=$syncname
Version: $VERSION
};
## Whitespace cleanup
$body =~ s/^\s+//gsm;
## Give some hints in the subject lines for known types of errors
my $moresub = '';
if ($diemsg =~ /Found stopfile/) {
$moresub = ' (stopfile)';
}
elsif ($diemsg =~ /could not serialize access/) {
$moresub = ' (serialization)';
}
elsif ($diemsg =~ /deadlock/) {
$moresub = ' (deadlock)';
}
elsif ($diemsg =~ /could not connect/) {
$moresub = ' (no connection)';
}
## Send the mail, but not for a normal shutdown
if ($moresub !~ /stopfile/) {
my $subject = qq{Bucardo "$syncname" controller killed on $shorthost$moresub};
$self->send_mail({ body => "$body\n", subject => $subject });
}
## Restore the password for the final cleanup connection
$self->{dbpass} = $oldpass;
} ## end sending email
## Cleanup the controller by killing kids, cleaning database tables and removing the PID file.
$self->cleanup_controller(0, $diemsg);
exit 0;
}; ## end SIG{__DIE__} handler sub
## Connect to the master database
($self->{master_backend}, $self->{masterdbh}) = $self->connect_database();
my $maindbh = $self->{masterdbh};
$self->glog("Bucardo database backend PID: $self->{master_backend}", LOG_VERBOSE);
## Map the PIDs to common names for better log output
$self->{pidmap}{$$} = 'CTL';
$self->{pidmap}{$self->{master_backend}} = 'Bucardo DB';
## Listen for kick requests from the MCP for this sync
my $kicklisten = "kick_$syncname";
$self->db_listen($maindbh, "ctl_$kicklisten");
## Listen for a controller ping request
my $pinglisten = "${$}_ping";
$self->db_listen($maindbh, "ctl_$pinglisten");
## Commit so we start listening right away
$maindbh->commit();
## SQL to update the syncrun table's status only
## This is currently unused, but no harm in leaving it in place.
## It would be nice to syncrun the before_sync and after_sync
## custom codes. If we reintroduce the multi-kid 'gang' concept,
## that changes things radically as well.
$SQL = q{
UPDATE bucardo.syncrun
SET status=?
WHERE sync=?
AND ended IS NULL
};
$sth{ctl_syncrun_update_status} = $maindbh->prepare($SQL);
## SQL to update the syncrun table on startup
## Returns the insert (start) time
$SQL = q{
UPDATE bucardo.syncrun
SET ended=now(), status=?
WHERE sync=?
AND ended IS NULL
RETURNING started
};
$sth{ctl_syncrun_end_now} = $maindbh->prepare($SQL);
## At this point, this controller must be authoritative for its sync
## Thus, we want to stop/kill any other CTL or KID processes that exist for this sync
## The first step is to send a friendly notice asking them to leave gracefully
my $stopsync = "stopsync_$syncname";
## This will commit after the notify:
$self->db_notify($maindbh, "kid_$stopsync");
## We also want to force other controllers of this sync to leave
$self->db_notify($maindbh, "ctl_$stopsync");
## Now we can listen for it ourselves in case the MCP requests it
$self->db_listen($maindbh, "ctl_$stopsync");
## Now we look for any PID files for this sync and send them a HUP
$count = $self->send_signal_to_PID( {sync => $syncname} );
## Next, we want to interrupt any long-running queries a kid may be in the middle of
## If they are, they will not receive the message above until done, but we can't wait
## If we stopped anyone, sleep a bit to allow them to exit and remove their PID files
$self->terminate_old_goats($syncname) and sleep 1;
## Clear out any old entries in the syncrun table
$sth = $sth{ctl_syncrun_end_now};
$count = $sth->execute("Old entry ended (CTL $$)", $syncname);
if (1 == $count) {
$info = $sth->fetchall_arrayref()->[0][0];
$self->glog("Ended old syncrun entry, start time was $info", LOG_NORMAL);
}
else {
$sth->finish();
}
## Count the number of gangs in use by this sync
my %gang;
for my $dbname (sort keys %{ $sync->{db} }) {
my $d = $sync->{db}{$dbname};
## Makes no sense to specify gangs for source databases!
next if $d->{role} eq 'source';
$gang{$d->{gang}}++;
}
$sync->{numgangs} = keys %gang;
## Listen for a kid letting us know the sync has finished
my $syncdone = "syncdone_$syncname";
$self->db_listen($maindbh, "ctl_$syncdone");
## Determine the last time this sync fired, if we are using "checksecs"
if ($sync->{checksecs}) {
## The handy syncrun table tells us the time of the last good run
$SQL = q{
SELECT CEIL(EXTRACT(epoch FROM ended))
FROM bucardo.syncrun
WHERE sync=?
AND (lastgood IS TRUE OR lastempty IS TRUE)
};
$sth = $maindbh->prepare($SQL);
$count = $sth->execute($syncname);
## Got a match? Use that
if (1 == $count) {
$sync->{lastheardfrom} = $sth->fetchall_arrayref()->[0][0];
}
else {
## We default to "now" if we cannot find an earlier time
$sth->finish();
$sync->{lastheardfrom} = time();
}
$maindbh->commit();
}
## If running an after_sync customcode, we need a timestamp
if (exists $sync->{code_after_sync}) {
$SQL = 'SELECT now()';
$sync->{starttime} = $maindbh->selectall_arrayref($SQL)->[0][0];
## Rolling back as all we did was the SELECT
$maindbh->rollback();
}
## Reconnect to all databases we care about: overwrites existing dbhs
for my $dbname (sort keys %{ $sync->{db} }) {
my $d = $sync->{db}{$dbname};
if ($d->{dbtype} =~ /flat/o) {
$self->glog(qq{Not connecting to flatfile database "$dbname"}, LOG_NORMAL);
next;
}
## Do not need non-Postgres handles for the controller
next if $d->{dbtype} ne 'postgres';
## Establish a new database handle
($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
$self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_NORMAL);
$self->{pidmap}{$d->{backend}} = "DB $dbname";
}
## Adjust the target table names as needed and store in the goat hash
## New table name regardless of syncs or databases
$SQL = 'SELECT newname FROM bucardo.customname WHERE goat=? AND db IS NULL and sync IS NULL';
my $sth_custom1 = $maindbh->prepare($SQL);
## New table name for this sync only
$SQL = 'SELECT newname FROM bucardo.customname WHERE goat=? AND sync=? AND db IS NULL';
my $sth_custom2 = $maindbh->prepare($SQL);
## New table name for a specific database only
$SQL = 'SELECT newname FROM bucardo.customname WHERE goat=? AND db=? AND sync IS NULL';
my $sth_custom3 = $maindbh->prepare($SQL);
## New table name for this sync and a specific database
$SQL = 'SELECT newname FROM bucardo.customname WHERE goat=? AND sync=? AND db=?';
my $sth_custom4 = $maindbh->prepare($SQL);
## Adjust the target table columns as needed and store in the goat hash
## New table cols regardless of syncs or databases
$SQL = 'SELECT clause FROM bucardo.customcols WHERE goat=? AND db IS NULL and sync IS NULL';
my $sth_customc1 = $maindbh->prepare($SQL);
## New table cols for this sync only
$SQL = 'SELECT clause FROM bucardo.customcols WHERE goat=? AND sync=? AND db IS NULL';
my $sth_customc2 = $maindbh->prepare($SQL);
## New table cols for a specific database only
$SQL = 'SELECT clause FROM bucardo.customcols WHERE goat=? AND db=? AND sync IS NULL';
my $sth_customc3 = $maindbh->prepare($SQL);
## New table cols for this sync and a specific database
$SQL = 'SELECT clause FROM bucardo.customcols WHERE goat=? AND sync=? AND db=?';
my $sth_customc4 = $maindbh->prepare($SQL);
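## When more than one rule matches, the most specific one wins, applied in this order:
##   goat only < goat+sync < goat+db < goat+sync+db
## e.g. a customname declared for (goat, sync, db) beats one declared for the goat alone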
for my $g (@{ $sync->{goatlist} }) {
## We only transform tables for now
next if $g->{reltype} ne 'table';
my ($S,$T) = ($g->{safeschema},$g->{safetable});
## See if we have any custom names or columns. Each level overrides the last
my $customname = '';
my $customcols = '';
## Just this goat
$count = $sth_custom1->execute($g->{id});
if ($count < 1) {
$sth_custom1->finish();
}
else {
$customname = $sth_custom1->fetchall_arrayref()->[0][0];
}
$count = $sth_customc1->execute($g->{id});
if ($count < 1) {
$sth_customc1->finish();
}
else {
$customcols = $sth_customc1->fetchall_arrayref()->[0][0];
}
## Just this goat and this sync
$count = $sth_custom2->execute($g->{id}, $syncname);
if ($count < 1) {
$sth_custom2->finish();
}
else {
$customname = $sth_custom2->fetchall_arrayref()->[0][0];
}
$count = $sth_customc2->execute($g->{id}, $syncname);
if ($count < 1) {
$sth_customc2->finish();
}
else {
$customcols = $sth_customc2->fetchall_arrayref()->[0][0];
}
## Need to pick one source database to extract the list of columns from; the first one found will do
my $saved_sourcedbh = '';
## Set for each target db
$g->{newname}{$syncname} = {};
$g->{newcols}{$syncname} = {};
for my $dbname (sort keys %{ $sync->{db} }) {
my $d = $sync->{db}{$dbname};
my $type= $d->{dbtype};
my $cname;
my $ccols = '';
## We only ever change table names (or cols) for true targets
if ($d->{role} ne 'source') {
## Save local copies for this database only
$cname = $customname;
$ccols = $customcols;
## Anything for this goat and this database?
$count = $sth_custom3->execute($g->{id}, $dbname);
if ($count < 1) {
$sth_custom3->finish();
}
else {
$cname = $sth_custom3->fetchall_arrayref()->[0][0];
}
$count = $sth_customc3->execute($g->{id}, $dbname);
if ($count < 1) {
$sth_customc3->finish();
}
else {
$ccols = $sth_customc3->fetchall_arrayref()->[0][0];
}
## Anything for this goat, this sync, and this database?
$count = $sth_custom4->execute($g->{id}, $syncname, $dbname);
if ($count < 1) {
$sth_custom4->finish();
}
else {
$cname = $sth_custom4->fetchall_arrayref()->[0][0];
}
$count = $sth_customc4->execute($g->{id}, $syncname, $dbname);
if ($count < 1) {
$sth_customc4->finish();
}
else {
$ccols = $sth_customc4->fetchall_arrayref()->[0][0];
}
}
## Got a new name match? Just use that for everything
if (defined $cname and $cname) {
$g->{newname}{$syncname}{$dbname} = $cname;
}
## Only a few use schemas:
elsif ($d->{dbtype} eq 'postgres'
or $d->{dbtype} eq 'flatpg') {
$g->{newname}{$syncname}{$dbname} = "$S.$T";
}
## Some always get the raw table name
elsif ($d->{dbtype} eq 'redis') {
$g->{newname}{$syncname}{$dbname} = $g->{tablename};
}
else {
$g->{newname}{$syncname}{$dbname} = $T;
}
## Set the columns for this combo: empty for no change
$g->{newcols}{$syncname}{$dbname} = $ccols;
## If we do not have a source database handle yet, grab one
if (! $saved_sourcedbh) {
for my $dbname (sort keys %{ $sync->{db} }) {
next if $sync->{db}{$dbname}{role} ne 'source';
## All we need is the handle, nothing more
$saved_sourcedbh = $sync->{db}{$dbname}{dbh};
## Leave this loop, we got what we came for
last;
}
}
## We either get the specific columns, or use a '*' if no customcols
my $SELECT = $ccols || 'SELECT *';
## Run a dummy query against the source to pull back the column names
## This is particularly important for customcols of course!
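## Illustrative example (hypothetical table and columns): with a customcols clause of
##   SELECT id, cost*2 AS doubled
## against public.sales, the probe below becomes
##   SELECT * FROM (SELECT id, cost*2 AS doubled FROM public.sales LIMIT 0) AS foo LIMIT 0
## which returns zero rows, while $sth->{NAME} still yields ['id','doubled']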
$sth = $saved_sourcedbh->prepare("SELECT * FROM ($SELECT FROM $S.$T LIMIT 0) AS foo LIMIT 0");
$sth->execute();
## Store the arrayref of column names for this goat and this select clause
$g->{tcolumns}{$SELECT} = $sth->{NAME};
$sth->finish();
$saved_sourcedbh->rollback();
## Make sure none of them are un-named, which Postgres outputs as ?column?
if (grep { /\?/ } @{ $g->{tcolumns}{$SELECT} }) {
die "Invalid customcols given: must give an alias to all columns! ($g->{tcolumns}{$SELECT}) for $SELECT\n";
}
}
}
## Set to true if we determine the kid(s) should make a run
## Can be set by:
## kick notice from the MCP for this sync
## 'checksecs' timeout
## if we are just starting up (now)
my $kick_request = 1;
## How long it has been since we checked on our kids
my $kidchecktime = 0;
## For custom code:
our $input = {}; ## XXX still needed?
## We are finally ready to enter the main loop
CONTROLLER: {
## Bail if the stopfile exists
if (-e $self->{stop_file}) {
$self->glog(qq{Found stopfile "$self->{stop_file}": exiting}, LOG_TERSE);
## Do not change this message: looked for in the controller DIE sub
my $stopmsg = 'Found stopfile';
## Grab the reason, if it exists, so we can propagate it onward
my $ctlreason = get_reason(0);
if ($ctlreason) {
$stopmsg .= ": $ctlreason";
}
## This exception is caught by the controller's __DIE__ sub above
die "$stopmsg\n";
}
## Process any notifications from the main database
## Ignore things we may have sent ourselves
my $nlist = $self->db_get_notices($maindbh, $self->{master_backend});
NOTICE: for my $name (sort keys %{ $nlist }) {
my $npid = $nlist->{$name}{firstpid};
## Strip prefix so we can easily use both pre and post 9.0 versions
$name =~ s/^ctl_//o;
## Kick request from the MCP?
if ($name eq $kicklisten) {
$kick_request = 1;
next NOTICE;
}
## Request for a ping via listen/notify
if ($name eq $pinglisten) {
$self->glog('Got a ping, issuing pong', LOG_DEBUG);
$self->db_notify($maindbh, "ctl_${$}_pong");
next NOTICE;
}
## Another controller has asked us to leave as we are no longer The Man
if ($name eq $stopsync) {
$self->glog('Got a stop sync request, so exiting', LOG_TERSE);
die 'Stop sync request';
}
## A kid has just finished syncing
if ($name eq $syncdone) {
$self->{syncdone} = time;
$self->glog("Kid $npid has reported that sync $syncname is done", LOG_DEBUG);
## If this was a onetimecopy sync, flip the bit (which should be done in the db already)
if ($sync->{onetimecopy}) {
$sync->{onetimecopy} = 0;
}
next NOTICE;
}
## Someone else's sync is getting kicked, finishing up, or stopping
next NOTICE if
(index($name, 'kick_') == 0)
or
(index($name, 'syncdone_') == 0)
or
(index($name, 'stopsync_') == 0);
## Ignore any messages sent to a kid
next NOTICE if 0 == index($name, 'kid_');
## Should not happen, but let's at least log it
$self->glog("Warning: received unknown message $name from $npid!", LOG_TERSE);
} ## end of each notification
## To ensure we can receive new notifications next time:
$maindbh->commit();
if ($self->{syncdone}) {
## Reset the notice
$self->{syncdone} = 0;
## Run all after_sync custom codes
if (exists $sync->{code_after_sync}) {
for my $code (@{$sync->{code_after_sync}}) {
#$sth{ctl_syncrun_update_status}->execute("Code after_sync (CTL $$)", $syncname);
$maindbh->commit();
my $result = $self->run_ctl_custom_code($sync,$input,$code, 'nostrict');
$self->glog("End of after_sync $code->{id}", LOG_VERBOSE);
} ## end each custom code
}
## Let anyone listening know that this sync is complete. Global message
my $notifymsg = "syncdone_$syncname";
$self->db_notify($maindbh, $notifymsg);
## If we are not a stayalive, this is a good time to leave
if (! $sync->{stayalive} and ! $kidsalive) {
$self->cleanup_controller(1, 'Kids are done');
exit 0;
}
## XXX: re-examine
# If we ran an after_sync and grabbed rows, reset the time
# if (exists $rows_for_custom_code->{source}) {
# $SQL = "SELECT $self->{mcp_clock_timestamp}";
# $sync->{starttime} = $maindbh->selectall_arrayref($SQL)->[0][0];
# }
} ## end if sync done
## If we are using checksecs, possibly force a kick
if ($sync->{checksecs}) {
## Already being kicked? Reset the clock
if ($kick_request) {
$sync->{lastheardfrom} = time();
}
elsif (time() - $sync->{lastheardfrom} >= $sync->{checksecs}) {
if ($sync->{onetimecopy}) {
$self->glog(qq{Timed out, but in onetimecopy mode, so not kicking, for "$syncname"}, LOG_DEBUG);
}
else {
$self->glog(qq{Timed out - force a sync for "$syncname"}, LOG_VERBOSE);
$kick_request = 1;
}
## Reset the clock
$sync->{lastheardfrom} = time();
}
}
## XXX What about non stayalive kids?
## XXX This is called too soon - recently created kids are not there yet!
## Check that our kids are alive and healthy
## XXX Skip if we know the kids are busy? (cannot ping/pong!)
## XXX Maybe skip this entirely and just check on a kick?
if ($sync->{stayalive} ## CTL must be persistent
and $kidsalive ## KID must be persistent
and $self->{kidpid} ## KID must have been created at least once
and time() - $kidchecktime >= $config{ctl_checkonkids_time}) {
my $pidfile = "$config{piddir}/bucardo.kid.sync.$syncname.pid";
## If we find a problem, set this to true
my $resurrect = 0;
## Make sure the PID file exists
if (! -e $pidfile) {
$self->glog("PID file missing: $pidfile", LOG_DEBUG);
$resurrect = 1;
}
else {
## Make sure that a kill 0 sees it
## XXX Use ping/pong?
my $pid = $self->{kidpid};
$count = kill 0 => $pid;
if ($count != 1) {
$self->glog("Warning: Kid $pid is not responding, will respawn", LOG_TERSE);
$resurrect = 2;
}
}
## At this point, the PID file does not exist or the kid is not responding
if ($resurrect) {
## XXX Try harder to kill it?
## First clear out any old entries in the syncrun table
$sth = $sth{ctl_syncrun_end_now};
$count = $sth->execute("Old entry died (CTL $$)", $syncname);
if (1 == $count) {
$info = $sth->fetchall_arrayref()->[0][0];
$self->glog("Old syncrun entry removed during resurrection, start time was $info", LOG_NORMAL);
}
else {
$sth->finish();
}
$self->glog("Resurrecting kid $syncname, resurrect was $resurrect", LOG_DEBUG);
$self->{kidpid} = $self->create_newkid($sync);
## Sleep a little here to prevent runaway kid creation
sleep $config{kid_restart_sleep};
}
## Reset the time
$kidchecktime = time();
} ## end of time to check on our kid's health
## Redo if we are not kicking but are stayalive and the queue is clear
if (! $kick_request and $sync->{stayalive}) {
sleep $config{ctl_sleep};
redo CONTROLLER;
}
## Reset the kick_request for the next run
$kick_request = 0;
## At this point, we know we are about to run a sync
## We will either create the kid(s), or signal the existing one(s)
## XXX If a custom code handler needs a database handle, create one
our ($cc_sourcedbh,$safe_sourcedbh);
## Run all before_sync code
## XXX Move to kid? Do not want to run over and over if something is queued
if (exists $sync->{code_before_sync}) {
#$sth{ctl_syncrun_update_status}->execute("Code before_sync (CTL $$)", $syncname);
$maindbh->commit();
for my $code (@{$sync->{code_before_sync}}) {
my $result = $self->run_ctl_custom_code($sync,$input,$code, 'nostrict');
if ($result eq 'redo') {
redo CONTROLLER;
}
}
}
$maindbh->commit();
if ($self->{kidpid}) {
## Tell any listening kids to go ahead and start
$self->db_notify($maindbh, "kid_run_$syncname");
}
else {
## Create any kids that do not exist yet (or have been killed, as detected above)
$self->glog("Creating a new kid for sync $syncname", LOG_VERBOSE);
$self->{kidpid} = $self->create_newkid($sync);
}
sleep $config{ctl_sleep};
redo CONTROLLER;
} ## end CONTROLLER
die 'How did we reach outside of the main controller loop?';
} ## end of start_controller
sub start_kid {
## A single kid, in charge of doing a sync between two or more databases
## aka the KID process
## Arguments: one
## 1. Hashref of sync information
## Returns: never (exits)
my ($self,$sync) = @_;
## Prefix all log lines with this TLA
$self->{logprefix} = 'KID';
## Extract some of the more common items into local vars
my ($syncname, $goatlist, $kidsalive, $dbs, $kicked) = @$sync{qw(
name goatlist kidsalive dbs kick_on_startup)};
## Adjust the process name, start logging
$0 = qq{Bucardo Kid.$self->{extraname} Sync "$syncname"};
my $extra = $sync->{onetimecopy} ? "OTC: $sync->{onetimecopy}" : '';
if ($config{log_showsyncname}) {
$self->{logprefix} .= " ($syncname)";
}
$self->glog(qq{New kid, sync "$syncname" alive=$kidsalive Parent=$self->{ctlpid} PID=$$ kicked=$kicked $extra}, LOG_TERSE);
## Store our PID into a file
## Save the complete returned name for later cleanup
$self->{kidpidfile} = $self->store_pid( "bucardo.kid.sync.$syncname.pid" );
## Establish these early so the DIE block can use them
my ($S,$T,$pkval) = ('?','?','?');
## Keep track of how many times this kid has done work
my $kidloop = 0;
## Catch the USR1 signal, sent by the parent CTL process to tell us to exit right away
local $SIG{USR1} = sub {
## Mostly so we do not send an email:
$self->{clean_exit} = 1;
die "CTL request\n";
};
## Set up some common groupings of the databases inside sync->{db}
## Also setup common attributes
my (@dbs, @dbs_source, @dbs_target, @dbs_delta, @dbs_fullcopy,
@dbs_connectable, @dbs_dbi, @dbs_write, @dbs_non_fullcopy,
@dbs_postgres, @dbs_drizzle, @dbs_firebird, @dbs_mongo, @dbs_mysql, @dbs_oracle,
@dbs_redis, @dbs_sqlite);
## Used to weed out all but one source if in onetimecopy mode
my $found_first_source = 0;
for my $dbname (sort keys %{ $sync->{db} }) {
my $d = $sync->{db}{$dbname};
## All databases start with triggers enabled
$d->{triggers_enabled} = 1;
## First, do some exclusions
## If this is a onetimecopy sync, the fullcopy targets are dead to us
next if $sync->{onetimecopy} and $d->{role} eq 'fullcopy';
## If this is a onetimecopy sync, we only need to connect to a single source
if ($sync->{onetimecopy} and $d->{role} eq 'source') {
next if $found_first_source;
$found_first_source = 1;
}
## If this database is inactive, validate_sync has already made sure it is not a source
## Thus, if we made it this far, it is a target and should be skipped
if ($d->{status} eq 'inactive') {
$self->glog(qq{Skipping inactive database "$dbname" entirely}, LOG_NORMAL);
## Don't just skip it: nuke it from orbit! It's the only way to be sure.
delete $sync->{db}{$dbname};
next;
}
## Now set the default attributes
## Is this a SQL database?
$d->{does_sql} = 0;
## Do we have a DBI-based driver?
$d->{does_dbi} = 0;
## Can it do truncate?
$d->{does_truncate} = 0;
## Does it support asynchronous queries well?
$d->{does_async} = 0;
## Does it have good support for ANY()?
$d->{does_ANY_clause} = 0;
## Can it do savepoints (and roll them back)?
$d->{does_savepoints} = 0;
## Does it support truncate cascade?
$d->{does_cascade} = 0;
## Does it support a LIMIT clause?
$d->{does_limit} = 0;
## Is it append-only (no updates or deletes), e.g. a flat file?
$d->{does_append_only} = 0;
## List of tables in this database that need makedelta inserts
$d->{does_makedelta} = {};
## Does it have that annoying timestamp +dd bug?
$d->{has_mysql_timestamp_issue} = 0;
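## (These defaults are selectively switched on below, per database type;
##  e.g. Postgres enables nearly everything, while Redis and MongoDB keep the defaults)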
## Start clumping into groups and adjust the attributes
## Postgres
if ('postgres' eq $d->{dbtype}) {
push @dbs_postgres => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
$d->{does_cascade} = 1;
$d->{does_limit} = 1;
$d->{does_async} = 1;
$d->{does_ANY_clause} = 1;
}
## Drizzle
if ('drizzle' eq $d->{dbtype}) {
push @dbs_drizzle => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
$d->{does_limit} = 1;
$d->{has_mysql_timestamp_issue} = 1;
}
## MongoDB
if ('mongo' eq $d->{dbtype}) {
push @dbs_mongo => $dbname;
}
## MySQL (and MariaDB)
if ('mysql' eq $d->{dbtype} or 'mariadb' eq $d->{dbtype}) {
push @dbs_mysql => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
$d->{does_limit} = 1;
$d->{has_mysql_timestamp_issue} = 1;
}
## Firebird
if ('firebird' eq $d->{dbtype}) {
push @dbs_firebird => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
$d->{does_limit} = 1;
$d->{has_mysql_timestamp_issue} = 1;
}
## Oracle
if ('oracle' eq $d->{dbtype}) {
push @dbs_oracle => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
}
## Redis
if ('redis' eq $d->{dbtype}) {
push @dbs_redis => $dbname;
}
## SQLite
if ('sqlite' eq $d->{dbtype}) {
push @dbs_sqlite => $dbname;
$d->{does_sql} = 1;
$d->{does_truncate} = 1;
$d->{does_savepoints} = 1;
$d->{does_limit} = 1;
}
## Flat files
if ($d->{dbtype} =~ /flat/) {
$d->{does_append_only} = 1;
}
## Everyone goes into this bucket
push @dbs => $dbname;
## Databases we read data from
push @dbs_source => $dbname
if $d->{role} eq 'source';
## Target databases
push @dbs_target => $dbname
if $d->{role} ne 'source';
## Databases that (potentially) get written to
## This is all of them, unless we are a source
## and a fullcopy sync or in onetimecopy mode
push @dbs_write => $dbname
if (!$sync->{fullcopy} and !$sync->{onetimecopy})
or $d->{role} ne 'source';
## Databases that get deltas
## If in onetimecopy mode, this is always forced to be empty
## Likewise, no point in populating if this is a fullcopy sync
push @dbs_delta => $dbname
if $d->{role} eq 'source'
and ! $sync->{onetimecopy}
and ! $sync->{fullcopy};
## Databases that get the full monty
## In normal mode, this means a role of 'fullcopy'
## In onetimecopy mode, this means a role of 'target'
push @dbs_fullcopy => $dbname
if ($sync->{onetimecopy} and $d->{role} eq 'target')
or ($sync->{fullcopy} and $d->{role} eq 'fullcopy');
## Non-fullcopy databases. Basically dbs_source + dbs_target
push @dbs_non_fullcopy => $dbname
if $d->{role} ne 'fullcopy';
## Databases with Perl DBI support
if ($d->{dbtype} eq 'postgres'
or $d->{dbtype} eq 'drizzle'
or $d->{dbtype} eq 'firebird'
or $d->{dbtype} eq 'mariadb'
or $d->{dbtype} eq 'mysql'
or $d->{dbtype} eq 'oracle'
or $d->{dbtype} eq 'sqlite') {
push @dbs_dbi => $dbname;
$d->{does_dbi} = 1;
}
## Things we can connect to. Almost everything
push @dbs_connectable => $dbname
if $d->{dbtype} !~ /flat/;
}
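## Illustrative example (hypothetical names): a plain delta sync with databases
##   A (source, postgres) and B (target, postgres) ends up with
##   @dbs_source=(A)  @dbs_target=(B)  @dbs_delta=(A)  @dbs_fullcopy=()
##   @dbs_write=(A,B)  @dbs_non_fullcopy=(A,B)  @dbs_dbi=(A,B)  @dbs_connectable=(A,B)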
## Connect to the main database
($self->{master_backend}, $self->{masterdbh}) = $self->connect_database();
## Set a shortcut for this handle, and log the details
my $maindbh = $self->{masterdbh};
$self->glog("Bucardo database backend PID: $self->{master_backend}", LOG_VERBOSE);
## Setup mapping so we can report in the log which things came from this backend
$self->{pidmap}{$self->{master_backend}} = 'Bucardo DB';
## SQL to enter a new database in the dbrun table
$SQL = q{
INSERT INTO bucardo.dbrun(sync,dbname,pgpid)
VALUES (?,?,?)
};
$sth{dbrun_insert} = $maindbh->prepare($SQL);
## SQL to remove a database from the dbrun table
$SQL{dbrun_delete} = q{
DELETE FROM bucardo.dbrun
WHERE sync = ? AND dbname = ?
};
$sth{dbrun_delete} = $maindbh->prepare($SQL{dbrun_delete});
## The CTL's exception handler no longer applies here: the kid installs its own
## Fancy exception handler to clean things up before leaving
my $err_handler = sub {
## Arguments: one
## 1. Error message
## Returns: never (exit 1)
## Trim whitespace from our message
my ($msg) = @_;
$msg =~ s/\s+$//g;
## Where did we die?
my $line = (caller)[2];
$msg .= "\nLine: $line";
## Subject line tweaking later on
my $moresub = '';
## Find any error messages/states for all databases
if ($msg =~ /DBD::Pg/) {
$msg .= "\nMain DB state: " . ($maindbh->state || '?');
$msg .= ' Error: ' . ($maindbh->err || 'none');
for my $dbname (@dbs_dbi) {
my $d = $sync->{db}{$dbname};
my $dbh = $d->{dbh};
my $state = $dbh->state || '?';
$msg .= "\nDB $dbname state: $state";
$msg .= ' Error: ' . ($dbh->err || 'none');
## If this was a deadlock problem, try and gather more information
if ($state eq '40P01' and $d->{dbtype} eq 'postgres') {
$msg .= $self->get_deadlock_details($dbh, $msg);
$moresub = ' (deadlock)';
last;
}
}
}
$msg .= "\n";
(my $flatmsg = $msg) =~ s/\n/ /g;
$self->glog("Kid has died, error is: $flatmsg", LOG_TERSE);
## Drop connection to the main database, then reconnect
if (defined $maindbh and $maindbh) {
$maindbh->rollback;
$_->finish for values %{ $maindbh->{CachedKids} };
$maindbh->disconnect;
}
my ($finalbackend, $finaldbh) = $self->connect_database();
$self->glog("Final database backend PID: $finalbackend", LOG_VERBOSE);
$sth{dbrun_delete} = $finaldbh->prepare($SQL{dbrun_delete});
$self->db_notify($finaldbh, 'kid_pid_stop', 1);
## Drop all open database connections, clear out the dbrun table
for my $dbname (@dbs_dbi) {
my $d = $sync->{db}{$dbname};
my $dbh = $d->{dbh} or do {
$self->glog("Missing $dbname database handle", LOG_WARN);
next;
};
## Is this still around?
if (!$dbh->ping) {
$self->glog("Ping failed for database $dbname", LOG_TERSE);
## We want to give the MCP a hint that something is wrong
$self->db_notify($finaldbh, "dead_db_$dbname", 1);
## We'll assume no disconnect is necessary - but we'll undef it below just in case
}
else {
## Rollback, finish all statement handles, and disconnect
$dbh->rollback();
$self->glog("Disconnecting from database $dbname", LOG_DEBUG);
$_->finish for values %{ $dbh->{CachedKids} };
$dbh->disconnect();
}
## Make sure we don't think we are still in the middle of an async query
$d->{async_active} = 0;
## Make sure we never access this connection again
undef $dbh;
## Clear out the entry from the dbrun table
$sth = $sth{dbrun_delete};
$sth->execute($syncname, $dbname);
$finaldbh->commit();
}
## If using semaphore tables, mark the status as 'failed'
## At least in the Mongo case, it's pretty safe to do this,
## as it is unlikely the error came from Mongo Land
if ($config{semaphore_table}) {
my $tname = $config{semaphore_table};
for my $dbname (@dbs_connectable) {
my $d = $sync->{db}{$dbname};
if ($d->{dbtype} eq 'mongo') {
my $collection = $d->{dbh}->get_collection($tname);
my $object = {
## Use a $set document so update_one performs a partial update (and can upsert)
'$set' => {
sync => $syncname,
status => 'failed',
endtime => scalar gmtime,
},
};
$collection->update_one
(
{sync => $syncname},
$object,
{ upsert => 1, safe => 1 }
);
}
}
}
## Mark this syncrun as aborted if needed, replace the 'lastbad'
my $status = "Failed : $flatmsg (KID $$)";
$self->end_syncrun($finaldbh, 'bad', $syncname, $status);
$finaldbh->commit();
## Update the dbrun table as needed
$SQL = q{DELETE FROM bucardo.dbrun WHERE sync = ?};
$sth = $finaldbh->prepare($SQL);
$sth->execute($syncname);
## Let anyone listening know that this target sync aborted. Global message.
$self->db_notify($finaldbh, "synckill_${syncname}");
## Done with database cleanups, so disconnect
$finaldbh->disconnect();
## Send an email as needed (never for clean exit)
if (! $self->{clean_exit} and ($self->{sendmail} or $self->{sendmail_file})) {
my $warn = $msg =~ /CTL.+request/ ? '' : 'Warning! ';
$self->glog(qq{${warn}Child for sync "$syncname" was killed at line $line: $msg}, LOG_WARN);
## Never display the database passwords
for (values %{$self->{dbs}}) {
$_->{dbpass} = '???';
}
$self->{dbpass} = '???';
## Create the body of the message to be mailed
my $dump = Dumper $self;
my $body = qq{
Kid $$ has been killed at line $line
Error: $msg
Possible suspects: $S.$T: $pkval
Host: $hostname
Sync name: $syncname
Stats page: $config{stats_script_url}?sync=$syncname
Parent process: $self->{mcppid} -> $self->{ctlpid}
Rows set to aborted: $count
Version: $VERSION
Loops: $kidloop
};
$body =~ s/^\s+//gsm;
if ($msg =~ /Found stopfile/) {
$moresub = ' (stopfile)';
}
elsif ($msg =~ /could not connect/) {
$moresub = ' (no connection)';
}
my $subject = qq{Bucardo kid for "$syncname" killed on $shorthost$moresub};
$self->send_mail({ body => "$body\n", subject => $subject });
} ## end sending email
my $extrainfo = sprintf '%s%s%s',
qq{Sync "$syncname"},
$S eq '?' ? '' : " $S.$T",
$pkval eq '?' ? '' : " pk: $pkval";
$self->cleanup_kid($flatmsg, $extrainfo);
exit 1;
}; ## end $err_handler
my $stop_sync_request = "stopsync_$syncname";
## Tracks how long it has been since we last ran a ping against our databases
my $lastpingcheck = 0;
## Row counts from the delta tables:
my %deltacount;
## Count of changes made (inserts,deletes,truncates,conflicts handled):
my %dmlcount;
my $did_setup = 0;
local $@;
eval {
## Listen for the controller asking us to go again if persistent
if ($kidsalive) {
$self->db_listen( $maindbh, "kid_run_$syncname" );
}
## Listen for a kid ping, even if not persistent
my $kidping = "${$}_ping";
$self->db_listen( $maindbh, "kid_$kidping" );
## Listen for a sync-wide exit signal
$self->db_listen( $maindbh, "kid_$stop_sync_request" );
## Prepare all of our SQL
## Note that none of this is actually 'prepared' until the first execute
## SQL to add a new row to the syncrun table
$SQL = 'INSERT INTO bucardo.syncrun(sync,status) VALUES (?,?)';
$sth{kid_syncrun_insert} = $maindbh->prepare($SQL);
## SQL to update the syncrun table's status only
$SQL = q{
UPDATE bucardo.syncrun
SET status=?
WHERE sync=?
AND ended IS NULL
};
$sth{kid_syncrun_update_status} = $maindbh->prepare($SQL);
## SQL to set the syncrun table as ended once complete
$SQL = q{
UPDATE bucardo.syncrun
SET deletes=deletes+?, inserts=inserts+?, truncates=truncates+?,
conflicts=?, details=?, status=?
WHERE sync=?
AND ended IS NULL
};
$sth{kid_syncrun_end} = $maindbh->prepare($SQL);
## Connect to all (connectable) databases we are responsible for
## This main list has already been pruned by the controller as needed
for my $dbname (@dbs_connectable) {
my $d = $sync->{db}{$dbname};
($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
$self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_VERBOSE);
## Register ourself with the MCP (if we are Postgres)
if ($d->{dbtype} eq 'postgres') {
$self->db_notify($maindbh, 'kid_pid_start', 1, $dbname);
}
}
## Set the maximum length of the $dbname.$S.$T string.
## Used for logging output
$self->{maxdbname} = 1;
for my $dbname (keys %{ $sync->{db} }) {
$self->{maxdbname} = length $dbname if length $dbname > $self->{maxdbname};
}
my $maxst = 3;
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
($S,$T) = ($g->{safeschema},$g->{safetable});
$maxst = length "$S.$T" if length ("$S.$T") > $maxst;
}
$self->{maxdbstname} = $self->{maxdbname} + 1 + $maxst;
## If we are using delta tables, prepare all relevant SQL
if (@dbs_delta) {
## Prepare the SQL specific to each table
for my $g (@$goatlist) {
## Only tables get all this fuss: sequences are easy
next if $g->{reltype} ne 'table';
## This is the main query: grab all unique changed primary keys since the last sync
$SQL{delta}{$g} = qq{
SELECT DISTINCT $g->{pklist}
FROM bucardo.$g->{deltatable} d
WHERE NOT EXISTS (
SELECT 1
FROM bucardo.$g->{tracktable} t
WHERE d.txntime = t.txntime
AND (t.target = DBGROUP::text)
)
};
## We also need secondary queries to catch the case of partial replications
## This is a per-target check
$SQL{deltatarget}{$g} = qq{
SELECT DISTINCT $g->{pklist}
FROM bucardo.$g->{deltatable} d
WHERE NOT EXISTS (
SELECT 1
FROM bucardo.$g->{tracktable} t
WHERE d.txntime = t.txntime
AND (t.target = TARGETNAME::text)
)
};
## Mark all unclaimed visible delta rows as done in the track table
$SQL{track}{$g} = qq{
INSERT INTO bucardo.$g->{tracktable} (txntime,target)
SELECT DISTINCT txntime, DBGROUP::text
FROM bucardo.$g->{deltatable} d
WHERE NOT EXISTS (
SELECT 1
FROM bucardo.$g->{tracktable} t
WHERE d.txntime = t.txntime
AND (t.target = DBGROUP::text)
);
};
## The same thing, but to the staging table instead, as we have to
## wait for all targets to successfully commit in multi-source situations
($SQL{stage}{$g} = $SQL{track}{$g}) =~ s/$g->{tracktable}/$g->{stagetable}/;
} ## end each table
## For each source database, prepare the queries above
for my $dbname (@dbs_source) {
my $d = $sync->{db}{$dbname};
## Set the DBGROUP for each database: the bucardo.track_* target entry
$d->{DBGROUPNAME} = "dbgroup $dbs";
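## e.g. if $dbs is "mygroup" (hypothetical), rows are tracked
## under the literal target string 'dbgroup mygroup'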
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
($S,$T) = ($g->{safeschema},$g->{safetable});
## Replace with the target name for source delta querying
($SQL = $SQL{delta}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/o;
## As these can be expensive, make them asynchronous
$sth{getdelta}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
## We need to update either the track table or the stage table
## There is no way to know beforehand which we will need, so we prepare both
## Replace with the target name for source track updating
($SQL = $SQL{track}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/go;
## Again, async as they may be slow
$sth{track}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
## Same thing for stage
($SQL = $SQL{stage}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/go;
$sth{stage}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
} ## end each table
} ## end each source database
## Set all makedelta tables (target databases can have them too, as another sync may have them as a source)
for my $dbname (@dbs) {
my $d = $sync->{db}{$dbname};
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
($S,$T) = ($g->{safeschema},$g->{safetable});
## Set the per database/per table makedelta setting now
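## (makedelta can be switched on database-wide via the db setting, or per goat:
##  a goat value of 'on' applies to every database, while a list of database
##  names -- e.g. 'B C', hypothetical -- limits it to those databases)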
if (1 == $d->{makedelta} or $g->{makedelta} eq 'on' or $g->{makedelta} =~ /\b$dbname\b/) {
$d->{does_makedelta}{"$S.$T"} = 1;
$self->glog("Set table $dbname.$S.$T to makedelta", LOG_NORMAL);
}
} ## end each table
} ## end all databases
} ## end if delta databases
## Create safe versions of the database handles if we are going to need them
if ($sync->{need_safe_dbh_strict} or $sync->{need_safe_dbh}) {
for my $dbname (@dbs_postgres) {
my $d = $sync->{db}{$dbname};
my $darg;
if ($sync->{need_safe_dbh_strict}) {
for my $arg (sort keys %{ $dbix{ $d->{role} }{strict} }) {
next if ! length $dbix{ $d->{role} }{strict}{$arg};
$darg->{$arg} = $dbix{ $d->{role} }{strict}{$arg};
}
$darg->{dbh} = $d->{dbh};
$self->{safe_dbh_strict}{$dbname} = DBIx::Safe->new($darg);
}
if ($sync->{need_safe_dbh}) {
undef $darg;
for my $arg (sort keys %{ $dbix{ $d->{role} }{notstrict} }) {
next if ! length $dbix{ $d->{role} }{notstrict}{$arg};
$darg->{$arg} = $dbix{ $d->{role} }{notstrict}{$arg};
}
$darg->{dbh} = $d->{dbh};
$self->{safe_dbh}{$dbname} = DBIx::Safe->new($darg);
}
}
} ## end DBIX::Safe creations
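## (DBIx::Safe wraps the real handle so customcode may only use the commands and
##  attributes whitelisted in %dbix for this database's role, strict or not)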
$did_setup = 1;
};
$err_handler->($@) if !$did_setup;
## Begin the main KID loop
my $didrun = 0;
my $runkid = sub {
KID: {
## Leave right away if we find a stopfile
if (-e $self->{stop_file}) {
$self->glog(qq{Found stopfile "$self->{stop_file}": exiting}, LOG_WARN);
last KID;
}
## Should we actually do something this round?
my $dorun = 0;
## If we were just created or kicked, go ahead and start a run.
if ($kicked) {
$dorun = 1;
$kicked = 0;
}
## If persistent, listen for messages and do an occasional ping of all databases
if ($kidsalive) {
my $nlist = $self->db_get_notices($maindbh);
for my $name (sort keys %{ $nlist }) {
my $npid = $nlist->{$name}{firstpid};
## Strip the prefix
$name =~ s/^kid_//o;
## The controller wants us to exit
if ( $name eq $stop_sync_request ) {
$self->glog('Got a stop sync request, so exiting', LOG_TERSE);
die 'Stop sync request';
}
## The controller has told us we are clear to go
elsif ($name eq "run_$syncname") {
$dorun = 1;
}
## Got a ping? Respond with a pong.
elsif ($name eq "${$}_ping") {
$self->glog('Got a ping, issuing pong', LOG_DEBUG);
$self->db_notify($maindbh, "kid_${$}_pong");
}
## Someone else's sync is running
elsif (index($name, 'run_') == 0) {
}
## Someone else's sync is stopping
elsif (index($name, 'stopsync_') == 0) {
}
## Someone else's kid is getting pinged
elsif (index($name, '_ping') > 0) {
}
## Should not happen, but let's at least log it
else {
$self->glog("Warning: received unknown message $name from $npid!", LOG_TERSE);
}
} ## end each notice
## Now that we've read in any notices, simply rollback
$maindbh->rollback();
## Periodically verify connections to all databases
if (time() - $lastpingcheck >= $config{kid_pingtime}) {
## If this fails, simply have the CTL restart it
## Other things match on the exception wording below, so change carefully
$maindbh->ping or die qq{Ping failed for main database\n};
for my $dbname (@dbs_dbi) {
my $d = $sync->{db}{$dbname};
$d->{dbh}->ping or die qq{Ping failed for database "$dbname"\n};
$d->{dbh}->rollback();
}
$lastpingcheck = time();
}
} ## end if kidsalive
## If we are not doing anything this round, sleep and start over
## We will only ever hit this on the second go around, as kids
## start as autokicked
if (! $dorun) {
sleep $config{kid_sleep};
redo KID;
}
## From this point on, we are a live kid that is expected to run the sync
## Used to report on total times for the long-running parts, e.g. COPY
my $kid_start_time = [gettimeofday];
## Create an entry in the syncrun table to let people know we've started
$self->glog('Adding entry to syncrun table', LOG_DEBUG);
$sth{kid_syncrun_insert}->execute($syncname, "Started (KID $$)");
## Increment our count of how many times we have been here before
$kidloop++;
## Reset the numbers to track total bucardo_delta matches
undef %deltacount;
$deltacount{all} = 0;
$deltacount{table} = {};
## Reset our counts of total inserts, deletes, truncates, and conflicts
undef %dmlcount;
$dmlcount{deletes} = 0;
$dmlcount{inserts} = 0;
$dmlcount{truncates} = 0;
$dmlcount{conflicts} = 0;
## Reset all of our truncate stuff
$self->{has_truncation} = 0;
delete $self->{truncateinfo};
## Reset some things at the per-database level
for my $dbname (keys %{ $sync->{db} }) {
my $d = $sync->{db}{$dbname};
## This must be set, as it is used by the conflict_strategy below
$deltacount{$dbname} = 0;
$dmlcount{allinserts}{$dbname} = 0;
$dmlcount{alldeletes}{$dbname} = 0;
delete $d->{truncatewinner};
}
## Reset things at the goat level
for my $g (@$goatlist) {
delete $g->{truncatewinner};
}
## Run all 'before_txn' code
if (exists $sync->{code_before_txn}) {
## Let external people know where we are
$sth{kid_syncrun_update_status}->execute("Code before_txn (KID $$)", $syncname);
$maindbh->commit();
for my $code (@{$sync->{code_before_txn}}) {
## Check if the code has asked us to skip other before_txn codes
last if 'last' eq $self->run_kid_custom_code($sync, $code);
}
}
## Populate the dbrun table so others know we are using these databases
$self->glog('Populating the dbrun table', LOG_DEBUG);
for my $dbname (@dbs_connectable) {
my $d = $sync->{db}{$dbname};
$sth{dbrun_insert}->execute($syncname, $dbname, $d->{backend});
}
## Add a note to the syncrun table
$self->glog('Adding note to the syncrun table', LOG_DEBUG);
$sth{kid_syncrun_update_status}->execute("Begin txn (KID $$)", $syncname);
## Figure out our isolation level. Only used for Postgres
## All others are hard-coded as 'serializable'
$self->{pg_isolation_level} = defined $sync->{isolation_level} ? $sync->{isolation_level} :
$config{isolation_level} || 'serializable';
## Commit so our dbrun and syncrun stuff is visible to others
## This should be done just before we start transactions on all dbs
$self->glog('Doing final maindbh commit', LOG_DEBUG);
$maindbh->commit();
## Start the main transaction and do things such as setting isolation levels
$self->start_main_transaction({ sync => $sync, databases => \@dbs_connectable});
## We may have a request to lock all the tables
$self->lock_all_tables({ sync => $sync, databases => \@dbs_write, tables => $goatlist});
## Do all the delta (non-fullcopy) targets
if (@dbs_delta) {
## We will never reach this while in onetimecopy mode as @dbs_delta is emptied
## Run all 'before_check_rows' code
if (exists $sync->{code_before_check_rows}) {
$sth{kid_syncrun_update_status}->execute("Code before_check_rows (KID $$)", $syncname);
$maindbh->commit();
for my $code (@{$sync->{code_before_check_rows}}) {
## Check if the code has asked us to skip other before_check_rows codes
last if 'last' eq $self->run_kid_custom_code($sync, $code);
}
}
## Check if any tables were truncated on all source databases
## If so, set $self->{has_truncation}; store results in $self->{truncateinfo}
## The first two hash levels are the schema name and then the table name
## The third level holds maxtime and maxdb, showing the "winner" for each table
$SQL = 'SELECT quote_ident(sname), quote_ident(tname), MAX(EXTRACT(epoch FROM cdate))'
. ' FROM bucardo.bucardo_truncate_trigger '
. ' WHERE sync = ? AND replicated IS NULL GROUP BY 1,2';
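## (The query returns one row per schema/table truncated since the last replication,
##  carrying the newest truncation time as an epoch value)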
for my $dbname (@dbs_source) {
my $d = $sync->{db}{$dbname};
## Grab the latest truncation time for each table, for this source database
$self->glog(qq{Checking truncate_trigger table on database "$dbname"}, LOG_VERBOSE);
$sth = $d->{dbh}->prepare($SQL);
$self->{has_truncation} += $sth->execute($syncname);
for my $row (@{ $sth->fetchall_arrayref() }) {
my ($s,$t,$time) = @{ $row };
## Store if this is the new winner
if (! exists $self->{truncateinfo}{$s}{$t}{maxtime}
or $time > $self->{truncateinfo}{$s}{$t}{maxtime}) {
$self->{truncateinfo}{$s}{$t}{maxtime} = $time;
$self->{truncateinfo}{$s}{$t}{maxdb} = $dbname;
}
}
} ## end each source database, checking for truncations
## Now go through and mark the winner within the "x" hash, for easy skipping later on
if ($self->{has_truncation}) {
for my $s (keys %{ $self->{truncateinfo} }) {
for my $t (keys %{ $self->{truncateinfo}{$s} }) {
my $dbname = $self->{truncateinfo}{$s}{$t}{maxdb};
my $d = $sync->{db}{$dbname};
$d->{truncatewinner}{$s}{$t} = 1;
$self->glog("Truncate winner for $s.$t is database $dbname", LOG_DEBUG);
}
}
## Set the truncate count
my $number = @dbs_non_fullcopy; ## not the best estimate: corner cases
$dmlcount{truncate} = $number - 1;
## Now map this back to our goatlist
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
($S,$T) = ($g->{safeschema},$g->{safetable});
if (exists $self->{truncateinfo}{$S}{$T}) {
$g->{truncatewinner} = $self->{truncateinfo}{$S}{$T}{maxdb};
}
}
}
## Next, handle all the sequences
for my $g (@$goatlist) {
next if $g->{reltype} ne 'sequence';
($S,$T) = ($g->{safeschema},$g->{safetable});
## Grab the sequence information from each database
## Figure out which source database has the highest last_value
## Right now, this is the only sane option.
## In the future, we might consider coupling tables and sequences and
## then copying sequences based on the 'winning' underlying table
$SQL = "SELECT * FROM $S.$T";
my $maxvalue = -1;
for my $dbname (@dbs_non_fullcopy) {
my $d = $sync->{db}{$dbname};
next if $d->{dbtype} ne 'postgres';
$sth = $d->{dbh}->prepare($SQL);
$sth->execute();
my $info = $sth->fetchall_arrayref({})->[0];
$g->{sequenceinfo}{$dbname} = $info;
## Only the source databases matter for the max value comparison
next if $d->{role} ne 'source';
if ($info->{last_value} > $maxvalue) {
$maxvalue = $info->{last_value};
$g->{winning_db} = $dbname;
}
}
$self->glog("Se