Skip to content

Commit

Permalink
Revamp cmd_queue/archiver.
Browse files Browse the repository at this point in the history
  • Loading branch information
ashcmd committed Nov 12, 2014
1 parent 457a3ec commit 4fe46c1
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 385 deletions.
186 changes: 36 additions & 150 deletions bin/cmd_archiver
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import os
import re
import sys
import subprocess
from os import *
from sys import *
from cmd_worker import CMDWorker

argslist = (('-F', '--file', dict(dest="archivefilename",
Expand Down Expand Up @@ -83,8 +81,8 @@ class CMDArchiver(CMDWorker):
self.debuglog("init_env_func(): entered")
l_archivedir = self.l_archivedir
# bail out if archivedir exists and not empty or inaccessible.
if (os.access(l_archivedir, F_OK)):
if (not os.access(l_archivedir, R_OK | W_OK | X_OK)):
if (os.access(l_archivedir, os.F_OK)):
if (not os.access(l_archivedir, os.R_OK | os.W_OK | os.X_OK)):
self.log("init_env_func(): l_archivedir %s must allow have r/w/x bits set for the current user" % l_archivedir, "ERROR")
return False
elif (os.listdir(l_archivedir) != []):
Expand All @@ -106,14 +104,17 @@ class CMDArchiver(CMDWorker):
"""
Let's make sure that our directories and executables exist
"""
self.debuglog("check_config_func(): entered")
pathvars = [self.rsync, self.pgdata, self.l_archivedir]
self.cmd_queue = os.path.join(self.pitr_bin_path, "cmd_queue")
pathvars = [self.pgdata, self.l_archivedir, self.cmd_queue]
for element in pathvars:
try:
os.stat("%s" % (str(element)))
except OSError, e:
print "Config %s: %s" % (str(element), str(e))
return False
if not os.access(self.cmd_queue, os.X_OK):
self.log("The cmd_queue file must have executable flag set.")
return False
return True

def get_pgcontroldata_func(self):
Expand All @@ -137,153 +138,38 @@ class CMDArchiver(CMDWorker):
self.log("get_pgcontroldata_func(): %s" % e, "ERROR")
exit(1)

def list_queue_func(self):
"""
We only want to process archives for queues that have files, so we check
and only return a queue/slave that has files to be shipped.
"""
self.debuglog("list_queue_func(): entered")
# Empty host array
hosts = []
# Loop through the list of slaves
slaves = self.generate_slave_list_func()
for host in slaves:
queuedir = os.path.join(self.l_archivedir, host)
list_archives = os.listdir(queuedir)
# If an archive directory is not empty, then we're good.
if list_archives:
# add to list of hosts
hosts.append(host)
self.debuglog("list_queue_func(): host `%s' queue not empty" % host)
return hosts

def send_queue_func(self):
"""
We are called before normal archive process in order to send queue files
that have not been shipped yet. If we have to transfer and we error we
return the slave that failed.
"""
rtn = []
self.debuglog("send_queue_func(): entered")

for host in self.list_queue_func():
self.debuglog("NOTICE: Host = " + host)
queue_dir = os.path.join(self.l_archivedir, host)
self.debuglog("NOTICE: queue_dir = " + queue_dir)
# To deal with old versions of rsync
if self.rsync_version == 2:
#Old rsync version, needs a different flag
source_or_sent = "--remove-sent-files"
else:
source_or_sent = "--remove-source-files"

queue_transfer = """%s %s -aq %s -e \"%s %s\" %s/ %s@%s:%s/""" % (str(self.rsync), str(self.rsync_flags), str(source_or_sent), str(self.ssh), str(self.ssh_flags), str(queue_dir), str(self.user), str(host), str(self.r_archivedir))

self.debuglog("send_queue_func(): host = %s, queue_dir = %s, rsync_version = %d" % (host, queue_dir, self.rsync_version))
self.debuglog("send_queue_func(): transferring queue via: %s" % queue_transfer)

retval = os.system(queue_transfer)
if retval:
# If we failed to send data to this host - append it to the list
# of hosts to retry the sending attempt on
rtn.append(host)
if any(rtn):
self.debuglog("send_queue_func(): unreachable hosts: %s" % ", ".join(rtn))
return rtn

def queue_log_func(self, slave):
"""
Queue the current log file (self.archivefile) to a slave.
slave is name of slave to queue for
"""

queue_dir = os.path.join(self.l_archivedir, slave)
queue_transfer = "%s %s %s" % (self.rsync, self.archivefile, queue_dir)

self.debuglog("queue_log_func(): Queueing log via: %s" % queue_transfer)
ret = subprocess.call(queue_transfer)
if ret:
#Rsync returned non-zero exit status.
self.notify_external(log=True, critical=True, message="queue_log_func: rsync error %d" % ret)
raise ArchiveFailure
self.notify_external(ok=True, message="0")

def ship_log_func(self, slave):
"""
Ship the current log file (self.archivefile) to a slave. Queue the log for them if we fail.
slave is name of slave to queue for
"""
archivepath = os.path.join(self.pgdata, self.archivefile)
rsync_flags_list = self.rsync_flags.split()

log_transfer_cmd = [self.rsync]
log_transfer_cmd.extend(rsync_flags_list)
log_transfer_cmd.extend(['-q', '-e', '%s %s' % (self.ssh, self.ssh_flags), archivepath,
'%s@%s:%s' % (self.user, slave, self.r_archivedir)])

self.log("Shipping %s to %s" % (self.archivefile, slave))
self.debuglog("ship_log_func(): Shipping log via: %s" % log_transfer_cmd)
ret = subprocess.call(log_transfer_cmd)
if ret:
#Rsync returned non-zero exit status
self.notify_external(log=True, critical=True, message="ship_log_func: rsync error %d" % ret)
self.queue_log_func(slave)
self.debuglog("queue_log_func(%s) handled successfully" % slave)
self.notify_external(log=True, ok=True, message="0")

def handle_queues(self):
"""
Try to ship any existing slave queues. If we cannot send to a slave, queue self.archivefile for them.
Returns a list of slaves that were not queued to.
"""

slaves = self.generate_slave_list_func()
failed_slaves = self.send_queue_func()
if not failed_slaves:
return []
self.notify_external(log=True,
warning=True,
message="Couldn't send existing queues for hosts: %s. Queueing %s for them." % (", ".join(failed_slaves), self.archivefile))
for slave in failed_slaves:
self.queue_log_func(slave)
self.debuglog("queue_log_func(%s) handled successfully" % slave)

#Don't try to ship a log if we just queued it
return set(slaves).difference(set(failed_slaves))

    def archive_func(self):
        """
        Main archive entry point.

        Flushes existing per-slave queues (queueing the current WAL file
        for unreachable slaves), then ships the file to the slaves that
        remain reachable.  Returns True on success, False when the
        archiver is offline or an archive step failed.
        """
        self.debuglog("archive_func(): entered")
        self.log("Archiving %s" % self.archivefile)

        # When marked offline, refuse to ship; archives pile up in queues.
        if self.state != "online":
            self.notify_external(log=True, warning=True, message="cmd_archiver offline, queuing archives")
            return False

        #Send any existing queues, and queue to any that failed.
        slaves = self.handle_queues()
        self.debuglog("handle_queues() called")

        #Attempt to ship self.archivefile to slaves that are reachable
        if slaves:
            self.log("Shipping archive to %s" % ", ".join(slaves))
            for slave in slaves:
                self.ship_log_func(slave)
                self.debuglog("ship_log_func(%s) handled successfully" % slave)

        self.log("Archiving of %s handled successfully" % self.archivefile)
        return True
        # NOTE(review): everything below this return is unreachable dead
        # code.  It looks like a diff-extraction artifact: the try-block is
        # the *replacement* body introduced by this commit (copy the WAL
        # file into the local archivedir and hand distribution off to the
        # cmd_queue daemon), merged into the pre-commit body above.
        # Reconcile against the real repository before trusting either half.
        try:
            # It is tempting to just hardlink it, but we need to make
            # at least one full copy since postgres might want to
            # recycle the WAL file at the same inode. The cmd_queue
            # will establish hardlinks to individual slave subdirs,
            # thus no extra space is taken by the WAL queue.
            self.debuglog("copying %s to %s/" % (self.archivefile, self.l_archivedir))
            import shutil
            shutil.copy(os.path.join(self.pgdata, self.archivefile),
                        os.path.join(self.l_archivedir, ""))

            # We call cmd_queue every time and let it figure itself if
            # there's a copy running already. In case there is none,
            # the daemon flag comes handy.
            cmd_queue = [self.cmd_queue, "-C", self.configfilename,
                         "--daemon"]
            self.debuglog("running cmd_queue as: %s" % repr(cmd_queue))
            ret = subprocess.call(cmd_queue)
            if ret != 0:
                self.notify_external(log=True, critical=True, message=("cmd_queue returned error status: %d" % ret))
                return False
            return True
        except Exception, e:
            self.notify_external(log=True, critical=True, message=("Failed to archive file '%s': %s" % (self.archivefile, e)))
            return False


if __name__ == '__main__':
Expand All @@ -294,13 +180,13 @@ if __name__ == '__main__':

archiver = CMDArchiver(classdict)
(options, args) = archiver.parse_commandline_arguments(argslist)
configfilename = options.configfilename
archiver.configfilename = options.configfilename
init = options.init

archiver.archivefile = options.archivefilename

try:
cfg_vals = archiver.load_configuration_file(configfilename)
cfg_vals = archiver.load_configuration_file(archiver.configfilename)
except Exception, e:
archiver.log(e, "ERROR")
exit(2)
Expand All @@ -313,7 +199,7 @@ if __name__ == '__main__':
print "We are initializing queues, one moment.\n"
success = archiver.init_env_func()
else:
archiver.log("Archiver running")
archiver.debuglog("Archiver running")
#Get binary paths and check config values
archiver.get_bin_paths_func(options)
success = archiver.check_config_func()
Expand All @@ -328,7 +214,7 @@ if __name__ == '__main__':
print "Config OK. Use -F FILE to actually archive one."
success = False # in case this was put into production

archiver.log("Archiver exiting %d" % (0 if success else 1))
archiver.debuglog("Archiver exiting with status %d" % (0 if success else 1))
if success:
sys.exit(0)
else:
Expand Down
Loading

0 comments on commit 4fe46c1

Please sign in to comment.