update step methods to be async, but still using status API
..so, no data API calls

This contains a hack to allow tests for existing steps that use now-removed
methods to continue to pass, but with warnings.  Those will need to be
fixed in a subsequent commit.

Includes work by: Pierre Tardy <pierre.tardy@intel.com>
djmitche committed Dec 4, 2013
1 parent 8e78347 commit e24f6ed
Showing 25 changed files with 364 additions and 450 deletions.
139 changes: 0 additions & 139 deletions master/buildbot/interfaces.py
@@ -722,126 +722,6 @@ def getText():
LOG_CHANNEL_HEADER = 2


class IStatusLog(Interface):

"""I represent a single Log, which is a growing list of text items that
contains some kind of output for a single BuildStep. I might be finished,
in which case this list has stopped growing.
Each Log has a name, usually something boring like 'log' or 'output'.
These names are not guaranteed to be unique; however, they are usually
chosen to be useful within the scope of a single step (e.g. the Compile
step might produce both 'log' and 'warnings'). The name may also have
spaces. If you want something more globally meaningful, at least within a
given Build, try::
'%s.%s' % (log.getStep().getName(), log.getName())
The Log can be presented as plain text, or it can be accessed as a list
of items, each of which has a channel indicator (header, stdout, stderr)
and a text chunk. An HTML display might represent the interleaved
channels with different styles, while a straight download-the-text
interface would just want to retrieve a big string.
The 'header' channel is used by ShellCommands to prepend a note about
which command is about to be run ('running command FOO in directory
DIR'), and append another note giving the exit code of the process.
Logs can be streaming: if the Log has not yet finished, you can
subscribe to receive new chunks as they are added.
A ShellCommand will have a Log associated with it that gathers stdout
and stderr. Logs may also be created by parsing command output or
through other synthetic means (grepping for all the warnings in a
compile log, or listing all the test cases that are going to be run).
Such synthetic Logs are usually finished as soon as they are created."""

def getName():
"""Returns a short string with the name of this log, probably 'log'.
"""

def getStep():
"""Returns the IBuildStepStatus which owns this log."""
# TODO: can there be non-Step logs?

def isFinished():
"""Return a boolean. True means the log has finished and is closed,
False means it is still open and new chunks may be added to it."""

def waitUntilFinished():
"""Return a Deferred that will fire when the log is closed. If the
log has already finished, this deferred will fire right away. The
callback is given this IStatusLog instance as an argument."""

def subscribe(receiver, catchup):
"""Register an IStatusReceiver to receive chunks (with logChunk) as
data is added to the Log. If you use this, you will also want to use
waitUntilFinished to find out when the listener can be retired.
Subscribing to a closed Log is a no-op.
If 'catchup' is True, the receiver will immediately be sent a series
of logChunk messages to bring it up to date with the partially-filled
log. This allows a status client to join a Log already in progress
without missing any data. If the Log has already finished, it is too
late to catch up: just do getText() instead.
If the Log is very large, the receiver will be called many times with
a lot of data. There is no way to throttle this data. If the receiver
is planning on sending the data on to somewhere else, over a narrow
connection, you can get a throttleable subscription by using
C{subscribeConsumer} instead."""

def unsubscribe(receiver):
"""Remove a receiver previously registered with subscribe(). Attempts
to remove a receiver which was not previously registered is a no-op.
"""

def subscribeConsumer(consumer):
"""Register an L{IStatusLogConsumer} to receive all chunks of the
logfile, including all the old entries and any that will arrive in
the future. The consumer will first have their C{registerProducer}
method invoked with a reference to an object that can be told
C{pauseProducing}, C{resumeProducing}, and C{stopProducing}. Then the
consumer's C{writeChunk} method will be called repeatedly with each
(channel, text) tuple in the log, starting with the very first. The
consumer will be notified with C{finish} when the log has been
exhausted (which can only happen when the log is finished). Note that
a small amount of data could be written via C{writeChunk} even after
C{pauseProducing} has been called.
To unsubscribe the consumer, use C{producer.stopProducing}."""

# once the log has finished, the following methods make sense. They can
# be called earlier, but they will only return the contents of the log up
# to the point at which they were called. You will lose items that are
# added later. Use C{subscribe} or C{subscribeConsumer} to avoid missing
# anything.

def hasContents():
"""Returns True if the LogFile still has contents available. Returns
False for logs that have been pruned. Clients should test this before
offering to show the contents of any log."""

def getText():
"""Return one big string with the contents of the Log. This merges
all non-header chunks together."""

def readlines(channel=LOG_CHANNEL_STDOUT):
"""Read lines from one channel of the logfile. This returns an
iterator that will provide single lines of text (including the
trailing newline).
"""

def getTextWithHeaders():
"""Return one big string with the contents of the Log. This merges
all chunks (including headers) together."""

def getChunks():
"""Generate a list of (channel, text) tuples. 'channel' is a number,
0 for stdout, 1 for stderr, 2 for header. (note that stderr is merged
into stdout if PTYs are in use)."""
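The channel constants and merge semantics documented above (getText() drops header chunks, getTextWithHeaders() keeps them) can be illustrated with a small stand-alone sketch; `merge_chunks` is a hypothetical helper, not part of Buildbot:

```python
# Channel constants, matching LOG_CHANNEL_* in buildbot.interfaces
LOG_CHANNEL_STDOUT = 0
LOG_CHANNEL_STDERR = 1
LOG_CHANNEL_HEADER = 2

def merge_chunks(chunks, with_headers=False):
    """Merge (channel, text) tuples into one string.

    Mirrors the documented semantics: without headers, this is getText();
    with headers, it is getTextWithHeaders().
    """
    return ''.join(
        text for channel, text in chunks
        if with_headers or channel != LOG_CHANNEL_HEADER)

chunks = [
    (LOG_CHANNEL_HEADER, 'running command FOO in directory DIR\n'),
    (LOG_CHANNEL_STDOUT, 'hello\n'),
    (LOG_CHANNEL_STDERR, 'oops\n'),
]
print(merge_chunks(chunks))                     # non-header chunks only
print(merge_chunks(chunks, with_headers=True))  # everything, headers included
```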


class IStatusLogConsumer(Interface):

"""I am an object which can be passed to IStatusLog.subscribeConsumer().
@@ -1089,25 +969,6 @@ def stopBuild(reason="<no reason given>"):
finished."""


class ILogFile(Interface):

"""This is the internal interface to a LogFile, used by the BuildStep to
write data into the log.
"""
def addStdout(data):
pass

def addStderr(data):
pass

def addHeader(data):
pass

def finish():
"""The process that is feeding the log file has finished, and no
further data will be added. This closes the logfile."""
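The ILogFile contract being deleted here is small enough to sketch in memory; a hypothetical stand-in (illustrative only, not Buildbot code) showing the write-then-finish semantics:

```python
# Channel numbers as documented in getChunks(): 0=stdout, 1=stderr, 2=header
LOG_CHANNEL_STDOUT, LOG_CHANNEL_STDERR, LOG_CHANNEL_HEADER = 0, 1, 2

class InMemoryLogFile:
    """Minimal in-memory stand-in for the ILogFile contract above."""

    def __init__(self):
        self.chunks = []        # accumulated (channel, data) tuples
        self.finished = False

    def _add(self, channel, data):
        # after finish(), no further data may be added
        if self.finished:
            raise RuntimeError("log is finished; no further data accepted")
        self.chunks.append((channel, data))

    def addStdout(self, data):
        self._add(LOG_CHANNEL_STDOUT, data)

    def addStderr(self, data):
        self._add(LOG_CHANNEL_STDERR, data)

    def addHeader(self, data):
        self._add(LOG_CHANNEL_HEADER, data)

    def finish(self):
        self.finished = True

log = InMemoryLogFile()
log.addHeader('running command FOO in directory DIR\n')
log.addStdout('output\n')
log.finish()
```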


class ILogObserver(Interface):

"""Objects which provide this interface can be used in a BuildStep to
140 changes: 108 additions & 32 deletions master/buildbot/process/buildstep.py
@@ -19,6 +19,7 @@
from twisted.internet import error
from twisted.python import components
from twisted.python import log
from twisted.python import failure
from twisted.python import util as twutil
from twisted.python.failure import Failure
from twisted.python.reflect import accumulateClassList
@@ -97,6 +98,75 @@ def wrap(self, *args, **kwargs):
return wrap


class SyncWriteOnlyLogFileWrapper(object):

# A temporary wrapper around process.log.Log to emulate *synchronous*
# writes to the logfile by handling the Deferred from each add* operation
# as part of the step's _start_unhandled_deferreds. This has to handle
# the tricky case of adding data to a log *before* addLog has returned!

def __init__(self, step, name, addLogDeferred):
self.step = step
self.name = name
self.delayedOperations = []
self.asyncLogfile = None

@addLogDeferred.addCallback
def gotAsync(log):
self.asyncLogfile = log
self._catchup()
return log

# run _catchup even if there's an error; it will helpfully generate
# a whole bunch more!
@addLogDeferred.addErrback
def problem(f):
self._catchup()
return f

def _catchup(self):
if not self.delayedOperations:
return
op = self.delayedOperations.pop(0)

try:
d = defer.maybeDeferred(op)
except Exception:
d = defer.fail(failure.Failure())

@d.addBoth
def next(x):
self._catchup()
return x
self.step._start_unhandled_deferreds.append(d)

def _delay(self, op):
self.delayedOperations.append(op)
if len(self.delayedOperations) == 1:
self._catchup()

def getName(self):
# useLog uses this
return self.name

def addStdout(self, data):
self._delay(lambda : self.asyncLogfile.addStdout(data))

def addStderr(self, data):
self._delay(lambda : self.asyncLogfile.addStderr(data))

def addHeader(self, data):
self._delay(lambda : self.asyncLogfile.addHeader(data))

def finish(self):
self._delay(lambda : self.asyncLogfile.finish())

def unwrap(self):
d = defer.Deferred()
self._delay(lambda : d.callback(self.asyncLogfile))
return d
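The queue-and-replay trick used by SyncWriteOnlyLogFileWrapper can be shown without Twisted; a simplified, Deferred-free sketch (illustrative only — the real class chains Deferreds through `_catchup`) of how writes made before the real log exists are replayed in order:

```python
# Stripped-down sketch of the delayed-operation pattern: writes made before
# the asynchronous log creation completes are queued, then replayed in order
# once the real log arrives.  (Hypothetical class, not Buildbot code.)

class DelayedLogWrapper:
    def __init__(self):
        self.target = None      # the real log, once creation completes
        self.pending = []       # writes queued while target is unavailable

    def add_stdout(self, data):
        if self.target is None:
            self.pending.append(data)   # queue until the target arrives
        else:
            self.target.append(data)

    def attach(self, target):
        # analogous to the addLogDeferred callback: replay queued writes
        self.target = target
        while self.pending:
            target.append(self.pending.pop(0))

wrapper = DelayedLogWrapper()
wrapper.add_stdout('early write\n')     # happens before the log exists
real_log = []
wrapper.attach(real_log)
wrapper.add_stdout('late write\n')
print(real_log)                         # queued write replayed first
```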


class BuildStep(object, properties.PropertiesMixin):

haltOnFailure = False
@@ -226,6 +296,11 @@ def setProgress(self, metric, value):
if self.progress:
self.progress.setProgress(metric, value)

def setStateStrings(self, strings):
# call to the status API for now
self.step_status.old_setText(strings)
self.step_status.old_setText2(strings)

@defer.inlineCallbacks
def startStep(self, remote):
self.remote = remote
@@ -245,7 +320,7 @@ def startStep(self, remote):

# Set the step's text here so that the stepStarted notification sees
# the correct description
self.step_status.setText(self.describe(False))
yield self.setStateStrings(self.describe(False))
self.step_status.stepStarted()

try:
@@ -283,7 +358,7 @@ def setRenderable(res, attr):
if doStep:
results = yield self.run()
else:
self.step_status.setText(self.describe(True) + ['skipped'])
yield self.setStateStrings(self.describe(True) + ['skipped'])
self.step_status.setSkipped(True)
results = SKIPPED

@@ -304,9 +379,7 @@ def setRenderable(res, attr):
except Exception:
log.err(Failure(), "error while formatting exceptions")

self.step_status.setText([self.name, "exception"])
self.step_status.setText2([self.name])

yield self.setStateStrings([self.name, "exception"])
results = EXCEPTION

if self.stopped and results != RETRY:
@@ -317,16 +390,13 @@ def setRenderable(res, attr):
# to retry interrupted build due to some other issues for example
# due to slave lost
if results == CANCELLED:
self.step_status.setText(self.describe(True) +
["cancelled"])
self.step_status.setText2(["cancelled"])
yield self.setStateStrings(self.describe(True) + ["cancelled"])
else:
# leave RETRY as-is, but change anything else to EXCEPTION
if results != RETRY:
results = EXCEPTION
self.step_status.setText(self.describe(True) +
["interrupted"])
self.step_status.setText2(["interrupted"])
yield self.setStateStrings(self.describe(True) +
["interrupted"])

if self.progress:
self.progress.finish()
@@ -371,26 +441,15 @@ def run(self):
self._start_deferred = defer.Deferred()
unhandled = self._start_unhandled_deferreds = []
try:
# monkey-patch self.step_status.{setText,setText2} to add their
# deferreds to _start_unhandled_deferreds
# start() can return a Deferred, but the step isn't finished
# at that point. But if it returns SKIPPED, then finish will
# never be called.
def setText(txt, old=self.step_status.setText):
d = defer.maybeDeferred(old, txt)
unhandled.append(d)
return d
self.step_status.setText = setText

def setText2(txt, old=self.step_status.setText2):
d = defer.maybeDeferred(old, txt)
unhandled.append(d)
return d
self.step_status.setText2 = setText2
# monkey-patch self.step_status.{setText,setText2} back into
# existence for old steps; when these write to the data API,
# the monkey patches will stash their deferreds on the unhandled list
self.step_status.setText = self.step_status.old_setText
self.step_status.setText2 = self.step_status.old_setText2

results = yield self.start()
if results == SKIPPED:
self.step_status.setText(self.describe(True) + ['skipped'])
yield self.setStateStrings(self.describe(True) + ['skipped'])
self.step_status.setSkipped(True)
else:
results = yield self._start_deferred
@@ -406,7 +465,8 @@ def setText2(txt, old=self.step_status.setText2):
consumeErrors=True)
for success, res in unhandled_results:
if not success:
log.err(res, "from an asynchronous method executed in an old-style step")
log.err(
res, "from an asynchronous method executed in an old-style step")
results = EXCEPTION

defer.returnValue(results)
@@ -456,10 +516,18 @@ def slaveVersionIsOlderThan(self, command, minversion):
def getSlaveName(self):
return self.build.getSlaveName()

def addLog(self, name):
loog = self.step_status.addLog(name)
def addLog(self, name, type='s'):
# This method implements a smooth transition to nine:
# it returns a synchronous wrapper around the logfile, so that steps can
# safely start writing to the logfile without waiting for log creation
# in the db
loog_d = defer.maybeDeferred(self.step_status.addLog, name)
self._connectPendingLogObservers()
return loog
if self._start_unhandled_deferreds is None:
# This is a new-style step, so we can return the deferred
return loog_d

self._start_unhandled_deferreds.append(loog_d)
return SyncWriteOnlyLogFileWrapper(self, name, loog_d)

def getLog(self, name):
for l in self.step_status.getLogs():
@@ -584,7 +652,14 @@ def startCommand(self, cmd, errorMessages=[]):

d = self.runCommand(cmd) # might raise ConnectionLost
d.addCallback(lambda res: self.commandComplete(cmd))

# TODO: when the status.LogFile object no longer exists, this method
# will need to create a synthetic logfile for old-style steps, and will
# be called without the `logs` parameter for new-style steps.
# Unfortunately, lots of createSummary methods exist but don't look at
# the log, so it's difficult to avoid creating the synthetic logfile
# when it isn't needed.
d.addCallback(lambda res: self.createSummary(cmd.logs['stdio']))

d.addCallback(lambda res: self.evaluateCommand(cmd)) # returns results

def _gotResults(results):
@@ -680,6 +755,7 @@ def setStatus(self, cmd, results):
# get more control over the displayed text
self.step_status.setText(self.getText(cmd, results))
self.step_status.setText2(self.maybeGetText2(cmd, results))
return defer.succeed(None)


# Parses the logs for a list of regexs. Meant to be invoked like:
