Permalink
Browse files

overhauled logging

  • Loading branch information...
1 parent 6eb6148 commit 6ce068c367fd3919a294a9fbee3bc497cd8b5940 @trey0 trey0 committed Oct 3, 2012
View
106 geocamPycroraptor2/bin/pyraptord.py
@@ -6,88 +6,30 @@
# __END_LICENSE__
import os
-import logging
-import errno
-import signal
-import time
import zerorpc
from geocamPycroraptor2.manager import Manager
-from geocamPycroraptor2.util import getPid, waitUntilDead
+from geocamPycroraptor2.daemonize import Daemon
-def getPidForManager(m):
- return getPid(os.path.join(m._logDir, m._pidFile))
-
-
-def start(m):
- pid = getPidForManager(m)
- if pid is None:
- startInternal(m)
- else:
- print 'pyraptord is already running, pid %s' % pid
-
-
-def startInternal(m):
- print 'starting pyraptord...'
- m._start()
- s = zerorpc.Server(m)
- s.bind(m._port)
- logging.info('pyraptord: listening on %s', m._port)
- print 'started'
- s.run()
-
-
-def stop(m):
- pid = getPidForManager(m)
- if pid:
- print 'stopping pyraptord (first attempt, SIGTERM), pid %s...' % pid
- os.kill(pid, signal.SIGTERM)
- isDead = waitUntilDead(pid, timeout=5)
- if isDead:
- print 'stopped'
- return True
- print 'stopping pyraptord (second attempt, SIGKILL), pid %s...' % pid
- os.kill(pid, signal.SIGKILL)
- isDead = waitUntilDead(pid, timeout=5)
- if isDead:
- logging.info('stopped')
- return True
- print "can't kill running pyraptord, pid %s", pid
- return False
- else:
- print 'pyraptord does not appear to be running'
- return True
-
-
-def restart(m):
- print 'restarting pyraptord'
- isStopped = stop(m)
- if isStopped:
- startInternal(m)
-
-
-def status(m):
- pid = getPidForManager(m)
- if pid is None:
- print 'pyraptord is stopped'
- else:
- print 'pyraptord is running, pid %s' % pid
-
-
-COMMAND_REGISTRY = {
- 'start': start,
- 'stop': stop,
- 'restart': restart,
- 'status': status
-}
-
-
-def pyraptord(handler, opts):
+def pyraptord(cmd, opts):
m = Manager(opts)
- logging.basicConfig(level=logging.DEBUG)
- handler(m)
+ d = Daemon(opts.name,
+ os.path.join(m._logDir, m._pidFile))
+
+ if cmd in ('start', 'restart'):
+ doStart = d.execute(cmd)
+ if doStart:
+ m._start()
+ s = zerorpc.Server(m)
+ s.bind(m._port)
+ m._logger.info('pyraptord: listening on %s', m._port)
+ m._logger.info('started')
+ d.writePid()
+ s.run()
+ else:
+ d.execute(cmd)
def main():
@@ -103,15 +45,17 @@ def main():
help='Name of pyraptord zerorpc service [%default]',
default='pyraptord')
opts, args = parser.parse_args()
- if len(args) != 1:
- parser.error('expected exactly 1 arg')
+ if len(args) == 0:
+ cmd = 'start'
+ elif len(args) == 1:
+ cmd = args[0]
+ else:
+ parser.error('expected at most 1 command')
- cmd = args[0]
- handler = COMMAND_REGISTRY.get(cmd)
- if handler is None:
+ if cmd not in Daemon.COMMANDS:
parser.error('unknown command "%s"' % cmd)
- pyraptord(handler, opts)
+ pyraptord(cmd, opts)
if __name__ == '__main__':
View
95 geocamPycroraptor2/daemonize.py
@@ -6,31 +6,35 @@
import os
import sys
+import signal
from geocamPycroraptor2 import log
+from geocamPycroraptor2.util import getPid, waitUntilDead
-def daemonize(logPathTemplate, logPathContext, pidPath):
+def cleanIfExists(path):
+    """Delete the file at path if it exists.
+
+    Deletion is best-effort: if the file cannot be removed, a message
+    naming the file is printed and the error is not propagated.
+    """
+    if os.path.exists(path):
+        try:
+            os.unlink(path)
+        except OSError:
+            # use % so the path is actually substituted into the message
+            print 'could not delete file %s' % path
+
+
+def daemonize(name, logFile):
os.chdir('/')
os.umask(0)
# close stdin
devNull = file('/dev/null', 'rw')
os.dup2(devNull.fileno(), 0)
- # redirect stdout and stderr to log file
- if logPathTemplate == None:
+ print 'log at %s' % logFile.name
+ if logFile is None:
logFile = devNull
- else:
- logName, logFile = (log.openLogFromTemplate
- ('pyraptord',
- logPathTemplate,
- logPathContext))
- print 'daemonizing -- log file %s' % logName
os.dup2(logFile.fileno(), 1)
os.dup2(logFile.fileno(), 2)
- sys.stdout = log.StreamLogger('out', sys.stdout)
- sys.stderr = log.StreamLogger('err', sys.stderr)
+ sys.stdout = log.TimestampingStream('%s.out' % name, sys.stdout)
+ sys.stderr = log.TimestampingStream('%s.err' % name, sys.stderr)
# detach from tty
pid = os.fork()
@@ -41,11 +45,64 @@ def daemonize(logPathTemplate, logPathContext, pidPath):
if pid:
os._exit(0)
- # write pid file
- if pidPath != None:
- pidDir = os.path.dirname(pidPath)
- if not os.path.isdir(pidDir):
- os.makedirs(pidDir)
- pidFile = file(pidPath, 'w')
- pidFile.write('%d\n' % os.getpid())
- pidFile.close()
+
+class Daemon(object):
+    """Pid-file based start/stop/restart/status control for a daemon.
+
+    name is used only in console messages; pidPath is the file that
+    records the process id of the running daemon.
+    """
+
+    COMMANDS = ('start', 'stop', 'restart', 'status')
+
+    # (attempt label, signal name, signal number), tried in order by stop()
+    _STOP_ATTEMPTS = (('first', 'SIGTERM', signal.SIGTERM),
+                      ('second', 'SIGKILL', signal.SIGKILL))
+
+    def __init__(self, name, pidPath):
+        self._name = name
+        self._pidPath = pidPath
+
+    def execute(self, cmd):
+        """Call the method named by cmd and return its result.
+
+        Returns None without doing anything if cmd is not in COMMANDS.
+        """
+        if cmd in self.COMMANDS:
+            return getattr(self, cmd)()
+
+    def start(self):
+        """Return True if the caller should proceed with starting.
+
+        Only the pid file is consulted here; the caller performs the
+        actual startup and is expected to call writePid() afterwards.
+        """
+        pid = getPid(self._pidPath)
+        if pid is None:
+            print 'starting %s' % self._name
+            return True
+        else:
+            print '%s is already running, pid %s' % (self._name, pid)
+            return False
+
+    def writePid(self):
+        """Record the current process id in the pid file.
+
+        The containing directory is created if it is missing, so this
+        works even when nothing else has created it yet.
+        """
+        pidDir = os.path.dirname(self._pidPath)
+        if pidDir and not os.path.isdir(pidDir):
+            os.makedirs(pidDir)
+        # 'with' closes the file even if the write fails
+        with open(self._pidPath, 'w') as f:
+            f.write('%d\n' % os.getpid())
+
+    def stop(self):
+        """Stop the daemon named in the pid file, escalating if needed.
+
+        Sends SIGTERM, waits up to 5 seconds, then sends SIGKILL and waits
+        again.  The pid file is removed once the process is gone.  Always
+        returns None; the outcome is reported on stdout.
+        """
+        pid = getPid(self._pidPath)
+        if not pid:
+            print '%s does not appear to be running' % self._name
+            return
+        for attempt, sigName, sig in self._STOP_ATTEMPTS:
+            print ('stopping %s (%s attempt, %s), pid %s...'
+                   % (self._name, attempt, sigName, pid))
+            try:
+                os.kill(pid, sig)
+            except OSError:
+                # Typically the process exited on its own since the last
+                # check; waitUntilDead() below decides either way.
+                pass
+            if waitUntilDead(pid, timeout=5):
+                print 'stopped'
+                cleanIfExists(self._pidPath)
+                return
+        print ("can't kill running %s, pid %s"
+               % (self._name, pid))
+
+    def restart(self):
+        """Stop the daemon, then report whether the caller should start it."""
+        print 'restarting %s' % self._name
+        self.stop()
+        return self.start()
+
+    def status(self):
+        """Print whether the daemon is running, based on the pid file."""
+        pid = getPid(self._pidPath)
+        if pid is None:
+            print '%s is stopped' % self._name
+        else:
+            print '%s is running, pid %s' % (self._name, pid)
View
193 geocamPycroraptor2/log.py
@@ -8,6 +8,7 @@
import re
import datetime
from string import Template
+import logging
import gevent
@@ -19,6 +20,15 @@
UNIQUE_REGEX = r'\$\{unique\}|\$unique\b'
+class UtcFormatter(logging.Formatter):
+    """Log formatter whose %(asctime)s field is rendered in UTC."""
+
+    def formatTime(self, record, datefmt=None):
+        """Return the record's creation time as a UTC string.
+
+        With a datefmt, the time is formatted through strftime; without
+        one, the ISO 8601 form with a trailing 'Z' is used.
+        """
+        utcTime = datetime.datetime.utcfromtimestamp(record.created)
+        if not datefmt:
+            return utcTime.isoformat() + 'Z'
+        return utcTime.strftime(datefmt)
+
+
def getFileNameTimeString(timestamp=None):
if timestamp is None:
timestamp = datetime.datetime.utcnow()
@@ -27,12 +37,6 @@ def getFileNameTimeString(timestamp=None):
return '%s-%06d-UTC' % (seconds, us)
-def getTimeString(timestamp=None):
- if timestamp is None:
- timestamp = datetime.datetime.utcnow()
- return timestamp.isoformat() + 'Z'
-
-
def openLogFromPath(owner, path, mode='a+'):
logDir = os.path.dirname(path)
if not os.path.exists(logDir):
@@ -86,120 +90,19 @@ def openLogFromTemplate(owner, fnameTemplate, env):
return (fname, openLogFromPath(owner, fname))
-class TimestampLine:
- def __init__(self, streamName, lineType, text, timestamp=None):
- if timestamp == None:
- timestamp = datetime.datetime.now()
- self.streamName = streamName
- self.lineType = lineType
- self.text = text
- self.timestamp = timestamp
-
-
-class LineSource(object):
- def __init__(self, lineHandler=None):
- self._lineHandlers = {}
- self._lineHandlerCount = 0
- if lineHandler:
- self.addLineHandler(lineHandler)
-
- def addLineHandler(self, handler):
- handlerRef = self._lineHandlerCount
- self._lineHandlerCount += 1
- self._lineHandlers[handlerRef] = handler
- return handlerRef
-
- def delLineHandler(self, handlerRef):
- del self._lineHandlers[handlerRef]
-
- def handleLine(self, tsline):
- for hnd in self._lineHandlers.itervalues():
- hnd(tsline)
-
-
-class TimestampLineParser(LineParser, LineSource):
- def __init__(self, streamName,
- lineHandler=None,
- maxLineLength=160):
- LineParser.__init__(self, self.handleRawLine)
- LineSource.__init__(self, lineHandler)
- self._streamName = streamName
-
- def timestamp(self, line):
- if line.endswith('\r\n'):
- text = line[:-2]
- lineType = 'n'
- elif line.endswith('\n'):
- text = line[:-1]
- lineType = 'n'
- else:
- text = line
- lineType = 'c'
- return TimestampLine(self._streamName,
- lineType,
- text,
- datetime.datetime.utcnow())
-
- def handleRawLine(self, text):
- self.handleLine(self.timestamp(text))
-
-
-class TimestampLineSource(TimestampLineParser):
- def __init__(self, streamName, fd,
- lineHandler=None,
- maxLineLength=160):
- (super(TimestampLineSource, self).__init__
- (streamName, lineHandler, maxLineLength))
- self._fd = fd
- self._q = queueFromFile(fd, maxLineLength)
- self._job = gevent.spawn(self._handleQueue)
-
- def stop(self):
- trackerG.close(self._fd)
- self._q.put(StopIteration)
- if self._job is not None:
- self._job.kill()
- self._job = None
-
- def _handleQueue(self):
- print '_hq'
- for line in self._q:
- print 'yo log'
- self.handleRawLine(line)
-
-
-class EventLineSource(LineSource):
- def __init__(self, streamName,
- lineHandler=None):
- LineSource.__init__(self, lineHandler)
- self._streamName = streamName
-
- def stop(self):
- pass
-
- def log(self, text):
- self.handleLine(TimestampLine
- (self._streamName,
- 'n',
- text,
- datetime.datetime.utcnow()))
-
-
-class LineBuffer(LineSource):
- def __init__(self, lineHandler=None, maxSize=2048):
- LineSource.__init__(self, lineHandler)
+class LineBuffer(logging.Handler):
+ def __init__(self, maxSize=2048):
self._maxSize = maxSize
self._lines = []
self._lineCount = 0
- def addLine(self, tsline):
+ def emit(self, rec):
DELETE_SIZE = self._maxSize // 2
if len(self._lines) == self._maxSize - DELETE_SIZE:
del self._lines[0:DELETE_SIZE]
- tsline.lineCount = self._lineCount
- self._lines.append(tsline)
+ # rec.lineCount = self._lineCount
+ self._lines.append(rec)
self._lineCount += 1
- self.handleLine(tsline)
def getLines(self, minTime=None, maxLines=None):
n = len(self._lines)
@@ -217,23 +120,53 @@ def getLines(self, minTime=None, maxLines=None):
return self._lines[minIndex:]
-class TimestampLineLogger:
- def __init__(self, stream):
+def escapeEndOfLine(line):
+    """Strip the line terminator from line and encode it as a prefix.
+
+    The result starts with a one-letter tag and a space: 'n' for a line
+    ended by '\\r\\n' or '\\n', 'r' for a bare '\\r', and 'c' when the text
+    has no terminator at all.
+    """
+    # '\r\n' must be tested before '\n' so both characters are removed
+    for terminator, tag in (('\r\n', 'n'), ('\n', 'n'), ('\r', 'r')):
+        if line.endswith(terminator):
+            return tag + ' ' + line[:-len(terminator)]
+    return 'c ' + line
+
+
+def getStreamLogger(name, stream):
+    """Return the logger called name, set up to write to stream.
+
+    The logger is set to DEBUG, does not propagate to its ancestors, and
+    writes '<UTC time> <logger name> <message>' lines to stream.
+    """
+    result = logging.getLogger(name)
+    result.setLevel(logging.DEBUG)
+    result.propagate = False
+    # logging.getLogger() hands back the same object for the same name, so
+    # attach a handler only if this stream is not wired up already;
+    # otherwise each repeated call would duplicate every output line.
+    for existing in result.handlers:
+        if getattr(existing, 'stream', None) is stream:
+            return result
+    sh = logging.StreamHandler(stream)
+    sh.setLevel(logging.DEBUG)
+    sh.setFormatter(UtcFormatter('%(asctime)s %(name)s %(message)s'))
+    result.addHandler(sh)
+    return result
+
+
+class TimestampingStream(LineParser):
+ def __init__(self, name, stream, maxLineLength=160):
+ super(TimestampingStream, self).__init__(self.handleLine, maxLineLength)
self._stream = stream
+ self._logger = getStreamLogger(name, stream)
+
+ def handleLine(self, line):
+ self._logger.info(escapeEndOfLine(line))
- def handleLine(self, tsline):
- self._stream.write('%s %s %s %s\n'
- % (tsline.streamName,
- tsline.lineType,
- getTimeString(tsline.timestamp),
- tsline.text))
-
-
-class StreamLogger(TimestampLineParser):
- def __init__(self, streamName, outStream,
- maxLineLength=160):
- self._logger = TimestampLineLogger(outStream)
- TimestampLineParser.__init__(self,
- streamName,
- self._logger.handleLine,
- maxLineLength)
+ def flush(self):
+ super(TimestampingStream, self).flush()
+ self._stream.flush()
+
+
+class StreamLogger(object):
+    """Forward lines read from a file descriptor to a logger.
+
+    Lines come from the queue returned by queueFromFile(inFd,
+    maxLineLength) and are logged from a gevent greenlet that runs until
+    the queue is exhausted or stop() is called.
+    """
+
+    def __init__(self, inFd, logger, level=logging.DEBUG, maxLineLength=160):
+        logger.setLevel(level)
+        self._logger = logger
+        self._q = queueFromFile(inFd, maxLineLength)
+        self._job = gevent.spawn(self._handleQueue)
+
+    def _handleQueue(self):
+        """Log each queued line at INFO with its terminator escaped."""
+        for rawLine in self._q:
+            escaped = escapeEndOfLine(rawLine)
+            self._logger.info(escaped)
+
+    def stop(self):
+        """Kill the forwarding greenlet."""
+        # could probably more thoroughly flush things
+        self._job.kill()
View
48 geocamPycroraptor2/manager.py
@@ -5,6 +5,7 @@
# __END_LICENSE__
import os
+import sys
import logging
import gevent
@@ -13,7 +14,7 @@
from geocamPycroraptor2.util import loadConfig
from geocamPycroraptor2.service import Service
-from geocamPycroraptor2 import prexceptions, daemonize
+from geocamPycroraptor2 import prexceptions, daemonize, log
class Manager(object):
@@ -22,32 +23,57 @@ def __init__(self, opts):
self._config = loadConfig(opts.config)
self._name = opts.name
self._logDir = self._config.get('LOG_DIR', '/tmp/pyraptord/logs')
- self._logFile = self._config.get('LOG_FILE', 'pyraptord_${unique}.txt')
+ self._logFname = self._config.get('LOG_FILE', 'pyraptord_${unique}.txt')
self._pidFile = self._config.get('PID_FILE', 'pyraptord_pid.txt')
+ self._logger = logging.getLogger('pyraptord.evt')
+ self._logger.setLevel(logging.DEBUG)
+ self._logger.propagate = False
def _start(self):
+ fmt = log.UtcFormatter('%(asctime)s evt n %(message)s')
+
+ if self._opts.foreground:
+ # send logger output to console
+ ch = logging.StreamHandler(sys.stderr)
+ ch.setFormatter(fmt)
+ ch.setLevel(logging.DEBUG)
+ self._logger.addHandler(ch)
+
+ if self._logFname is None:
+ self._logPath = '/dev/null'
+ self._logFile = None
+ else:
+ logPathTemplate = os.path.join(self._logDir, self._logFname)
+ self._logPath, self._logFile = (log.openLogFromTemplate
+ ('pyraptord',
+ logPathTemplate,
+ {}))
+
+ # send logger output to file
+ lh = logging.StreamHandler(self._logFile)
+ lh.setFormatter(fmt)
+ lh.setLevel(logging.DEBUG)
+ self._logger.addHandler(lh)
+
+ # load ports config
self._ports = loadConfig(self._config.PORTS)
self._port = self._ports[self._name]
if not self._opts.foreground:
- (daemonize.daemonize
- (os.path.join(self._logDir, self._logFile),
- {},
- os.path.join(self._logDir, self._pidFile)))
+ self._logger.debug('daemonizing')
+ daemonize.daemonize('pyraptord', self._logFile)
+ # start startup services
self._services = {}
if 'startup' in self._config.GROUPS:
startupGroup = self._config.GROUPS.startup
- logging.debug('startup group: %s', startupGroup)
+ self._logger.debug('startup group: %s', startupGroup)
for svcName in startupGroup:
- print 'here0'
self.start(svcName)
else:
- logging.debug('no group named "startup"')
- print 'here1'
+ self._logger.debug('no group named "startup"')
self._jobs = []
self._jobs.append(gevent.spawn(self._cleanupChildren))
- print 'here2'
def _cleanupChildren(self):
while 1:
View
4 geocamPycroraptor2/prexceptions.py
@@ -5,11 +5,11 @@
# __END_LICENSE__
-class ServiceNotStartable(Exception):
+class ServiceAlreadyActive(Exception):
pass
-class ServiceNotStoppable(Exception):
+class ServiceNotActive(Exception):
pass
View
66 geocamPycroraptor2/service.py
@@ -29,9 +29,11 @@ def __init__(self, name, config, parent):
self._childStdin = None
self._tslineLogger = None
self._logBuffer = None
+ self._logger = None
self._outLogger = None
self._errLogger = None
self._eventLogger = None
+ self._stdinLogger = None
self._statusDict = None
self._status = None
self._jobs = []
@@ -55,12 +57,18 @@ def getEnvVariables(self):
return self._config.get('env', {})
def start(self):
- if not statuslib.startable(self._status):
- raise prexceptions.ServiceNotStartable(self._name)
+ if not self.isStartable():
+ raise prexceptions.ServiceAlreadyActive(self._name)
cmdArgs = shlex.split(self.getCommand())
- self._logBuffer = log.LineBuffer()
+ self._logger = logging.getLogger('service.%s' % self._name)
+ self._logger.setLevel(logging.DEBUG)
+ self._logger.propagate = False
+
+ #self._logBuffer = log.LineBuffer()
+ # FIX: add handler for line buffer
+
logName = self.getLogNameTemplate()
if logName is None:
self._log = None
@@ -71,18 +79,29 @@ def start(self):
(self._name,
logPath,
self._env))
- self._tslineLogger = log.TimestampLineLogger(self._log)
- self._logBuffer.addLineHandler(self._tslineLogger.handleLine)
+
+ sh = logging.StreamHandler(self._log)
+ sh.setLevel(logging.DEBUG)
+ sh.setFormatter(log.UtcFormatter('%(asctime)s %(name)s %(message)s'))
+ self._logger.addHandler(sh)
childStdoutReadFd, childStdoutWriteFd = trackerG.openpty(self._name)
childStderrReadFd, childStderrWriteFd = trackerG.openpty(self._name)
trackerG.debug()
- self._eventLogger = log.EventLineSource('evt', self._logBuffer.handleLine)
- self._outLogger = log.TimestampLineParser('out', childStdoutReadFd,
- self._logBuffer.handleLine)
- self._errLogger = log.TimestampLineParser('err', childStderrReadFd,
- self._logBuffer.handleLine)
+ self._eventLogger = self._logger.getChild('evt n')
+ self._eventLogger.setLevel(logging.DEBUG)
+
+ self._stdinLogger = self._logger.getChild('inp')
+ self._stdinLogger.setLevel(logging.DEBUG)
+
+ self._outLogger = (log.StreamLogger
+ (childStdoutReadFd,
+ self._logger.getChild('out')))
+
+ self._errLogger = (log.StreamLogger
+ (childStderrReadFd,
+ self._logger.getChild('err')))
workingDir = self.getWorkingDir()
if workingDir:
@@ -96,7 +115,10 @@ def start(self):
else:
childEnv[k] = v
- logging.info('starting %s', self._name)
+ self._eventLogger.info('starting')
+ escapedArgs = ' '.join(['"%s"' % arg
+ for arg in cmdArgs])
+ self._eventLogger.info('command: %s', escapedArgs)
try:
self._proc = subprocess.Popen(cmdArgs,
@@ -125,12 +147,12 @@ def start(self):
else:
self._childStdin = self._proc.stdin
self._setStatus(dict(status=statuslib.RUNNING,
- procStatus=statuslib.RUNNING,
- pid=self._proc.pid))
+ procStatus=statuslib.RUNNING,
+ pid=self._proc.pid))
def stop(self):
- if not statuslib.stoppable(self._status):
- raise prexceptions.ServiceNotStoppable(self._name)
+ if not self.isActive():
+ raise prexceptions.ServiceNotActive(self._name)
statusDict = self._statusDict.copy()
statusDict['status'] = statuslib.STOPPING
@@ -141,6 +163,12 @@ def stop(self):
def getStatus(self):
return self._statusDict
+ def isActive(self):
+ # True when statuslib.isActive() accepts this service's current status;
+ # stop() and stdin() raise ServiceNotActive when this is False.
+ return statuslib.isActive(self._status)
+
+ def isStartable(self):
+ # True when statuslib.isStartable() accepts this service's current
+ # status; start() raises ServiceAlreadyActive when this is False.
+ return statuslib.isStartable(self._status)
+
def _stopInternal(self):
self._proc.send_signal(signal.SIGTERM)
gevent.sleep(5)
@@ -190,15 +218,19 @@ def _postExitCleanup(self):
self._errLogger.stop()
self._errLogger = None
if self._eventLogger:
- self._eventLogger.stop()
self._eventLogger = None
+ if self._stdinLogger:
+ self._stdinLogger = None
self._log.flush()
self._log.close()
# note: keep self._logBuffer around in case a client requests old log data.
# it will be reinitialized the next time the task is started.
# self._checkForPendingRestart()
def stdin(self, text):
- logging.debug('%s stdin: %s', self._name, text)
+ if not self.isActive():
+ raise prexceptions.ServiceNotActive(self._name)
+
+ self._stdinLogger.info(log.escapeEndOfLine(text))
self._childStdin.write(text)
self._childStdin.flush()
View
13 geocamPycroraptor2/status.py
@@ -19,18 +19,19 @@
SIGNAL_EXIT = 'signalExit'
ERROR_EXIT = 'errorExit'
+
STARTABLE_STATUS = (NOT_STARTED,
SUCCESS,
ABORTED,
FAILED)
-STOPPABLE_STATUS = (STARTING,
- RUNNING)
+ACTIVE_STATUS = (STARTING,
+ RUNNING)
-def startable(status):
- return status in STARTABLE_STATUS
+def isActive(status):
+ return status in ACTIVE_STATUS
-def stoppable(status):
- return status in STOPPABLE_STATUS
+def isStartable(status):
+ return status in STARTABLE_STATUS
View
2 geocamPycroraptor2/tests/pycroraptor.yaml
@@ -3,7 +3,7 @@ LOG_DIR: "/tmp/pyraptord/logs"
SERVICES:
bc:
- command: "bc"
+ command: "bc -i"
GROUPS:
startup: ["bc"]
View
6 geocamPycroraptor2/util.py
@@ -83,8 +83,8 @@ def getPid(pidPath):
if pidIsActive(pid):
return pid
else:
- logging.debug('getPid: process does not appear to be running, removing stale pid file "%s"',
- pidPath)
+ print ('getPid: process does not appear to be running, removing stale pid file "%s"'
+ % pidPath)
os.unlink(pidPath)
return None
@@ -97,4 +97,4 @@ def waitUntilDead(pid, timeout):
return False
if not pidIsActive(pid):
return True
- gevent.sleep(0.5)
+ gevent.sleep(0.1)

0 comments on commit 6ce068c

Please sign in to comment.