Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue240 2 #256

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sarra/examples/subscribe/hpfx_amis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ broker amqps://anonymous@hpfx.collab.science.gc.ca/
exchange xpublic

# instances: number of downloading processes to run at once. defaults to 1. Not enough for this case
#instances 5
instances 5

# expire, in operational use, should be longer than longest expected interruption
expire 10m

on_message msg_stdfiles

subtopic *.WXO-DD.bulletins.alphanumeric.#
mirror true
directory /local/ben/amis/
Expand Down
30 changes: 30 additions & 0 deletions sarra/plugins/msg_stdfiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/python3

"""
a on_msg callback to print the content of the AMQP message.
one can see clearly the difference between v02 and v03 messages.

"""

class Test_StdFiles(object):

def __init__(self,parent):
parent.logger.debug("msg_rawlog initialized")

def on_message(self,parent):
import sys
import subprocess

msg = parent.msg
parent.logger.info("stdfiles closed? stdin={}, stdout={}, stderr={}".format(sys.stdin.closed, sys.stdout.closed, sys.stderr.closed))
parent.logger.info("stdfiles fds? stdin={}, stdout={}, stderr={}".format(sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()))
parent.logger.info("this is logging")
print("this is stdout")
print("this is stderr", file=sys.stderr)
subprocess.Popen(['/bin/echo', 'this is subprocess stdout'])
return True

test_stdfiles = Test_StdFiles(self)

self.on_message = test_stdfiles.on_message

2 changes: 1 addition & 1 deletion sarra/sr_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def amqp_close(self):
self.hc = None

def amqp_connect(self):
self.hc = HostConnect(logger = self.logger)
self.hc = HostConnect(logger=self.logger)
self.hc.choose_amqp_alternative(self.use_amqplib, self.use_pika)
self.hc.loop = False
self.hc.set_url(self.admin)
Expand Down
229 changes: 125 additions & 104 deletions sarra/sr_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import urllib, urllib.parse, urllib.request, urllib.error
import shutil
import sarra
import io

from appdirs import *
from logging import handlers
Expand Down Expand Up @@ -65,31 +66,67 @@
pika_available = False
# ==========================================

if sys.hexversion > 0x03030000 :
if sys.hexversion > 0x03030000:
from shutil import get_terminal_size
py2old=False
else:
py2old=True

class StreamToLogger(object):
"""
Fake file-like stream object that redirects writes to a logger instance.

class StdFileLogWrapper(io.TextIOWrapper):
""" Wrapper that delegate stream write operations to an underlying logging handler stream

It borrows the handler buffer at construction time and will update it as the logging rolls
"""
def __init__(self, logger, log_level=logging.INFO):
self.logger = logger
self.log_level = log_level
self.linebuf = ''
def __init__(self, handler, stdno):
super(StdFileLogWrapper, self).__init__(handler.stream.buffer)
self.stream = handler.stream
self.handler = handler
self.stdno = stdno
self.dup2()

def write(self, buf):
for line in buf.rstrip().splitlines():
self.logger.log(self.log_level, line.rstrip())
if self.stream != self.handler.stream:
# Support log rotation so need to check if stream changed
# don't trust self.stream for write operation only there to check the change
self.stream = self.handler.stream
self.dup2()
self.handler.stream.write(buf)

def fileno(self):
""" Fake the file descriptor for the underlying standard fd ( 1 or 2 )

:return: the fileno from the faked std file
"""
return self.stdno

def dup2(self):
""" Avoiding dup2 for win32 since 3.6 with the introduction of _WindowsConsoleIO

introduced since issue 240: https://github.com/MetPX/sarracenia/issues/240
:return: None
"""
if sys.platform != 'win32':
os.dup2(self.handler.stream.fileno(), self.fileno())

def flush(self):
"""
when stdout/stderr are assigned to a stream, builtin routine call flush.
if the logger doesn't have flush method, things bomb.
"""
pass
self.buffer.flush()

def close(self):
pass

@property
def closed(self):
""" Override the closed property in parent (io.TextIOWrapper)

