Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'zam/5306'
Conflicts: bin/cmd_archiver bin/cmd_worker.py
- Loading branch information
Showing
2 changed files
with
294 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
#!/usr/bin/env python | ||
|
||
""" LICENSE | ||
Copyright Command Prompt, Inc. | ||
Permission to use, copy, modify, and distribute this software and its | ||
documentation for any purpose, without fee, and without a written agreement | ||
is hereby granted, provided that the above copyright notice and this | ||
paragraph and the following two paragraphs appear in all copies. | ||
IN NO EVENT SHALL THE COMMAND PROMPT, INC. BE LIABLE TO ANY PARTY FOR | ||
DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING | ||
LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, | ||
EVEN IF THE COMMAND PROMPT, INC. HAS BEEN ADVISED OF THE POSSIBILITY OF | ||
SUCH DAMAGE. | ||
THE COMMAND PROMPT, INC. SPECIFICALLY DISCLAIMS ANY WARRANTIES, | ||
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND | ||
FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN | ||
"AS IS" BASIS, AND THE COMMAND PROMPT, INC. HAS NO OBLIGATIONS TO | ||
PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. | ||
""" | ||
|
||
from cmd_worker import CMDWorker | ||
import Threading | ||
|
||
argslist = (("-I", "--init", | ||
dict(dest="init", action="store_true",help="Initialize master environment")), | ||
("-f", "--flush", | ||
dict(dest="flush", action="store_true", help="Flush all remaining archives to slave"))) | ||
|
||
classdict = (('state', 's', None), | ||
('rsync_flags', 's', ""), | ||
('slaves', 's', None), | ||
('user', 's', None), | ||
('r_archivedir', 's', None), | ||
('l_archivedir', 's', None), | ||
('ssh_timeout', 'i', None), | ||
('notify_ok', 's', None), | ||
('notify_warning', 's', None), | ||
('notify_critical', 's', None), | ||
('debug', 'b', False), | ||
('pgdata', 's', None), | ||
('pgcontroldata', 's', ""), | ||
('rsync_version', 'i', None), | ||
('includepath', 's', None), | ||
('ssh_debug', 'b', False)) | ||
|
||
class CMDQueueWorker(threading.Thread): | ||
|
||
|
||
class CMDQueue(CMDWorker): | ||
def generate_slave_list_func(self): | ||
""" | ||
We now support multiple slaves (see the README) in order do that properly | ||
we have to break up the string and turn it into a list | ||
""" | ||
self.slaves_list = slaves.strip("'").split(",") | ||
self.debuglog("generate_slave_list_func(): slaves_list = %s" % slaves) | ||
|
||
|
||
def init_env_func(self): | ||
""" | ||
Initialize the local queues so we can check each directory for left | ||
over files | ||
""" | ||
self.debuglog("init_env_func(): entered") | ||
l_archivedir = self.l_archivedir | ||
# bail out if archivedir exists and not empty or inaccessible. | ||
if (os.access(l_archivedir, F_OK)): | ||
if (not os.access(l_archivedir, R_OK | W_OK | X_OK)): | ||
self.log("init_env_func(): l_archivedir %s must allow have r/w/x bits set for the current user" % l_archivedir, "ERROR") | ||
return False | ||
elif (os.listdir(l_archivedir) != []): | ||
self.log("init_env_func(): l_archivedir %s must be be empty" % l_archivedir, "ERROR") | ||
return False | ||
else: | ||
self.log("init_env_func(): l_archivedir %s already exists" % l_archivedir, "WARNING") | ||
queues = self.slaves_list | ||
try: | ||
for host in queues: | ||
queue = os.path.join(l_archivedir, host) | ||
os.makedirs("%s" % (queue)) | ||
except OSError, e: | ||
self.log("init_env_func(): Can not make queue directories", "ERROR") | ||
print "EXCEPTION: %s" % (str(e)) | ||
exit(1) | ||
|
||
def check_config_func(self): | ||
""" | ||
Let's make sure that our directories and executables exist | ||
""" | ||
self.debuglog("check_config_func(): entered") | ||
pathvars = [self.rsync, self.pgdata, self.l_archivedir] | ||
for element in pathvars: | ||
try: | ||
os.stat("%s" % (str(element))) | ||
except OSError, e: | ||
print "Config %s: %s" % (str(element), str(e)) | ||
return False | ||
return True | ||
|
||
def get_pgcontroldata_func(self): | ||
""" | ||
get_pgcontroldata_func doesn't actually do anything yet. This is more | ||
for archival purposes so we can remember the regex | ||
""" | ||
if not self.pgcontroldata: | ||
print 'WARNING: path to pg_controldata utility is not set, assuming it\'s in PATH' | ||
pgcontroldata = 'pg_controldata' | ||
else: | ||
pgcontroldata = self.pgcontroldata | ||
try: | ||
cmd = os.popen("%s %s" % (str(pgcontroldata), str(self.pgdata))) | ||
#return cmd.readlines | ||
for row in cmd: | ||
match = re.search('^Prior checkpoint location: *.{1,}', '%s' % (str(row))) | ||
if match != None: | ||
print match | ||
except OSError, e: | ||
self.log("get_pgcontroldata_func(): %s" % e, "ERROR") | ||
exit(1) | ||
|
||
def flush_check_func(self): | ||
""" | ||
Simple function to make sure we require input before flushing a system | ||
""" | ||
self.debuglog("flush_check_func(): entered") | ||
print "\n\n" | ||
print "Warning! Flushing all logs will cause your slave to exit" | ||
print "Standby and start up. Please verify that this is exactly what you desire.\n\n""" | ||
|
||
print "I wish to force my slave into production: No/Yes\n\n" | ||
|
||
line = str(raw_input()) | ||
if line == "Yes": | ||
print "Flushing all xlogs" | ||
return | ||
elif line == "No": | ||
print "Exiting!" | ||
else: | ||
print "Your options are Yes and No" | ||
return False | ||
|
||
def list_queue_func(self): | ||
""" | ||
We only want to process archives for queues that have files, so we check | ||
and only return a queue/slave that has files to be shipped. | ||
""" | ||
self.debuglog("list_queue_func(): entered") | ||
# Empty host array | ||
hosts = [] | ||
|
||
# Loop through the list of slaves | ||
slaves = self.slaves_list | ||
for host in slaves: | ||
queuedir = os.path.join(self.l_archivedir, host) | ||
list_archives = os.listdir(queuedir) | ||
# If an archive directory is not empty, then we're good. | ||
if list_archives: | ||
# add to list of hosts | ||
hosts.append(host) | ||
self.debuglog("list_queue_func(): host `%s' queue not empty" % host) | ||
return hosts | ||
|
||
def send_queue_func(self): | ||
""" | ||
We are called before normal archive process in order to send queue files | ||
that have not been shipped yet. If we have to transfer and we error we | ||
return the slave that failed. | ||
""" | ||
rtn = [] | ||
self.debuglog("send_queue_func(): entered") | ||
|
||
for host in self.list_queue_func(): | ||
if self.debug: | ||
print "NOTICE: Host = " + host | ||
queue_dir = os.path.join(self.l_archivedir, host) | ||
if self.debug: | ||
print "NOTICE: queue_dir = " + queue_dir | ||
# To deal with old versions of rsync | ||
if self.rsync_version == 2: | ||
#Old rsync version, needs a different flag | ||
source_or_sent = "--remove-sent-files" | ||
else: | ||
source_or_sent = "--remove-source-files" | ||
|
||
queue_transfer = """%s %s -aq %s -e \"%s %s\" %s/ %s@%s:%s/""" % (str(self.rsync), str(self.rsync_flags), str(source_or_sent), str(self.ssh), str(self.ssh_flags), str(queue_dir), str(self.user), str(host), str(self.r_archivedir)) | ||
|
||
self.debuglog("send_queue_func(): host = %s, queue_dir = %s, rsync_version = %d" % (host, queue_dir, self.rsync_version)) | ||
self.debuglog("send_queue_func(): transferring queue via: %s" % queue_transfer) | ||
|
||
retval = os.system(queue_transfer) | ||
if retval: | ||
# If we failed to send data to this host - append it to the list | ||
# of hosts to retry the sending attempt on | ||
rtn.append(host) | ||
self.debuglog("send_queue_func(): unreachable hosts: %s" % ", ".join(rtn)) | ||
return rtn | ||
|
||
def queue_log_func(self, slave): | ||
""" | ||
Queue the current log file (self.archivefile) to a slave. | ||
slave is name of slave to queue for | ||
""" | ||
|
||
queue_dir = os.path.join(self.l_archivedir, slave) | ||
queue_transfer = "%s %s %s" % (self.rsync, self.archivefile, queue_dir) | ||
|
||
self.debuglog("queue_log_func(): Queueing log via: %s" % queue_transfer) | ||
ret = subprocess.call(queue_transfer) | ||
if ret: | ||
#Rsync returned non-zero exit status. | ||
self.notify_external(log=True, critical=True, message="queue_log_func: rsync error %d" % ret) | ||
raise ArchiveFailure | ||
self.notify_external(ok=True, message="0") | ||
|
||
def ship_log_func(self, slave): | ||
""" | ||
Ship the current log file (self.archivefile) to a slave. Queue the log for them if we fail. | ||
slave is name of slave to queue for | ||
""" | ||
archivepath = os.path.join(self.pgdata, self.archivefile) | ||
rsync_flags_list = self.rsync_flags.split() | ||
|
||
log_transfer_cmd = [self.rsync] | ||
log_transfer_cmd.extend(rsync_flags_list) | ||
log_transfer_cmd.extend(['-q', '-e', '%s %s' % (self.ssh, self.ssh_flags), archivepath, | ||
'%s@%s:%s' % (self.user, slave, self.r_archivedir)]) | ||
|
||
self.log("Shipping %s to %s" % (self.archivefile, slave)) | ||
self.debuglog("ship_log_func(): Shipping log via: %s" % log_transfer_cmd) | ||
ret = subprocess.call(log_transfer_cmd) | ||
if ret: | ||
#Rsync returned non-zero exit status | ||
self.notify_external(log=True, critical=True, message="ship_log_func: rsync error %d" % ret) | ||
self.queue_log_func(slave) | ||
self.debuglog("queue_log_func(%s) handled successfully" % slave) | ||
self.notify_external(log=True, ok=True, message="0") | ||
|
||
def handle_queues(self): | ||
""" | ||
Try to ship any existing slave queues. If we cannot send to a slave, queue self.archivefile for them. | ||
Returns a list of slaves that were not queued to. | ||
""" | ||
|
||
slaves = self.slaves_list | ||
failed_slaves = self.send_queue_func() | ||
if not failed_slaves: | ||
return [] | ||
self.notify_external(log=True, | ||
warning=True, | ||
message="Couldn't send existing queues for hosts: %s. Queueing %s for them." % (", ".join(failed_slaves), self.archivefile)) | ||
for slave in failed_slaves: | ||
self.queue_log_func(slave) | ||
self.debuglog("queue_log_func(%s) handled successfully" % slave) | ||
|
||
#Don't try to ship a log if we just queued it | ||
return set(slaves).difference(set(failed_slaves)) | ||
|
||
def run(self): | ||
""" | ||
""" | ||
self.debuglog("archive_func: self.archivefile = %s" % self.archivefile) | ||
if self.state != "online": | ||
self.notify_external(log=True, warning=True, message="cmd_archiver offline, queuing archives") | ||
return False | ||
|
||
#Send any existing queues, and queue to any that failed. | ||
slaves = self.handle_queues() | ||
self.debuglog("handle_queues() handled successfully") | ||
|
||
#Attempt to ship self.archivefile to slaves that are reachable | ||
for slave in slaves: | ||
self.ship_log_func(slave) | ||
self.debuglog("ship_log_func(%s) handled successfully" % slave) | ||
|
||
self.log("Archiving of %s handled successfully" % self.archivefile) | ||
|
||
classdict = (('queue_retry_delay', 'i', 5)) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters