Skip to content

Commit

Permalink
Test: CTS: make logging.py compatible with Python 2.6+ and 3.2+
Browse files Browse the repository at this point in the history
also reasonably happy with pylint --max-line-length=120
  • Loading branch information
kgaillot committed Sep 1, 2016
1 parent dd169b2 commit ef0efee
Showing 1 changed file with 114 additions and 67 deletions.
181 changes: 114 additions & 67 deletions cts/logging.py
@@ -1,112 +1,159 @@
'''
Classes related to producing logs
'''

__copyright__='''
Copyright (C) 2014 Andrew Beekhof <andrew@beekhof.net>
Licensed under the GNU GPL.
'''

#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.

import string, sys, time, os

class Logger:
""" Logging classes for Pacemaker's Cluster Test Suite (CTS)
"""

# Pacemaker targets compatibility with Python 2.6+ and 3.2+
from __future__ import print_function, unicode_literals, absolute_import, division

__copyright__ = "Copyright (C) 2014-2016 Andrew Beekhof <andrew@beekhof.net>"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import io
import os
import sys
import time


# Wrapper to detect a string under Python 2 or 3
try:
_StringType = basestring
except NameError:
_StringType = str

def _is_string(obj):
""" Return True if obj is a simple string. """

return isinstance(obj, _StringType)


def _strip(line):
""" Wrapper for strip() that works regardless of Python version """

if sys.version_info < (3,):
return line.decode('utf-8').strip()
else:
return line.strip()


def _rstrip(line):
""" Wrapper for rstrip() that works regardless of Python version """

if sys.version_info < (3,):
return line.decode('utf-8').rstrip()
else:
return line.rstrip()


class Logger(object):
""" Abstract class to use as parent for CTS logging classes """

TimeFormat = "%b %d %H:%M:%S\t"

def __init__(self):
# Whether this logger should print debug messages
self.debug_target = True

def __call__(self, lines):
""" Log specified messages """

raise ValueError("Abstract class member (__call__)")

def write(self, line):
return self(line.rstrip())
""" Log a single line excluding trailing whitespace """

return self(_rstrip(line))

def writelines(self, lines):
for s in lines:
self.write(s)
return 1
def flush(self):
""" Log a series of lines excluding trailing whitespace """

for line in lines:
self.write(line)
return 1
def isatty(self):
return None

def is_debug_target(self):
""" Return True if this logger should receive debug messages """

return self.debug_target


class StdErrLog(Logger):
""" Class to log to standard error """

def __init__(self, filename, tag):
pass
Logger.__init__(self)
self.debug_target = False

def __call__(self, lines):
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
if isinstance(lines, basestring):
sys.__stderr__.writelines([t, lines, "\n"])
else:
for line in lines:
sys.__stderr__.writelines([t, line, "\n"])
""" Log specified lines to stderr """

timestamp = time.strftime(Logger.TimeFormat,
time.localtime(time.time()))
if _is_string(lines):
lines = [lines]
for line in lines:
print("%s%s" % (timestamp, line), file=sys.__stderr__)
sys.__stderr__.flush()

def name(self):
return "StdErrLog"

class FileLog(Logger):
def __init__(self, filename, tag):
self.logfile=filename
self.hostname = os.uname()[1]+" "
""" Class to log to a file """

self.source = ""
def __init__(self, filename, tag):
Logger.__init__(self)
self.logfile = filename
self.hostname = os.uname()[1]
if tag:
self.source = tag+": "
self.source = tag + ": "
else:
self.source = ""

def __call__(self, lines):
""" Log specified lines to the file """

fd = open(self.logfile, "a")
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))

if isinstance(lines, basestring):
fd.writelines([t, self.hostname, self.source, lines, "\n"])
else:
for line in lines:
fd.writelines([t, self.hostname, self.source, line, "\n"])
fd.close()
logf = io.open(self.logfile, "at")
timestamp = time.strftime(Logger.TimeFormat,
time.localtime(time.time()))
if _is_string(lines):
lines = [lines]
for line in lines:
print("%s%s %s%s" % (timestamp, self.hostname, self.source, line),
file=logf)
logf.close()

def name(self):
return "FileLog"

class LogFactory:
class LogFactory(object):
""" Singleton to log messages to various destinations """

log_methods=[]
log_methods = []
have_stderr = False

def __init__(self):
pass

def add_file(self, filename, tag=None):
""" When logging messages, log them to specified file """

if filename:
LogFactory.log_methods.append(FileLog(filename, tag))

def add_stderr(self):
""" When logging messages, log them to standard error """

if not LogFactory.have_stderr:
LogFactory.have_stderr = True
LogFactory.log_methods.append(StdErrLog(None, None))

def log(self, args):
""" Log a message (to all configured log destinations) """

for logfn in LogFactory.log_methods:
logfn(string.strip(args))
logfn(_strip(args))

def debug(self, args):
""" Log a debug message (to all configured log destinations) """

for logfn in LogFactory.log_methods:
if logfn.name() != "StdErrLog":
logfn("debug: %s" % string.strip(args))
if logfn.is_debug_target():
logfn("debug: %s" % _strip(args))

def traceback(self, traceback):
""" Log a stack trace (to all configured log destinations) """

for logfn in LogFactory.log_methods:
traceback.print_exc(50, logfn)

0 comments on commit ef0efee

Please sign in to comment.