This is useful to really redirect the close info from the underlying stream
"""
return self.buffer.closed

@property
def buffer(self):
return self.handler.stream.buffer


class sr_config:

Expand Down Expand Up @@ -190,15 +227,12 @@ def __init__(self,config=None,args=None,action=None):

# logging is interactive at start

self.debug = False
self.statehost = False
self.hostform = 'short'
self.loglevel = logging.INFO

self.LOG_FORMAT= '%(asctime)s [%(levelname)s] %(message)s'
# self.LOG_FORMAT= '%(asctime)s [%(levelname)s] %(message)s %(module)s/%(funcName)s #%(lineno)d'
logging.basicConfig(level=self.loglevel, format = self.LOG_FORMAT )
self.logger = logging.getLogger()
self.logger = None
self.handler = None
self.setlog(interactive=True)
self.logger.debug("sr_config __init__")

# program_name
Expand Down Expand Up @@ -614,10 +648,6 @@ def declare_option(self,option):
def defaults(self):
self.logger.debug("sr_config defaults")

# IN BIG DEBUG
#self.debug = True
self.debug = False

self.retry_mode = True
self.retry_ttl = None

Expand Down Expand Up @@ -921,9 +951,10 @@ def execfile(self, opname, path):
self.logger.error("installing %s plugin %s failed: not found " % (opname, path) )
return False

try :
exec(compile(open(script).read(), script, 'exec'))
except :
try:
with open(script) as f:
exec(compile(f.read(), script, 'exec'))
except:
self.logger.error("sr_config/execfile 2 failed for option '%s' and plugin '%s'" % (opname, path))
self.logger.debug('Exception details: ', exc_info=True)
return False
Expand Down Expand Up @@ -1183,23 +1214,26 @@ def list_file(self,path):

self.run_command([ cmd, path ] )

def run_command(self,cmd_list):
def run_command(self, cmd_list):
sr_path = os.environ.get('SARRA_LIB')
sc_path = os.environ.get('SARRAC_LIB')
import sys,subprocess
import sys
import subprocess

try:
if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5) :
subprocess.check_call(cmd_list, close_fds=False )
else :
self.logger.debug("using subprocess.run")
if sc_path and cmd_list[0].startswith("sr_cp"):
subprocess.run([sc_path+'/'+cmd_list[0]]+cmd_list[1:],check=True)
elif sr_path and cmd_list[0].startswith("sr"):
subprocess.run([sr_path+'/'+cmd_list[0]+'.py']+cmd_list[1:],check=True)
else:
subprocess.run(cmd_list,check=True)
except: self.logger.error("trying run command %s " % ' '.join(cmd_list) )
if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5):
subprocess.check_call(cmd_list, close_fds=False)
else:
self.logger.debug("using subprocess.run")
if sc_path and cmd_list[0].startswith("sr_cp"):
subprocess.run([sc_path+'/'+cmd_list[0]]+cmd_list[1:], check=True)
elif sr_path and cmd_list[0].startswith("sr"):
subprocess.run([sr_path+'/'+cmd_list[0]+'.py']+cmd_list[1:], check=True)
else:
subprocess.run(cmd_list, check=True)
except Exception as e:
self.logger.error("trying run command {} with {}".format(' '.join(cmd_list), e))
self.logger.debug("Exception details:", exc_info=True)

def register_plugins(self):
self.logger.debug("register_plugins")
Expand Down Expand Up @@ -1685,19 +1719,13 @@ def option(self,words):
n = 2

elif words0 == 'debug': # See: sr_config.7
debug = self.debug
if (words1 is None) or words[0][0:1] == '-' :
self.debug = True
if (words1 is None) or words[0][0:1] == '-':
self.loglevel = logging.DEBUG
n = 1
else :
self.debug = self.isTrue(words[1])
elif self.isTrue(words[1]):
self.loglevel = logging.DEBUG
n = 2

if self.debug : self.loglevel = logging.DEBUG
else: self.loglevel = logging.INFO

if debug != self.debug : self.set_loglevel()

elif words0 == 'delete': # See: sr_sarra.8
if (words1 is None) or words[0][0:1] == '-' :
self.delete = True
Expand Down Expand Up @@ -2040,15 +2068,20 @@ def option(self,words):
self.lr_interval = int(float(words1))
n = 2

elif words0 in ['loglevel','ll']: # See: sr_config.7
elif words0 in ['loglevel', 'll']: # See: sr_config.7
level = words1.lower()
if level in 'critical' : self.loglevel = logging.CRITICAL
elif level in 'error' : self.loglevel = logging.ERROR
elif level in 'info' : self.loglevel = logging.INFO
elif level in 'warning' : self.loglevel = logging.WARNING
elif level in 'debug' : self.loglevel = logging.DEBUG
elif level in 'none' : self.loglevel = None
self.set_loglevel()
if level in 'critical':
self.loglevel = logging.CRITICAL
elif level in 'error':
self.loglevel = logging.ERROR
elif level in 'info':
self.loglevel = logging.INFO
elif level in 'warning':
self.loglevel = logging.WARNING
elif level in 'debug':
self.loglevel = logging.DEBUG
elif level in 'none':
self.loglevel = logging.NOTSET
n = 2

elif words0 in ['manager','feeder'] : # See: sr_config.7, sr_sarra.8
Expand Down Expand Up @@ -2466,16 +2499,17 @@ def option(self,words):
self.source_from_exchange = self.isTrue(words[1])
n = 2

elif words0 == 'statehost': # MG FIXME to be documented somewhere ???
elif words0 == 'statehost': # MG FIXME to be documented somewhere ???
self.statehost = True
self.hostform = 'short'
if (words1 is None) or words[0][0:1] == '-' :
self.hostform = 'short'
if words1 is None or words[0][0:1] == '-':
n = 1
elif words1.lower() in ['short','fqdn']:
self.hostform = words1.lower()
elif words1.lower() in ['short', 'fqdn']:
self.hostform = words1.lower()
n = 2
else:
if not self.isTrue(words[1]): self.statehost = False
if not self.isTrue(words[1]):
self.statehost = False
n = 2

elif words0 == 'strip': # See: sr_config.7
Expand Down Expand Up @@ -2673,58 +2707,45 @@ def set_sumalgo(self,sumflg):
self.lastflg = 'd'
self.sumalgo = self.sumalgos['d']

def setlog(self, interactive=False):
base_log_format = '%(asctime)s [%(levelname)s] %(message)s'
if logging.getLogger().hasHandlers():
for h in logging.getLogger().handlers:
h.close()
logging.getLogger().removeHandler(h)
self.logger = logging.getLogger()
self.logger.setLevel(self.loglevel)

def set_loglevel(self):
if not self.loglevel:
if hasattr(self, 'logger'):
del self.logger
self.logpath = None
self.logger = logging.RootLogger(logging.CRITICAL)
self.logger.addHandler(logging.NullHandler())
if interactive or not self.logpath:
logging.basicConfig(format=base_log_format, level=self.loglevel)
self.logger.debug("logging to the console with {}".format(self.logger))
else:
self.logger.setLevel(self.loglevel)

def setlog(self):
if self.loglevel and self.logpath and self.lr_interval > 0 and self.lr_backupCount > 0:
self.logger.debug("Switching to rotating log file: %s" % self.logpath)
handler = self.create_handler(base_log_format, logging.DEBUG)
self.logger.addHandler(handler)
sys.stdout = StdFileLogWrapper(handler, 1)
sys.stderr = StdFileLogWrapper(handler, 2)
self.logger.debug("logging to file ({}) with {}".format(self.logpath, self.logger))

def create_handler(self, log_format, level):
if self.lr_interval > 0 and self.lr_backupCount > 0:
handler = handlers.TimedRotatingFileHandler(self.logpath, when=self.lr_when, interval=self.lr_interval,
backupCount=self.lr_backupCount)
self.create_new_logger(self.LOG_FORMAT, handler)
if self.chmod_log:
os.chmod(self.logpath, self.chmod_log)
sys.stdout = StreamToLogger(self.logger, logging.INFO)
sys.stderr = StreamToLogger(self.logger, logging.ERROR)
elif self.loglevel and self.logpath:
self.logger.debug("Switching to log file: %s" % self.logpath)
handler = logging.FileHandler(self.logpath)
self.create_new_logger(self.LOG_FORMAT, handler)
if self.chmod_log:
os.chmod(self.logpath, self.chmod_log)
elif self.loglevel:
self.logger.debug('Keeping on screen logging')
handler = logging.StreamHandler()
self.create_new_logger(self.LOG_FORMAT, handler)
else:
self.set_loglevel()

def create_new_logger(self, log_format, handler):
self.logger = logging.RootLogger(self.loglevel)
fmt = logging.Formatter(log_format)
handler.setFormatter(fmt)
self.logger.addHandler(handler)

# check url and add credentials if needed from credential file
handler = logging.FileHandler(self.logpath)
handler.setFormatter(logging.Formatter(log_format))
handler.setLevel(level)
if self.chmod_log:
os.chmod(self.logpath, self.chmod_log)
return handler

def validate_urlstr(self,urlstr):

# check url and add credentials if needed from credential file
ok, details = self.credentials.get(urlstr)
if details == None :
self.logger.error("bad credential %s" % urlstr)
return False, urllib.parse.urlparse(urlstr)

return True, details.url


def validate_parts(self):
self.logger.debug("sr_config validate_parts %s" % self.parts)
if not self.parts[0] in ['0','1','p','i']:
Expand Down
Loading