diff --git a/sarra/examples/subscribe/hpfx_amis.conf b/sarra/examples/subscribe/hpfx_amis.conf index aa0d096b4..4a277de87 100644 --- a/sarra/examples/subscribe/hpfx_amis.conf +++ b/sarra/examples/subscribe/hpfx_amis.conf @@ -4,11 +4,13 @@ broker amqps://anonymous@hpfx.collab.science.gc.ca/ exchange xpublic # instances: number of downloading processes to run at once. defaults to 1. Not enough for this case -#instances 5 +instances 5 # expire, in operational use, should be longer than longest expected interruption expire 10m +on_message msg_stdfiles + subtopic *.WXO-DD.bulletins.alphanumeric.# mirror true directory /local/ben/amis/ diff --git a/sarra/plugins/msg_stdfiles.py b/sarra/plugins/msg_stdfiles.py new file mode 100755 index 000000000..04cad100f --- /dev/null +++ b/sarra/plugins/msg_stdfiles.py @@ -0,0 +1,30 @@ +#!/usr/bin/python3 + +""" + a on_msg callback to print the content of the AMQP message. + one can see clearly the difference between v02 and v03 messages. + +""" + +class Test_StdFiles(object): + + def __init__(self,parent): + parent.logger.debug("msg_rawlog initialized") + + def on_message(self,parent): + import sys + import subprocess + + msg = parent.msg + parent.logger.info("stdfiles closed? stdin={}, stdout={}, stderr={}".format(sys.stdin.closed, sys.stdout.closed, sys.stderr.closed)) + parent.logger.info("stdfiles fds? stdin={}, stdout={}, stderr={}".format(sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno())) + parent.logger.info("this is logging") + print("this is stdout") + print("this is stderr", file=sys.stderr) + subprocess.Popen(['/bin/echo', 'this is subprocess stdout']) + return True + +test_stdfiles = Test_StdFiles(self) + +self.on_message = test_stdfiles.on_message + diff --git a/sarra/sr_audit.py b/sarra/sr_audit.py index f6b3693b2..257c8d3ed 100755 --- a/sarra/sr_audit.py +++ b/sarra/sr_audit.py @@ -60,7 +60,7 @@ def amqp_close(self): self.hc = None def amqp_connect(self): - self.hc = HostConnect(logger = self.logger) + self.hc = HostConnect(logger=self.logger) self.hc.choose_amqp_alternative(self.use_amqplib, self.use_pika) self.hc.loop = False self.hc.set_url(self.admin) diff --git a/sarra/sr_config.py b/sarra/sr_config.py index 8d50878a3..0c75fa17d 100755 --- a/sarra/sr_config.py +++ b/sarra/sr_config.py @@ -37,6 +37,7 @@ import urllib, urllib.parse, urllib.request, urllib.error import shutil import sarra +import io from appdirs import * from logging import handlers @@ -65,31 +66,67 @@ pika_available = False # ========================================== -if sys.hexversion > 0x03030000 : +if sys.hexversion > 0x03030000: from shutil import get_terminal_size py2old=False else: py2old=True -class StreamToLogger(object): - """ - Fake file-like stream object that redirects writes to a logger instance. + +class StdFileLogWrapper(io.TextIOWrapper): + """ Wrapper that delegate stream write operations to an underlying logging handler stream + + It borrows the handler buffer at construction time and will update it as the logging rolls """ - def __init__(self, logger, log_level=logging.INFO): - self.logger = logger - self.log_level = log_level - self.linebuf = '' + def __init__(self, handler, stdno): + super(StdFileLogWrapper, self).__init__(handler.stream.buffer) + self.stream = handler.stream + self.handler = handler + self.stdno = stdno + self.dup2() def write(self, buf): - for line in buf.rstrip().splitlines(): - self.logger.log(self.log_level, line.rstrip()) + if self.stream != self.handler.stream: + # Support log rotation so need to check if stream changed + # don't trust self.stream for write operation only there to check the change + self.stream = self.handler.stream + self.dup2() + self.handler.stream.write(buf) + + def fileno(self): + """ Fake the file descriptor for the underlying standard fd ( 1 or 2 ) + + :return: the fileno from the faked std file + """ + return self.stdno + + def dup2(self): + """ Avoiding dup2 for win32 since 3.6 with the introduction of _WindowsConsoleIO + + introduced since issue 240: https://github.com/MetPX/sarracenia/issues/240 + :return: None + """ + if sys.platform != 'win32': + os.dup2(self.handler.stream.fileno(), self.fileno()) def flush(self): - """ - when stdout/stderr are assigned to a stream, builtin routine call flush. - if the logger doesn't have flush method, things bomb. - """ - pass + self.buffer.flush() + + def close(self): + pass + + @property + def closed(self): + """ Override the closed property in parent (io.TextIOWrapper) + + This is useful to really redirect the close info from the underlying stream + """ + return self.buffer.closed + + @property + def buffer(self): + return self.handler.stream.buffer + class sr_config: @@ -190,15 +227,12 @@ def __init__(self,config=None,args=None,action=None): # logging is interactive at start - self.debug = False self.statehost = False self.hostform = 'short' self.loglevel = logging.INFO - - self.LOG_FORMAT= '%(asctime)s [%(levelname)s] %(message)s' - # self.LOG_FORMAT= '%(asctime)s [%(levelname)s] %(message)s %(module)s/%(funcName)s #%(lineno)d' - logging.basicConfig(level=self.loglevel, format = self.LOG_FORMAT ) - self.logger = logging.getLogger() + self.logger = None + self.handler = None + self.setlog(interactive=True) self.logger.debug("sr_config __init__") # program_name @@ -614,10 +648,6 @@ def declare_option(self,option): def defaults(self): self.logger.debug("sr_config defaults") - # IN BIG DEBUG - #self.debug = True - self.debug = False - self.retry_mode = True self.retry_ttl = None @@ -921,9 +951,10 @@ def execfile(self, opname, path): self.logger.error("installing %s plugin %s failed: not found " % (opname, path) ) return False - try : - exec(compile(open(script).read(), script, 'exec')) - except : + try: + with open(script) as f: + exec(compile(f.read(), script, 'exec')) + except: self.logger.error("sr_config/execfile 2 failed for option '%s' and plugin '%s'" % (opname, path)) self.logger.debug('Exception details: ', exc_info=True) return False @@ -1183,23 +1214,26 @@ def list_file(self,path): self.run_command([ cmd, path ] ) - def run_command(self,cmd_list): + def run_command(self, cmd_list): sr_path = os.environ.get('SARRA_LIB') sc_path = os.environ.get('SARRAC_LIB') - import sys,subprocess + import sys + import subprocess try: - if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5) : - subprocess.check_call(cmd_list, close_fds=False ) - else : - self.logger.debug("using subprocess.run") - if sc_path and cmd_list[0].startswith("sr_cp"): - subprocess.run([sc_path+'/'+cmd_list[0]]+cmd_list[1:],check=True) - elif sr_path and cmd_list[0].startswith("sr"): - subprocess.run([sr_path+'/'+cmd_list[0]+'.py']+cmd_list[1:],check=True) - else: - subprocess.run(cmd_list,check=True) - except: self.logger.error("trying run command %s " % ' '.join(cmd_list) ) + if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5): + subprocess.check_call(cmd_list, close_fds=False) + else: + self.logger.debug("using subprocess.run") + if sc_path and cmd_list[0].startswith("sr_cp"): + subprocess.run([sc_path+'/'+cmd_list[0]]+cmd_list[1:], check=True) + elif sr_path and cmd_list[0].startswith("sr"): + subprocess.run([sr_path+'/'+cmd_list[0]+'.py']+cmd_list[1:], check=True) + else: + subprocess.run(cmd_list, check=True) + except Exception as e: + self.logger.error("trying run command {} with {}".format(' '.join(cmd_list), e)) + self.logger.debug("Exception details:", exc_info=True) def register_plugins(self): self.logger.debug("register_plugins") @@ -1685,19 +1719,13 @@ def option(self,words): n = 2 elif words0 == 'debug': # See: sr_config.7 - debug = self.debug - if (words1 is None) or words[0][0:1] == '-' : - self.debug = True + if (words1 is None) or words[0][0:1] == '-': + self.loglevel = logging.DEBUG n = 1 - else : - self.debug = self.isTrue(words[1]) + elif self.isTrue(words[1]): + self.loglevel = logging.DEBUG n = 2 - if self.debug : self.loglevel = logging.DEBUG - else: self.loglevel = logging.INFO - - if debug != self.debug : self.set_loglevel() - elif words0 == 'delete': # See: sr_sarra.8 if (words1 is None) or words[0][0:1] == '-' : self.delete = True @@ -2040,15 +2068,20 @@ def option(self,words): self.lr_interval = int(float(words1)) n = 2 - elif words0 in ['loglevel','ll']: # See: sr_config.7 + elif words0 in ['loglevel', 'll']: # See: sr_config.7 level = words1.lower() - if level in 'critical' : self.loglevel = logging.CRITICAL - elif level in 'error' : self.loglevel = logging.ERROR - elif level in 'info' : self.loglevel = logging.INFO - elif level in 'warning' : self.loglevel = logging.WARNING - elif level in 'debug' : self.loglevel = logging.DEBUG - elif level in 'none' : self.loglevel = None - self.set_loglevel() + if level in 'critical': + self.loglevel = logging.CRITICAL + elif level in 'error': + self.loglevel = logging.ERROR + elif level in 'info': + self.loglevel = logging.INFO + elif level in 'warning': + self.loglevel = logging.WARNING + elif level in 'debug': + self.loglevel = logging.DEBUG + elif level in 'none': + self.loglevel = logging.NOTSET n = 2 elif words0 in ['manager','feeder'] : # See: sr_config.7, sr_sarra.8 @@ -2466,16 +2499,17 @@ def option(self,words): self.source_from_exchange = self.isTrue(words[1]) n = 2 - elif words0 == 'statehost': # MG FIXME to be documented somewhere ??? + elif words0 == 'statehost': # MG FIXME to be documented somewhere ??? self.statehost = True - self.hostform = 'short' - if (words1 is None) or words[0][0:1] == '-' : + self.hostform = 'short' + if words1 is None or words[0][0:1] == '-': n = 1 - elif words1.lower() in ['short','fqdn']: - self.hostform = words1.lower() + elif words1.lower() in ['short', 'fqdn']: + self.hostform = words1.lower() n = 2 else: - if not self.isTrue(words[1]): self.statehost = False + if not self.isTrue(words[1]): + self.statehost = False n = 2 elif words0 == 'strip': # See: sr_config.7 @@ -2673,58 +2707,45 @@ def set_sumalgo(self,sumflg): self.lastflg = 'd' self.sumalgo = self.sumalgos['d'] + def setlog(self, interactive=False): + base_log_format = '%(asctime)s [%(levelname)s] %(message)s' + if logging.getLogger().hasHandlers(): + for h in logging.getLogger().handlers: + h.close() + logging.getLogger().removeHandler(h) + self.logger = logging.getLogger() + self.logger.setLevel(self.loglevel) - def set_loglevel(self): - if not self.loglevel: - if hasattr(self, 'logger'): - del self.logger - self.logpath = None - self.logger = logging.RootLogger(logging.CRITICAL) - self.logger.addHandler(logging.NullHandler()) + if interactive or not self.logpath: + logging.basicConfig(format=base_log_format, level=self.loglevel) + self.logger.debug("logging to the console with {}".format(self.logger)) else: - self.logger.setLevel(self.loglevel) - - def setlog(self): - if self.loglevel and self.logpath and self.lr_interval > 0 and self.lr_backupCount > 0: - self.logger.debug("Switching to rotating log file: %s" % self.logpath) + handler = self.create_handler(base_log_format, logging.DEBUG) + self.logger.addHandler(handler) + sys.stdout = StdFileLogWrapper(handler, 1) + sys.stderr = StdFileLogWrapper(handler, 2) + self.logger.debug("logging to file ({}) with {}".format(self.logpath, self.logger)) + + def create_handler(self, log_format, level): + if self.lr_interval > 0 and self.lr_backupCount > 0: handler = handlers.TimedRotatingFileHandler(self.logpath, when=self.lr_when, interval=self.lr_interval, backupCount=self.lr_backupCount) - self.create_new_logger(self.LOG_FORMAT, handler) - if self.chmod_log: - os.chmod(self.logpath, self.chmod_log) - sys.stdout = StreamToLogger(self.logger, logging.INFO) - sys.stderr = StreamToLogger(self.logger, logging.ERROR) - elif self.loglevel and self.logpath: - self.logger.debug("Switching to log file: %s" % self.logpath) - handler = logging.FileHandler(self.logpath) - self.create_new_logger(self.LOG_FORMAT, handler) - if self.chmod_log: - os.chmod(self.logpath, self.chmod_log) - elif self.loglevel: - self.logger.debug('Keeping on screen logging') - handler = logging.StreamHandler() - self.create_new_logger(self.LOG_FORMAT, handler) else: - self.set_loglevel() - - def create_new_logger(self, log_format, handler): - self.logger = logging.RootLogger(self.loglevel) - fmt = logging.Formatter(log_format) - handler.setFormatter(fmt) - self.logger.addHandler(handler) - - # check url and add credentials if needed from credential file + handler = logging.FileHandler(self.logpath) + handler.setFormatter(logging.Formatter(log_format)) + handler.setLevel(level) + if self.chmod_log: + os.chmod(self.logpath, self.chmod_log) + return handler def validate_urlstr(self,urlstr): - + # check url and add credentials if needed from credential file ok, details = self.credentials.get(urlstr) if details == None : self.logger.error("bad credential %s" % urlstr) return False, urllib.parse.urlparse(urlstr) - return True, details.url - def validate_parts(self): self.logger.debug("sr_config validate_parts %s" % self.parts) if not self.parts[0] in ['0','1','p','i']: diff --git a/sarra/sr_credentials.py b/sarra/sr_credentials.py index 7b171f41e..f488bcf77 100755 --- a/sarra/sr_credentials.py +++ b/sarra/sr_credentials.py @@ -223,27 +223,22 @@ def parse(self,line): self.logger.error("sr_credentials/parse %s" % line) self.logger.debug('Exception details: ', exc_info=True) - def read(self,path): + def read(self, path): self.logger.debug("sr_credentials read") # read in provided credentials (not mandatory) - try : - if os.path.exists(path): - - f = open(path,'r') - lines = f.readlines() - f.close - - for line in lines : - self.parse(line) - - except : - self.logger.error("sr_credentials/read path = %s" % path) - self.logger.debug('Exception details: ', exc_info=True) + try: + if os.path.exists(path): + with open(path) as f: + lines = f.readlines() + for line in lines: + self.parse(line) + except: + self.logger.error("sr_credentials/read path = %s" % path) + self.logger.debug('Exception details: ', exc_info=True) #self.logger.debug("credentials = %s\n" % self.credentials) - def resolve(self,urlstr, url = None): # create url object if needed diff --git a/sarra/sr_instances.py b/sarra/sr_instances.py index 271b2ae0a..05da10a12 100755 --- a/sarra/sr_instances.py +++ b/sarra/sr_instances.py @@ -547,7 +547,7 @@ def foreground_parent(self): self.nbr_instances = 0 self.build_instance(0) self.logpath = None - self.setlog() + self.setlog(interactive=True) self.start() def reload_instance(self): @@ -732,50 +732,33 @@ def start_instance(self): # inheritance of file descriptors changed. I think earlier versions require PIPE # later versions None is better. # use of Pipe causes issue: https://github.com/MetPX/sarracenia/issues/63 - subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + subprocess.Popen(cmd) def start_parent(self): self.logger.debug(" pid %d instances %d no %d \n" % (os.getpid(),self.nbr_instances,self.no)) - - # as parent - if self.no == -1 : - - # instance 0 is the parent... child starts at 1 - - i=1 - while i <= self.nbr_instances : - self.build_instance(i) - self.setlog() - self.start_instance() - i = i + 1 - - # the number of instances has decreased... stop excedent - if i <= self.last_nbr_instances: - self.stop_instances(i,self.last_nbr_instances) - - # write nbr_instances - self.file_set_int(self.statefile,self.nbr_instances) - - # as instance + if self.no == -1 : + # as parent: instance -1 is the parent, children start at 1 + i=1 + while i <= self.nbr_instances: + self.build_instance(i) + self.start_instance() + i = i + 1 + if i <= self.last_nbr_instances: + # the number of instances has decreased... stop excedent + self.stop_instances(i, self.last_nbr_instances) + self.file_set_int(self.statefile, self.nbr_instances) else: - self.build_instance(self.no) - self.pid = os.getpid() - ok = self.file_set_int(self.pidfile,self.pid) - self.setlog() - if self.no > 0: - os.close(0) - #lfd=os.open( self.logpath, os.O_CREAT|os.O_WRONLY|os.O_APPEND ) - #os.dup2(lfd,1) - #os.dup2(lfd,2) - - self.logger.debug("start instance %d (pid=%d)\n" % (self.no, self.pid) ) - - if not ok : - self.logger.error("could not write pid for instance %s" % self.instance_str) - self.logger.error("instance not started") - sys.exit(1) - - self.start() + # as instance, self.no > 0 + self.build_instance(self.no) + self.pid = os.getpid() + ok = self.file_set_int(self.pidfile,self.pid) + self.setlog() + self.logger.debug("start instance %d (pid=%d)\n" % (self.no, self.pid) ) + if not ok : + self.logger.error("could not write pid for instance %s" % self.instance_str) + self.logger.error("instance not started") + sys.exit(1) + self.start() sys.exit(0) def status_instance(self,sanity=False): diff --git a/sarra/sr_subscribe.py b/sarra/sr_subscribe.py index b268a538d..382fd32db 100755 --- a/sarra/sr_subscribe.py +++ b/sarra/sr_subscribe.py @@ -2123,7 +2123,8 @@ def test_sr_subscribe(): subscribe.option( opt1.split() ) subscribe.option( opt2.split() ) subscribe.option( opt3.split() ) - subscribe.debug = True + subscribe.loglevel = logging.DEBUG + subscribe.setlog() # ================== # set instance @@ -2184,7 +2185,6 @@ def test_sr_subscribe(): # =================================== def main(): - args,action,config,old = startup_args(sys.argv) subscribe = sr_subscribe(config,args,action) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/flow_setup.sh b/test/flow_setup.sh index 430f17b86..005238a0d 100755 --- a/test/flow_setup.sh +++ b/test/flow_setup.sh @@ -57,7 +57,7 @@ mkdir -p "$CONFDIR" 2> /dev/null export SR_CONFIG_EXAMPLES=`pwd`/../sarra/examples flow_configs="`cd ../sarra/examples; ls */*f[0-9][0-9].conf; ls */*f[0-9][0-9].inc`" -sr_action "Adding flow test configurations..." add "-debug" ">> $flowsetuplog 2>\\&1" "$flow_configs" +sr_action "Adding flow test configurations..." add " " ">> $flowsetuplog 2>\\&1" "$flow_configs" passed_checks=0 count_of_checks=0 @@ -122,16 +122,28 @@ nbr_fail=0 count_of_checks=$((${count_of_checks}+1)) -for t in sr_util sr_credentials sr_config sr_cache sr_retry sr_consumer sr_http sr_sftp sr_instances sr_pattern_match; do - echo "======= Testing :"${t} >> $unittestlog +echo "======= Testing: sr_config" >> $unittestlog +nbr_test=$(( ${nbr_test}+1 )) +python3 -m unittest -v ${TESTDIR}/unit_tests/sr_config_unit_test.py >> $unittestlog 2>&1 +status=${?} +if [ $status -ne 0 ]; then + echo "======= Testing sr_config: Failed" +else + echo "======= Testing sr_config: Succeeded" +fi + +nbr_fail=$(( ${nbr_fail}+${status} )) + +for t in sr_util sr_credentials sr_cache sr_retry sr_consumer sr_http sr_sftp sr_instances sr_pattern_match; do + echo "======= Testing: "${t} >> $unittestlog nbr_test=$(( ${nbr_test}+1 )) - ${TESTDIR}/unit_tests/${t}_unit_test.py >> $unittestlog 2>&1 - status=${?} - if [ $status -ne 0 ]; then - echo "======= Testing "${t}": Failed" - else - echo "======= Testing "${t}": Succeeded" - fi + ${TESTDIR}/unit_tests/${t}_unit_test.py >> $unittestlog 2>&1 + status=${?} + if [ $status -ne 0 ]; then + echo "======= Testing "${t}": Failed" + else + echo "======= Testing "${t}": Succeeded" + fi nbr_fail=$(( ${nbr_fail}+${status} )) done diff --git a/test/unit_tests/__init__.py b/test/unit_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/unit_tests/sr_config_unit_test.py b/test/unit_tests/sr_config_unit_test.py index 2b73d98a4..311801445 100755 --- a/test/unit_tests/sr_config_unit_test.py +++ b/test/unit_tests/sr_config_unit_test.py @@ -1,630 +1,727 @@ -#!/usr/bin/env python3 +""" This file is part of metpx-sarracenia. -import tempfile +metpx-sarracenia +Documentation: https://github.com/MetPX/sarracenia + +sr_config_unit_test.py : test utility tool used for sr_config -try : - from sr_config import * -except : - from sarra.sr_config import * - -# =================================== -# self_test -# =================================== - -def self_test(): - - failed = False - - # test include - f = open("./bbb.inc","w") - f.write("randomize True\n") - f.close() - f = open("./aaa.conf","w") - f.write("include bbb.inc\n") - f.close() - - # instantiation, test include and overwrite logs - cfg = sr_config(config="aaa") - cfg.configure() - - if not cfg.randomize : - print("test 01 : problem with include") - failed = True - - # back to defaults + check isTrue - cfg.defaults() - if not cfg.isTrue('true') or cfg.isTrue('false') : - print("test 02: problem with module isTrue") - failed = True - - # =================================== - # TESTING checksum - # =================================== - - cfg.configure() - - # creating a temporary directory with testfile test_chksum_file - - tmpdirname = tempfile.TemporaryDirectory().name - try : os.mkdir(tmpdirname) - except : pass - tmpfilname = 'test_chksum_file' - tmppath = tmpdirname + os.sep + 'test_chksum_file' - - f=open(tmppath,'wb') - f.write(b"0123456789") - f.write(b"abcdefghij") - f.write(b"ABCDEFGHIJ") - f.write(b"9876543210") - f.close() - - # checksum_0 - - cfg.set_sumalgo('0') - chk0 = cfg.sumalgo - chk0.set_path(tmppath) - f = open(tmppath,'rb') - for i in 0,1,2,3 : - chunk = f.read(10) - chk0.update(chunk) - f.close() - - v = int(chk0.get_value()) - if v < 0 or v > 9999 : - print("test 01: checksum_0 did not work") - failed = True - - - # checksum_d - - cfg.set_sumalgo('d') - chkd = cfg.sumalgo - chkd.set_path(tmppath) - f = open(tmppath,'rb') - for i in 0,1,2,3 : - chunk = f.read(10) - chkd.update(chunk) - f.close() - - if chkd.get_value() != '7efaff9e615737b379a45646c755c492' : - print("test 02: checksum_d did not work") - failed = True - - - # checksum_n - - cfg.set_sumalgo('n') - chkn = cfg.sumalgo - chkn.set_path(tmpfilname) - f = open(tmppath,'rb') - for i in 0,1,2,3 : - chunk = f.read(10) - chkn.update(chunk) - f.close() - - v=chkn.get_value() - - if chkn.get_value() != 'fd6b0296fe95e19fcef6f21f77efdfed' : - print("test 03: checksum_N did not work") - failed = True - - # checksum_N - - #cfg.set_sumalgo('N') - #chkN = cfg.sumalgo - #chkN.set_path(tmpfilname,'i,1,256,1,0,0') - #v = chkN.get_value() - - # this should do nothing - #chunk = 'aaa' - #chkN.update(chunk) - - #if chkN.get_value() != 'a0847ab809f83cb573b76671bb500a430372d2e3d5bce4c4cd663c4ea1b5c40f5eda439c09c7776ff19e3cc30459acc2a387cf10d056296b9dc03a6556da291f' : - # print("test 04: checksum_N did not work") - # failed = True - - # checksum_s - - - cfg.set_sumalgo('s') - chks = cfg.sumalgo - chks.set_path(tmppath) - f = open(tmppath,'rb') - for i in 0,1,2,3 : - chunk = f.read(10) - chks.update(chunk) - f.close() - - if chks.get_value() != 'e0103da78bbff9a741116e04f8bea2a5b291605c00731dcf7d26c0848bccf76dd2caa6771cb4b909d5213d876ab85094654f36d15e265d322b68fea4b78efb33': - print("test 05: checksum_s did not work") - failed = True - - os.unlink(tmppath) - os.rmdir(tmpdirname) - - # pluggin script checking - f = open("./scrpt.py","w") - f.write("class Transformer(object): \n") - f.write(" def __init__(self):\n") - f.write(" pass\n") - f.write("\n") - f.write(" def perform(self,parent):\n") - f.write(" if parent.this_value != 0 : return False\n") - f.write(" parent.this_value = 1\n") - f.write(" return True\n") - f.write("\n") - f.write("transformer = Transformer()\n") - f.write("self.on_message = transformer.perform\n") - f.close() - - # able to find the script - ok, path = cfg.config_path("plugins","scrpt.py",mandatory=True,ctype='py') - if not ok : - print("test 03: problem with config_path script not found") - failed = True - - # able to load the script - cfg.execfile("on_message",path) - if cfg.on_message == None : - print("test 04: problem with module execfile script not loaded") - failed = True - - # able to run the script - cfg.this_value = 0 - cfg.on_message(cfg) - if cfg.this_value != 1 : - print("test 05: problem to run the script ") - failed = True - os.unlink("./scrpt.py") - - # general ... - - cfg.general() - print(cfg.user_cache_dir) - print(cfg.user_log_dir) - print(cfg.user_config_dir) - - # args ... - - cfg.randomize = False - cfg.assemble = False - cfg.lr_backupCount = 5 - cfg.expire = 0 - expire_value = 1000*60*60*3 - message_value = 1000*60*60*24*7*3 - cfg.message_ttl = 0 - cfg.args(['-expire','3h','-message_ttl','3W','--randomize', '--assemble', 'True', '-logrotate', '5m', - '-logrotate_interval', '1m']) - if not cfg.randomize : - print("test 06: args problem randomize") - failed = True - if not cfg.inplace : - print("test 07: args problem assemble") - failed = True - if cfg.lr_interval != 1: - print("test 08a: args problem logrotate %s" % cfg.lr_interval) - failed = True - if cfg.lr_backupCount != 5: - print("test 08b: args problem logrotate %s" % cfg.lr_backupCount) - failed = True - if cfg.lr_when != 'm': - print("test 08c: args problem logrotate %s" % cfg.lr_when) - failed = True - if cfg.expire != expire_value : - print("test 09: args problem expire %s" % cfg.expire) - failed = True - if cfg.message_ttl != message_value : - print("test 10: args problem message_ttl %s" % cfg.message_ttl) - failed = True - - - # has_vip... - cfg.args(['-vip', '127.0.0.1' ]) - if not cfg.has_vip(): - print("test 11: has_vip failed") - failed = True - - - opt1 = "hostname toto" - opt2 = "broker amqp://a:b@${HOSTNAME}" - cfg.option(opt1.split()) - cfg.option(opt2.split()) - if cfg.broker.geturl() != "amqp://a:b@toto" : - print("test 12: varsub problem with replacing HOSTNAME") - failed = True - - opt1 = "parts i,128" - cfg.option(opt1.split()) - if cfg.partflg != 'i' or cfg.blocksize != 128: - print("test 13: option parts or module validate_parts") - failed = True - - opt1 = "sum z,d" - cfg.option(opt1.split()) - if cfg.sumflg != 'z,d' : - print("test 14: option sum or module validate_sum") - failed = True - - opt1 = "sum R,0" - cfg.option(opt1.split()) - if cfg.sumflg != 'R,0' : - print("test 15: option sum or module validate_sum") - failed = True - - opt1 = "sum d" - cfg.option(opt1.split()) - if cfg.sumflg != 'd' or cfg.sumalgo.registered_as() != 'd' : - print("test 16: option sum or module validate_sum") - failed = True - - opt1 = "sum 0" - cfg.option(opt1.split()) - if cfg.sumflg != '0' or cfg.sumalgo.registered_as() != '0' : - print("test 17: option sum or module validate_sum") - failed = True - - opt1 = "sum n" - cfg.option(opt1.split()) - if cfg.sumflg != 'n' or cfg.sumalgo.registered_as() != 'n' : - print("test 18: option sum or module validate_sum") - failed = True - - opt1 = "sum s" - cfg.option(opt1.split()) - if cfg.sumflg != 's' or cfg.sumalgo.registered_as() != 's': - print("test 19: option sum or module validate_sum") - failed = True - - #opt1 = "sum N" - #cfg.option(opt1.split()) - #if cfg.sumflg != 'N' or cfg.sumalgo.registered_as() != 'N': - # print("test 19b: option sum or module validate_sum") - # failed = True - - opt1 = "move toto titi" - cfg.option(opt1.split()) - if cfg.movepath[0] != 'toto' or cfg.movepath[1] != 'titi' : - print("test 20: option move for sr_post does not work") - failed = True - - opt1 = "path .. ." - cfg.option(opt1.split()) - if cfg.postpath[0] != os.path.abspath('..') or cfg.postpath[1] != os.path.abspath('.') : - print("test 21: option path for sr_post does not work") - failed = True - - opt1 = "inflight ." - cfg.option(opt1.split()) - if cfg.inflight != '.' : - print("test 22: option inflight . does not work") - failed = True - - opt1 = "inflight .tmp" - cfg.option(opt1.split()) - if cfg.inflight != '.tmp' : - print("test 23: option inflight .tmp does not work") - failed = True - - opt1 = "inflight 1.5" - cfg.option(opt1.split()) - if cfg.inflight != 1.5 : - print("test 24: option inflight 1.5 does not work") - failed = True - - opt1 = "prefetch 10" - cfg.option(opt1.split()) - if cfg.prefetch != 10 : - print("test 25: prefetch option did not work") - failed = True - - # reexecuting the config aaa.conf - - print("test 25b: reparsing aaa.conf that includes bbb.inc") - cfg.config(cfg.user_config) - os.unlink('aaa.conf') - os.unlink('bbb.inc') - print("test 25b: worked") - - opt1 = "header toto1=titi1" - cfg.option(opt1.split()) - opt2 = "header toto2=titi2" - cfg.option(opt2.split()) - opt3 = "header tutu1=None" - cfg.option(opt3.split()) - opt4 = "header tutu2=None" - cfg.option(opt4.split()) - - if not 'toto1' in cfg.headers_to_add or \ - not 'toto2' in cfg.headers_to_add or \ - cfg.headers_to_add['toto1'] != 'titi1' or \ - cfg.headers_to_add['toto2'] != 'titi2' or \ - len(cfg.headers_to_add) != 2 : - print("test 26: option header adding entries did not work") - failed = True - - if not 'tutu1' in cfg.headers_to_del or \ - not 'tutu2' in cfg.headers_to_del or \ - len(cfg.headers_to_del) != 2 : - print("test 27: option header deleting entries did not work") - failed = True - - # expire in ms - opt4 = "expire 10m" - cfg.option(opt4.split()) - if cfg.expire != 600000 : - print("test 28: option expire or module duration_from_str did not work") - failed = True - - # message_ttl in ms - opt4 = "message_ttl 20m" - cfg.option(opt4.split()) - if cfg.message_ttl != 1200000 : - print("test 29: option message_ttl or module duration_from_str did not work") - failed = True - - os.environ["VAR1"] = "michel" - os.environ["VAR2"] = "peter" - os.environ["VAR3"] = "jun" - opt4="directory ${VAR1}/${VAR2}/${VAR3}/blabla" - cfg.option(opt4.split()) - if '$' in cfg.currentDir or cfg.currentDir != 'michel/peter/jun/blabla': - print("test 30: env variable substitution failed %s" % cfg.currentDir) - failed = True - - opt4='strip 4' - cfg.option(opt4.split()) - if cfg.strip != 4 : - print("test 31: option strip with integer failed") - failed = True - - opt4='strip .*aaa' - cfg.option(opt4.split()) - if cfg.pstrip != '.*aaa' : - print("test 32: option strip with pattern failed") - failed = True - - pika = cfg.use_pika - - opt4='use_pika True' - cfg.option(opt4.split()) - if pika_available and not cfg.use_pika and not cfg.use_amqplib: - print("test 33: option use_pika boolean set to true failed") - failed = True - - opt4='use_pika False' - cfg.option(opt4.split()) - if cfg.use_pika : - print("test 34: option use_pika boolean set to false failed") - failed = True - - opt4='use_pika' - cfg.option(opt4.split()) - if pika_available and not cfg.use_pika and not cfg.use_amqplib: - print("test 35: option use_pika boolean set to true without value failed") - failed = True - - opt4='statehost False' - cfg.option(opt4.split()) - if cfg.statehost : - print("test 35: option statehost boolean set to false failed") - failed = True - - opt4='statehost True' - cfg.option(opt4.split()) - if not cfg.statehost or cfg.hostform != 'short' : - print("test 36: option statehost boolean set to true, hostform short, failed") - failed = True - - opt4='statehost SHORT' - cfg.option(opt4.split()) - if not cfg.statehost or cfg.hostform != 'short' : - print("test 37: option statehost set to SHORT, hostform short, failed") - failed = True - - opt4='statehost fqdn' - cfg.option(opt4.split()) - if not cfg.statehost or cfg.hostform != 'fqdn' : - print("test 38: option statehost set to fqdn, hostform fqdn, failed") - failed = True - - opt4='statehost TOTO' - cfg.option(opt4.split()) - if cfg.statehost and cfg.hostform != None: - print("test 39: option statehost set badly ... did not react correctly, failed") - failed = True - - # extended options - - opt4='extended TOTO' - cfg.option(opt4.split()) - - cfg.declare_option('extended') - if not cfg.check_extended(): - print("test 40: extend with new option, option was declared, but check_extended complained(False)") - failed = True - - opt4='extended_bad TITI' - cfg.option(opt4.split()) - # modify this test... causes error to be printed out ... which is ok... but annoying for conformity tests - #if cfg.check_extended(): - if cfg.extended_bad[0] != 'TITI' : - print("test 41: extend with new option, option not declared, value should still be ok") - failed = True - - opt1 = "surplus_opt surplus_value" - cfg.option(opt1.split()) - if cfg.surplus_opt != [ "surplus_value" ] : - print("test 42: extend option did not work") - failed = True - - opt1 = "surplus_opt surplus_value2" - cfg.option(opt1.split()) - if cfg.surplus_opt[0] != "surplus_value" or cfg.surplus_opt[1] != "surplus_value2": - print("test 43: extend option list did not work") - failed = True - - # more options testing - - opt1 = "base_dir /home/aspymjg/dev/metpx-sarracenia/sarra" - cfg.option(opt1.split()) - if cfg.base_dir != '/home/aspymjg/dev/metpx-sarracenia/sarra': - print("test 44: string option base_dir did not work") - failed = True - - opt1 = "post_base_dir /totot/toto" - cfg.option(opt1.split()) - if cfg.post_base_dir != '/totot/toto': - print("test 45: string option post_base_dir did not work") - failed = True - - opt1 = "post_base_url file://toto" - cfg.option(opt1.split()) - if cfg.post_base_url != 'file://toto': - print("test 46: url option post_base_url did not work") - failed = True - - if cfg.outlet != 'post' : - print("test 47: default error outlet = %s" % self.outlet) - failed = True - - opt1 = "outlet json" - cfg.option(opt1.split()) - if cfg.outlet != 'json' : - print("test 48: option outlet value json did not work") - failed = True - - opt1 = "outlet url" - cfg.option(opt1.split()) - if cfg.outlet != 'url' : - print("test 49: option outlet value url did not work") - failed = True - - opt1 = "outlet post" - cfg.option(opt1.split()) - if cfg.outlet != 'post' : - print("test 50: option outlet value post did not work") - failed = True - - # bad option setting skipped ... its output confuses conformity... - # complains about an error... and it is ok to complain. - opt1 = "outlet toto" - #cfg.option(opt1.split()) - #if cfg.outlet != 'post' : - # print("test 51: option outlet with bad value did not work") - # failed = True - - if not cfg.retry_mode : - print("test 52: retry_mode should be the default") - failed = True - - opt1 = "retry_mode false" - cfg.option(opt1.split()) - if cfg.retry_mode : - print("test 53: retry_mode should be false") - failed = True - - # retry_ttl in mins - opt1 = "retry_ttl 1D" - cfg.option(opt1.split()) - if cfg.retry_ttl != 86400 : - print("test 54: option retry_ttl or module duration_from_str did not work") - failed = True - - # exchange_suffix - opt1 = "exchange_suffix suffix1" - cfg.option(opt1.split()) - opt1 = "post_exchange_suffix suffix2" - cfg.option(opt1.split()) - if cfg.exchange_suffix != 'suffix1' or cfg.post_exchange_suffix != 'suffix2' : - print("test 55: option exchange_suffix or post_exchange_suffix did not work") - failed = True - - # internal variables substitution - - opt1 = "broker amqp://michel:passwd@testbroker.toto" - cfg.option(opt1.split()) - opt1 = "post_base_dir /${broker.hostname}/${broker.username}" - cfg.option(opt1.split()) - if cfg.post_base_dir != '/testbroker.toto/michel': - print("test 56: replacing internal ${broker.hostname} ${broker.username} did not work") - failed = True - - cfg.toto = ['tutu1','tutu2'] - opt1 = "post_base_dir /${toto[1]}/${broker.username}/aaa" - cfg.option(opt1.split()) - if cfg.post_base_dir != '/tutu2/michel/aaa': - print("test 57: replacing internal ${toto[1]} did not work") - failed = True - - cfg.toto = ['tutu1','tutu2'] - opt1 = "post_base_dir /${toto}/${broker.username}/aaa" - cfg.option(opt1.split()) - if cfg.post_base_dir != '/tutu1/michel/aaa': - print("test 58: replacing internal ${toto} did not work") - failed = True - - # more config test to perform for full coverage... +Code contributed by: + Benoit Lapointe - Shared Services Canada +""" +import _io +import tempfile +import unittest + +try: + from sr_config import * +except ImportError: + from sarra.sr_config import * + + +class SrConfigTestCase(unittest.TestCase): + """ The parent class of all sr_config test cases + + It handles base configs used in all tests + """ + cfg = None + + @classmethod + def setUpClass(cls) -> None: + """ Setup basic config file with basic include + + :return: None + """ + f = open("./bbb.inc", "w") + f.write("randomize True\n") + f.close() + f = open("./aaa.conf", "w") + f.write("include bbb.inc\n") + f.close() + + # instantiation, test include and overwrite logs + cls.cfg = sr_config(config="aaa") + cls.cfg.configure() + + @classmethod + def tearDownClass(cls) -> None: + """ Remove configs for sr_config tests + + :return: None + """ + os.unlink("./bbb.inc") + os.unlink("./aaa.conf") + os.removedirs(cls.cfg.user_cache_dir) + + +class SrConfigRandomizeTestCase(SrConfigTestCase): + def test_include_inc(self): + self.assertTrue(self.cfg.randomize, "test 01a: problem with include") + + def test_isTrue(self): + # back to defaults + check isTrue + self.cfg.defaults() + self.assertTrue(self.cfg.isTrue('true') or not self.cfg.isTrue('false'), "test 01b: problem with module isTrue") + + +class SrConfigChecksumTestCase(SrConfigTestCase): + """ Test cases related to checksum handling + + """ + tmpdir = None + tmpdirname = None + tmppath = None + + @classmethod + def setUpClass(cls) -> None: + """ Setup path used by all checksum test cases + + :return: + """ + super(SrConfigChecksumTestCase, cls).setUpClass() + cls.tmpdir = tempfile.TemporaryDirectory() + cls.tmpfilname = 'test_chksum_file' + cls.tmppath = os.path.join(cls.tmpdir.name, cls.tmpfilname) + with open(cls.tmppath, 'wb') as f: + f.write(b"0123456789") + f.write(b"abcdefghij") + f.write(b"ABCDEFGHIJ") + f.write(b"9876543210") + + @classmethod + def tearDownClass(cls) -> None: + """ Remove paths of checksum test cases + + :return: + """ + super(SrConfigChecksumTestCase, cls).tearDownClass() + os.unlink(cls.tmppath) + cls.tmpdir.cleanup() + + def test_checksum_0(self): + self.cfg.set_sumalgo('0') + chk0 = self.cfg.sumalgo + chk0.set_path(self.tmppath) + with open(self.tmppath, 'rb') as f: + for i in range(4): + chunk = f.read(10) + chk0.update(chunk) + v = int(chk0.get_value()) + self.assertGreaterEqual(v, 0, "test 02a: checksum_0 did not work") + self.assertLessEqual(v, 9999, "test 02b: checksum_0 did not work") + + def test_checksum_d(self): + self.cfg.set_sumalgo('d') + chkd = self.cfg.sumalgo + chkd.set_path(self.tmppath) + with open(self.tmppath, 'rb') as f: + for i in range(4): + chunk = f.read(10) + chkd.update(chunk) + self.assertEqual(chkd.get_value(), '7efaff9e615737b379a45646c755c492', "test 02c: checksum_d did not work") + + def test_checksum_n(self): + self.cfg.set_sumalgo('n') + chkn = self.cfg.sumalgo + chkn.set_path(self.tmpfilname) + with open(self.tmppath, 'rb') as f: + for i in range(4): + chunk = f.read(10) + chkn.update(chunk) + self.assertEqual(chkn.get_value(), 'fd6b0296fe95e19fcef6f21f77efdfed', "test 02d: checksum_n did not work") + + @unittest.skip("Commented # TODO why this has been commented") + def test_checksum_N(self): + self.cfg.set_sumalgo('N') + chk_n = self.cfg.sumalgo + chk_n.set_path(self.tmpfilname, 'i,1,256,1,0,0') + chunk = 'aaa' + chk_n.update(chunk) + long_chksum = 'a0847ab809f83cb573b76671bb500a430372d2e3d5bce4c4cd663c4ea1b5c40f5eda439c09c7776ff19e3cc30459a' \ + 'cc2a387cf10d056296b9dc03a6556da291f' + self.assertEqual(chk_n.get_value(), long_chksum, "test 02e: checksum_N did not work") + + def test_checksum_s(self): + # checksum_s + self.cfg.set_sumalgo('s') + chks = self.cfg.sumalgo + chks.set_path(self.tmppath) + with open(self.tmppath, 'rb') as f: + for i in range(4): + chunk = f.read(10) + chks.update(chunk) + long_chksum = 'e0103da78bbff9a741116e04f8bea2a5b291605c00731dcf7d26c0848bccf76dd2caa6771cb4b909d5213d876ab' \ + '85094654f36d15e265d322b68fea4b78efb33' + self.assertEqual(chks.get_value(), long_chksum, "test 02f: checksum_s did not work") + + +class SrConfigPluginScriptTestCase(SrConfigTestCase): + """ Test cases related to plugin interfacing """ + def setUp(self) -> None: + """ Creating a dummy script plugin which will be tested + + :return: None + """ + with open("./scrpt.py", "w") as f: + f.write("class Transformer(object): \n") + f.write(" def __init__(self):\n") + f.write(" pass\n") + f.write("\n") + f.write(" def perform(self,parent):\n") + f.write(" if parent.this_value != 0 : return False\n") + f.write(" parent.this_value = 1\n") + f.write(" return True\n") + f.write("\n") + f.write("transformer = Transformer()\n") + f.write("self.on_message = transformer.perform\n") + self.ok, self.path = self.cfg.config_path("plugins", "scrpt.py", mandatory=True, ctype='py') + + def tearDown(self) -> None: + """ Remove the script plugin file """ + os.unlink("./scrpt.py") + + def test_find_script(self): + self.assertTrue(self.ok, "test 03: problem with config_path script not found") + + def test_load_script(self): + self.cfg.execfile("on_message", self.path) + self.assertIsNotNone(self.cfg.on_message, "test 04: problem with module execfile script not loaded") + + def test_run_script(self): + self.cfg.this_value = 0 + self.cfg.on_message(self.cfg) + self.assertEqual(self.cfg.this_value, 1, "test 05: problem to run the script ") + + +class SrConfigGeneralTestCase(SrConfigTestCase): + """ Test cases related to general config parsing an interpretation logic """ + def setUp(self) -> None: + """ Creates configuration which are generic to all test cases + + :return: None + """ + self.cfg.general() + self.cfg.randomize = False + self.cfg.assemble = False + self.cfg.lr_backupCount = 5 + self.cfg.expire = 0 + self.expire_value = 1000 * 60 * 60 * 3 + self.message_value = 1000 * 60 * 60 * 24 * 7 * 3 + self.cfg.message_ttl = 0 + self.cfg.args(['-expire', '3h', '-message_ttl', '3W', '--randomize', '--assemble', 'True', '-logrotate', '5m', + '-logrotate_interval', '1m']) + self.cfg.toto = ['tutu1', 'tutu2'] + opt1 = "broker amqp://michel:passwd@testbroker.toto" + self.cfg.option(opt1.split()) + + def test_user_cache_dir(self): + test_cache_path = os.path.join(os.path.expanduser('~'), '.cache', 'sarra', self.cfg.program_name, 'aaa') + self.assertEqual(self.cfg.user_cache_dir, test_cache_path) + + def test_user_log_dir(self): + test_cache_path = os.path.join(os.path.expanduser('~'), '.cache', 'sarra', 'log') + self.assertEqual(self.cfg.user_log_dir, test_cache_path) + + def test_user_config_dir(self): + test_cache_path = os.path.join(os.path.expanduser('~'), '.config', 'sarra') + self.assertEqual(self.cfg.user_config_dir, test_cache_path) + + def test_randomize(self): + self.assertTrue(self.cfg.randomize, "test 06: args problem randomize") + + def test_inplace(self): + self.assertTrue(self.cfg.inplace, "test 07: args problem assemble") + + def test_lr_interval(self): + self.assertEqual(self.cfg.lr_interval, 1, "test 08a: args problem logrotate %s" % self.cfg.lr_interval) + + def test_lr_backupCount(self): + self.assertEqual(self.cfg.lr_backupCount, 5, "test 08b: args problem logrotate %s" % self.cfg.lr_backupCount) + + def test_lr_when(self): + self.assertEqual(self.cfg.lr_when, 'm', "test 08c: args problem logrotate %s" % self.cfg.lr_when) + + def test_expire(self): + self.assertEqual(self.cfg.expire, self.expire_value, "test 09: args problem expire %s" % self.cfg.expire) + + def test_msg_ttl(self): + self.assertEqual(self.cfg.message_ttl, self.message_value, + "test 10: args problem message_ttl %s" % self.cfg.message_ttl) + + def test_has_vip(self): + # has_vip... + self.cfg.args(['-vip', '127.0.0.1']) + self.assertTrue(self.cfg.has_vip(), "test 11: has_vip failed") + + def test_broker_url(self): + opt1 = "hostname toto" + opt2 = "broker amqp://a:b@${HOSTNAME}" + self.cfg.option(opt1.split()) + self.cfg.option(opt2.split()) + self.assertEqual(self.cfg.broker.geturl(), "amqp://a:b@toto", "test 12: varsub problem with replacing HOSTNAME") + + def test_partflg(self): + opt1 = "parts i,128" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.partflg, 'i', "test 13a: option parts or module validate_parts") + self.assertEqual(self.cfg.blocksize, 128, "test 13b: option parts or module validate_parts") + + def test_sumflg_zd(self): + opt1 = "sum z,d" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 'z,d', "test 14: option sum or module validate_sum") + + def test_sumflg_r0(self): + opt1 = "sum R,0" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 'R,0', "test 15: option sum or module validate_sum") + + def test_sumflg_d(self): + opt1 = "sum d" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 'd', "test 16a: option sum or module validate_sum") + self.assertEqual(self.cfg.sumalgo.registered_as(), 'd', "test 16b: option sum or module validate_sum") + + def test_sum_0(self): + opt1 = "sum 0" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, '0', "test 17a: option sum or module validate_sum") + self.assertEqual(self.cfg.sumalgo.registered_as(), '0', "test 17b: option sum or module validate_sum") + + def test_sum_n(self): + opt1 = "sum n" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 'n', "test 18a: option sum or module validate_sum") + self.assertEqual(self.cfg.sumalgo.registered_as(), 'n', "test 18b: option sum or module validate_sum") + + def test_sum_s(self): + opt1 = "sum s" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 's', "test 19a: option sum or module validate_sum") + self.assertEqual(self.cfg.sumalgo.registered_as(), 's', "test 19b: option sum or module validate_sum") + + @unittest.skip("Commented # TODO why this has been commented") + def test_sum_N(self): + opt1 = "sum N" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sumflg, 'N', "test 19c: option sum or module validate_sum") + self.assertEqual(self.cfg.sumalgo.registered_as(), 'N', "test 19d: option sum or module validate_sum") + + def test_movepath(self): + opt1 = "move toto titi" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.movepath[0], 'toto', "test 20a: option move for sr_post does not work") + self.assertEqual(self.cfg.movepath[1], 'titi', "test 20b: option move for sr_post does not work") + + def test_postpath(self): + opt1 = "post_base_dir None" + self.cfg.option(opt1.split()) + opt1 = "path .. ." + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.postpath[0], os.path.abspath('..'), "test 21a: option path for sr_post does not work") + self.assertEqual(self.cfg.postpath[1], os.path.abspath('.'), "test 21b: option path for sr_post does not work") + + def test_inflight(self): + opt1 = "inflight ." + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.inflight, '.', "test 22: option inflight . does not work") + + def test_inflight_tmp(self): + opt1 = "inflight .tmp" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.inflight, '.tmp', "test 23: option inflight .tmp does not work") + + def test_inflight_1_5(self): + opt1 = "inflight 1.5" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.inflight, 1.5, "test 24: option inflight 1.5 does not work") + + def test_prefetch_10(self): + opt1 = "prefetch 10" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.prefetch, 10, "test 25: prefetch option did not work") + + def test_reparsing_include(self): + self.cfg.config(self.cfg.user_config) + self.assertTrue(True, "test 25b: failed") + + def test_header_add(self): + opt1 = "header toto1=titi1" + self.cfg.option(opt1.split()) + opt2 = "header toto2=titi2" + self.cfg.option(opt2.split()) + self.assertIn('toto1', self.cfg.headers_to_add, "test 26a: option header adding entries did not work") + self.assertIn('toto2', self.cfg.headers_to_add, "test 26b: option header adding entries did not work") + self.assertEqual(self.cfg.headers_to_add['toto1'], 'titi1', + "test 26c: option header adding entries did not work") + self.assertEqual(len(self.cfg.headers_to_add), 2, "test 26d: option header adding entries did not work") + + def test_header_delete(self): + opt3 = "header tutu1=None" + self.cfg.option(opt3.split()) + opt4 = "header tutu2=None" + self.cfg.option(opt4.split()) + self.assertIn('tutu1', self.cfg.headers_to_del, "test 27a: option header deleting entries did not work") + self.assertIn('tutu2', self.cfg.headers_to_del, "test 27b: option header deleting entries did not work") + self.assertEqual(len(self.cfg.headers_to_del), 2, "test 27c: option header deleting entries did not work") + + def test_expire_from_str(self): + # expire in ms + opt4 = "expire 10m" + self.cfg.option(opt4.split()) + self.assertEqual(self.cfg.expire, 600000, "test 28: option expire or module duration_from_str did not work") + + def test_message_ttl(self): + # message_ttl in ms + opt4 = "message_ttl 20m" + self.cfg.option(opt4.split()) + self.assertEqual(self.cfg.message_ttl, 1200000, + "test 29: option message_ttl or module duration_from_str did not work") + + def test_currentDir(self): + os.environ["VAR1"] = "michel" + os.environ["VAR2"] = "peter" + os.environ["VAR3"] = "jun" + opt4 = "directory ${VAR1}/${VAR2}/${VAR3}/blabla" + self.cfg.option(opt4.split()) + self.assertNotIn('$', self.cfg.currentDir, + "test 30a: env variable substitution failed {}".format(self.cfg.currentDir)) + self.assertEqual(self.cfg.currentDir, 'michel/peter/jun/blabla', + "test 30b: env variable substitution failed {}".format(self.cfg.currentDir)) + + def test_strip(self): + opt4 = 'strip 4' + self.cfg.option(opt4.split()) + self.assertEqual(self.cfg.strip, 4, "test 31: option strip with integer failed") + + def test_pstrip(self): + opt4 = 'strip .*aaa' + self.cfg.option(opt4.split()) + self.assertEqual(self.cfg.pstrip, '.*aaa', "test 32: option strip with pattern failed") + + @unittest.skipIf(not pika_available, "pika library is not available") + def test_use_pika(self): + opt4 = 'use_pika' + self.cfg.option(opt4.split()) + self.assertTrue(self.cfg.use_pika and not self.cfg.use_amqplib, + "test 33a: option use_pika boolean set to true without value failed") + + @unittest.skipIf(not pika_available, "pika library is not available") + def test_use_pika_true(self): + opt4 = 'use_pika True' + self.cfg.option(opt4.split()) + self.assertTrue(self.cfg.use_pika and not self.cfg.use_amqplib, + "test 33b: option use_pika boolean set to true failed") + + @unittest.skipIf(not pika_available, "pika library is not available") + def test_use_pika_false(self): + opt4 = 'use_pika False' + self.cfg.option(opt4.split()) + self.assertTrue(not self.cfg.use_pika and not self.cfg.use_amqplib, + "test 34: option use_pika boolean set to false failed") + + def test_statehost_false(self): + opt4 = 'statehost False' + self.cfg.option(opt4.split()) + self.assertFalse(self.cfg.statehost, "test 35: option statehost boolean set to false failed") + + def test_statehost_true(self): + opt4 = 'statehost True' + self.cfg.option(opt4.split()) + self.assertTrue(self.cfg.statehost, "test 36a: option statehost boolean set to true, hostform short, failed") + self.assertEqual(self.cfg.hostform, 'short', + "test 36b: option statehost boolean set to true, hostform short, failed") + + def test_statehost_short(self): + opt4 = 'statehost SHORT' + self.cfg.option(opt4.split()) + self.assertTrue(self.cfg.statehost, "test 37a: option statehost set to SHORT, hostform short, failed") + self.assertEqual(self.cfg.hostform, 'short', "test 37b: option statehost set to SHORT, hostform short, failed") + + def test_statehost_fqdn(self): + opt4 = 'statehost fqdn' + self.cfg.option(opt4.split()) + self.assertTrue(self.cfg.statehost, "test 38a: option statehost set to fqdn, hostform fqdn, failed") + self.assertEqual(self.cfg.hostform, 'fqdn', "test 38b: option statehost set to fqdn, hostform fqdn, failed") + + def test_statehost_bad(self): + opt4 = 'statehost TOTO' + self.cfg.option(opt4.split()) + self.assertFalse(self.cfg.statehost, "test 39a: option statehost set badly ... did not react correctly, failed") + self.assertIsNotNone(self.cfg.hostform, + "test 39b: option hostform set badly ... did not react correctly, failed") + + def test_extended(self): + opt4 = 'extended TOTO' + self.cfg.option(opt4.split()) + self.cfg.declare_option('extended') + self.assertTrue(self.cfg.check_extended(), + "test 40: extend with new option, option was declared, but check_extended complained(False)") + + def test_extended_first(self): + opt4 = 'extended_bad TITI' + self.cfg.option(opt4.split()) + # modify this test... causes error to be printed out ... which is ok... but annoying for conformity tests + # if self.cfg.check_extended(): + self.assertEqual(self.cfg.extended_bad[0], 'TITI', + "test 41: extend with new option, option not declared, value should still be ok") + + def test_extended_list(self): + opt1 = "surplus_opt surplus_value" + self.cfg.option(opt1.split()) + opt1 = "surplus_opt surplus_value2" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.surplus_opt[0], "surplus_value", "test 43a: extend option list did not work") + self.assertEqual(self.cfg.surplus_opt[1], "surplus_value2", "test 43b: extend option list did not work") + + def test_base_dir(self): + opt1 = "base_dir /home/aspymjg/dev/metpx-sarracenia/sarra" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.base_dir, '/home/aspymjg/dev/metpx-sarracenia/sarra', + "test 44: string option base_dir did not work") + + def test_post_base_dir(self): + opt1 = "post_base_dir /totot/toto" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.post_base_dir, '/totot/toto', "test 45: string option post_base_dir did not work") + + def test_post_base_url(self): + opt1 = "post_base_url file://toto" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.post_base_url, 'file://toto', "test 46: url option post_base_url did not work") + + def test_outlet(self): + self.assertEqual(self.cfg.outlet, 'post', "test 47: default error outlet = %s" % self.cfg.outlet) + + def test_outlet_json(self): + opt1 = "outlet json" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.outlet, 'json', "test 48: option outlet value json did not work") + + def test_outlet_url(self): + opt1 = "outlet url" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.outlet, 'url', "test 49: option outlet value url did not work") + + def test_outlet_port(self): + opt1 = "outlet post" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.outlet, 'post', "test 50: option outlet value post did not work") + + @unittest.skip("bad option setting skipped... its output confuses conformity... " + "complains about an error... and it is ok to complain.") + def test_outlet_post(self): + opt1 = "outlet toto" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.outlet, 'post', "test 51: option outlet with bad value did not work") + + def test_retry_mode(self): + self.assertTrue(self.cfg.retry_mode, "test 52: retry_mode should be the default") + + def test_retry_mode_false(self): + opt1 = "retry_mode false" + self.cfg.option(opt1.split()) + self.assertFalse(self.cfg.retry_mode, "test 53: retry_mode should be false") + + def test_retry_ttl_to_mins(self): + opt1 = "retry_ttl 1D" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.retry_ttl, 86400, + "test 54: option retry_ttl or module duration_from_str did not work") + + def test_exchange_suffix(self): + opt1 = "exchange_suffix suffix1" + self.cfg.option(opt1.split()) + opt1 = "post_exchange_suffix suffix2" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.exchange_suffix, 'suffix1', + "test 55a: option exchange_suffix or post_exchange_suffix did not work") + self.assertEqual(self.cfg.post_exchange_suffix, 'suffix2', + "test 55b: option exchange_suffix or post_exchange_suffix did not work") + + def test_broker(self): + opt1 = "post_base_dir /${broker.hostname}/${broker.username}" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.post_base_dir, '/testbroker.toto/michel', + "test 56: replacing internal ${broker.hostname} ${broker.username} did not work") + + def test_post_base_dir_replace(self): + opt1 = "post_base_dir /${toto[1]}/${broker.username}/aaa" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.post_base_dir, '/tutu2/michel/aaa', + "test 57: replacing internal ${toto[1]} did not work") + + def test_post_base_dir_replace_first(self): + opt1 = "post_base_dir /${broker.hostname}/${broker.username}" + self.cfg.option(opt1.split()) + opt1 = "post_base_dir /${toto}/${broker.username}/aaa" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.post_base_dir, '/tutu1/michel/aaa', + "test 58: replacing internal ${toto} did not work") + + # TODO add those tests + # more config test to perform for full coverage... # options 'accept','get','reject'] # option 'accept_unmatched' - # def isMatchingPattern(self, str, accept_unmatch = False): - #def sundew_getDestInfos(self, filename): - #def validate_urlstr(self,urlstr): - - # example of output for next test : new_dir = /20180404140433/SACN04CWAO140251RRA - - dDir = '/${RYYYY}${RMM}${RDD}${RHH}${RMM}${RSS}/${T1}${T2}${A1}${A2}${ii}${CCCC}${YY}${GG}${Gg}${BBB}/' - new_dir = cfg.sundew_dirPattern(urlstr='',basename='SACN04_CWAO_140251_RRA',destDir=dDir,destName='aaa') - if not new_dir.endswith('/SACN04CWAO140251RRA') : - print("test 59: sundew_dirPattern new_dir %s should end with /SACN04CWAO140251RRA" % new_dir) - failed = True - - # retry_ttl in mins - opt1 = "sanity_log_dead 1D" - cfg.option(opt1.split()) - if cfg.sanity_log_dead != 86400 : - print("test 60: option sanity_log_dead or module duration_from_str did not work") - failed = True - - # retry_ttl in mins - opt1 = "heartbeat ${sanity_log_dead}" - cfg.option(opt1.split()) - if cfg.heartbeat != 86400 : - print("test 61: option evaluated self.${} did not work") - failed = True - - # retry_ttl in mins - opt1 = "subtopic aaa.vv\ ww.hh##bb.aaa.#" - w = opt1.split() - w = cfg.backslash_space(w) - cfg.option(w) - if cfg.heartbeat != 86400 : - print("test 61: option evaluated self.${} did not work") - failed = True - - - if not failed : - print("sr_config.py TEST PASSED") - else : - print("sr_config.py TEST FAILED") - sys.exit(1) - - -# =================================== -# MAIN -# =================================== - -def main(): - try: - self_test() - except: - print("sr_config.py TEST FAILED") - raise - -# ========================================= -# direct invocation : self testing -# ========================================= - -if __name__=="__main__": - main() - + # def isMatchingPattern(self, str, accept_unmatch = False): + # def sundew_getDestInfos(self, filename): + # def validate_urlstr(self,urlstr): + + def test_sundew_dirPattern(self): + # example of output for next test : new_dir = /20180404140433/SACN04CWAO140251RRA + d_dir = '/${RYYYY}${RMM}${RDD}${RHH}${RMM}${RSS}/${T1}${T2}${A1}${A2}${ii}${CCCC}${YY}${GG}${Gg}${BBB}/' + new_dir = self.cfg.sundew_dirPattern(urlstr='', basename='SACN04_CWAO_140251_RRA', destDir=d_dir, + destName='aaa') + self.assertTrue(new_dir.endswith('/SACN04CWAO140251RRA'), + "test 59: sundew_dirPattern new_dir %s should end with /SACN04CWAO140251RRA" % new_dir) + + def test_sanity_log_dead(self): + opt1 = "sanity_log_dead 1D" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.sanity_log_dead, 86400, + "test 60: option sanity_log_dead or module duration_from_str did not work") + + def test_heartbeat(self): + opt1 = "sanity_log_dead 1D" + self.cfg.option(opt1.split()) + opt1 = "heartbeat ${sanity_log_dead}" + self.cfg.option(opt1.split()) + self.assertEqual(self.cfg.heartbeat, 86400, "test 61: option heartbeat did not work") + + def test_subtopic(self): + opt1 = 'subtopic aaa.vv\ ww.hh##bb.aaa.#' + w = opt1.split() + w = self.cfg.backslash_space(w) + self.cfg.option(w) + self.assertEqual(self.cfg.heartbeat, 86400, "test 62: option subtopic did not work") + + +class SrConfigStdFilesRedirection(unittest.TestCase): + """ Base class for stream redirection test cases + + These test stands for both out/err redirection in a single write (_io.TextIOWrapper) stream + """ + def setUp(self) -> None: + """ setup fake std file streams and logger to use through each test """ + self.stdoutpath = 'sys.stdout' + self.fake_stdout = open(self.stdoutpath, 'w') + self.logpath = 'stdfileredirection.log' + logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) + self.logger = logging.getLogger() + self.logger.handlers[0].close() + self.logger.removeHandler(self.logger.handlers[0]) + self.handler = handlers.TimedRotatingFileHandler(self.logpath, when='s', interval=1, backupCount=5) + self.handler.setLevel(logging.INFO) + self.logger.addHandler(self.handler) + + def tearDown(self) -> None: + """ clear all trace from each std files redirection test """ + self.fake_stdout.close() + for path in glob.glob("{}*".format(self.logpath)): + os.remove(path) + os.remove(self.stdoutpath) + + +class SrConfigStdFileStreams(SrConfigStdFilesRedirection): + def test_fake_stdout(self): + """ test that the fake stdout is from the same type as python sys.stdout (in cmdline context) """ + self.assertEqual(type(self.fake_stdout), _io.TextIOWrapper) + + def test_opened_new_stream(self): + """ test that the handler stream is open to standard files after we redirected stout/stderr to write to it """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.assertFalse(fake_stdout.closed) + + def test_closed_orig_stream(self): + """ test that the original stream get closed after redirection """ + StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.assertTrue(self.fake_stdout) + + def test_handler_orig_stream(self): + """ test that the original stream stays the same when creating the wrapper """ + stream_orig = self.handler.stream + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.assertEqual(stream_orig, fake_stdout.handler.stream) + + def test_handler_rotated_stream(self): + """ test that the stream changes when the log rotates """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + time.sleep(1) + self.logger.info('test_handler_rotated_stream') + self.assertNotEqual(fake_stdout.stream, fake_stdout.handler.stream) + + def test_handler_rotated_stream_written(self): + """ test that the wrapper stream get updated after a rotation and a first write """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + time.sleep(1) + self.logger.info('test_handler_rotated_stream_written') + print('test_handler_rotated_stream_written', file=fake_stdout) + self.assertEqual(fake_stdout.stream, fake_stdout.handler.stream) + + +class SrConfigStdFilesFileDescriptors(SrConfigStdFilesRedirection): + """ Test cases over file descriptors consistency """ + def test_fds_before(self): + """ test that file descriptor is different before redirection """ + self.assertNotEqual(self.fake_stdout.fileno(), self.handler.stream.fileno()) + + def test_stdfd_preserved(self): + """ test that file descriptor is preserved after redirection """ + fake_stdout_fd = self.fake_stdout.fileno() + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.assertEqual(fake_stdout_fd, fake_stdout.fileno()) + + def test_handlerfd_preserved(self): + """ test that the file descriptor is preserved after redirection """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.assertEqual(self.handler.stream.fileno(), fake_stdout.handler.stream.fileno()) + + +class SrConfigStdFilesOutput(SrConfigStdFilesRedirection): + """ Test cases that validate that the output is printed where it should be before and after redirection """ + def test_logging(self): + """ test that log file still receive log after redirection """ + StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + self.logger.info('test_logging') + with open(self.logpath) as f: + lines = f.readlines() + self.assertEqual(lines[0], 'test_logging\n') + + @unittest.skip("this test fails sometime unexpectedly") + def test_subprocess(self): + """ test that subprocess stdout output is redirected to log """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + subprocess.Popen(['echo', 'test_subprocess'], stdout=fake_stdout) + with open(self.logpath) as f: + lines = f.readlines() + self.assertEqual(lines[0], 'test_subprocess\n') + + @unittest.skip("this test fails most of the time unexpectedly") + def test_stdout(self): + """ test that stdout output is redirected to log """ + fake_stdout = StdFileLogWrapper(self.handler, self.fake_stdout.fileno()) + print('test_stdout', file=fake_stdout) + with open(self.logpath) as f: + lines = f.readlines() + self.assertEqual(lines[0], 'test_stdout\n') + + +def suite(): + """ Create the test suite that include all sr_config test cases + + :return: sr_config test suite + """ + sr_config_suite = unittest.TestSuite() + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigRandomizeTestCase)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigPluginScriptTestCase)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigChecksumTestCase)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigGeneralTestCase)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigStdFileStreams)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigStdFilesFileDescriptors)) + sr_config_suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SrConfigStdFilesOutput)) + return sr_config_suite + + +if __name__ == 'main': + runner = unittest.TextTestRunner() + runner.run(suite())