Skip to content

Commit

Permalink
Merge branch 'dj_buildbot-0.8.0' of git://github.com/maruel/buildbot
Browse files Browse the repository at this point in the history
* 'dj_buildbot-0.8.0' of git://github.com/maruel/buildbot:
  Remove waterLevel since it was causing more misbehaviors than improvements.

Signed-off-by: Dustin J. Mitchell <dustin@zmanda.com>
  • Loading branch information
Dustin J. Mitchell committed Apr 16, 2010
2 parents 5893e72 + 0d36a29 commit 654e0fd
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions buildbot/status/status_push.py
Expand Up @@ -5,6 +5,7 @@
Implements the HTTP receiver."""

import datetime
import logging
import os
import urllib
import urlparse
Expand Down Expand Up @@ -35,8 +36,7 @@ class StatusPush(StatusReceiverMultiService):
"""

def __init__(self, serverPushCb, queue=None, path=None, filter=True,
bufferDelay=0.5, retryDelay=5, waterLevel=None,
blackList=None):
bufferDelay=1, retryDelay=5, blackList=None):
"""
@serverPushCb: callback to be used. It receives 'self' as parameter. It
should call self.queueNextServerPush() when it's done to queue the next
Expand All @@ -51,8 +51,6 @@ def __init__(self, serverPushCb, queue=None, path=None, filter=True,
end of a request to initializing a new one.
@retryDelay: amount of time between retries when no items were pushed on
last serverPushCb call.
@waterLevel: number of items that will trigger an immediate retry when
the server is up. Disabled if None.
@blackList: events that shouldn't be sent.
"""
StatusReceiverMultiService.__init__(self)
Expand All @@ -66,10 +64,9 @@ def __init__(self, serverPushCb, queue=None, path=None, filter=True,
self.filter = filter
self.bufferDelay = bufferDelay
self.retryDelay = retryDelay
self.waterLevel = waterLevel
if not callable(serverPushCb):
raise NotImplementedError('Please pass serverPushCb parameter.')
def hookPushCb(self):
def hookPushCb():
# Update the index so we know if the next push succeed or not, don't
# update the value when the queue is empty.
if not self.queue.nbItems():
Expand Down Expand Up @@ -107,7 +104,7 @@ def setServiceParent(self, parent):

def wasLastPushSuccessful(self):
"""Returns if the "virtual pointer" in the queue advanced."""
return self.lastIndex < self.queue.getIndex()
return self.lastIndex <= self.queue.getIndex()

def queueNextServerPush(self):
"""Queue the next push or call it immediately.
Expand All @@ -118,11 +115,8 @@ def queueNextServerPush(self):
the current call."""
# Determine the delay.
if self.wasLastPushSuccessful():
if (self.stopped or
(self.waterLevel and
self.queue.nbItems() > self.waterLevel)):
# Already at high water level so don't wait, we already have
# enough items to push.
if self.stopped:
# Shutting down.
delay = 0
else:
# Normal case.
Expand All @@ -139,7 +133,7 @@ def queueNextServerPush(self):
# Cleanup a previously queued task if necessary.
if self.task:
# Warning: we could be running inside the task.
if self.task.active() and delay != 0:
if self.task.active():
# There was already a task queue, don't requeue it, just let it
# go.
return
Expand All @@ -154,22 +148,25 @@ def queueNextServerPush(self):
# Do the queue/direct call.
if delay:
# Call in delay seconds.
self.task = reactor.callLater(delay, self.serverPushCb, self)
self.task = reactor.callLater(delay, self.serverPushCb)
elif self.stopped:
if not self.queue.nbItems():
return
# Call right now, we're shutting down.
@defer.deferredGenerator
def BlockForEverythingBeingSent():
d = self.serverPushCb(self)
d = self.serverPushCb()
if d:
x = defer.waitForDeferred(d)
yield x
x.getResult()
return BlockForEverythingBeingSent()
else:
# Call right now, delay == 0.
return self.serverPushCb(self)
# delay should never be 0. That can cause Buildbot to spin tightly
# trying to push events that may not be received well by a status
# listener.
logging.exception('Did not expect delay to be 0, but it is.')
return

def stopService(self):
"""Shutting down."""
Expand Down Expand Up @@ -223,7 +220,7 @@ def push(self, event, **objs):
self.queue.pushItem(packet)
if self.task is None or not self.task.active():
# No task queued since it was probably idle, let's queue a task.
self.queueNextServerPush()
return self.queueNextServerPush()

#### Events

Expand Down Expand Up @@ -316,7 +313,7 @@ class HttpStatusPush(StatusPush):
"""Event streamer to a HTTP server."""

def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
maxDiskItems=None, chunkSize=200, maxHttpRequestSize=None,
maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
**kwargs):
"""
@serverUrl: Base URL to be used to push events notifications.
Expand All @@ -325,12 +322,14 @@ def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
use disk at all.
@debug: Save the json with nice formatting.
@chunkSize: maximum number of items to send in each at each HTTP POST.
@maxHttpRequestSize: limits the size of encoded data for AE.
@maxHttpRequestSize: limits the size of encoded data for AE, the default
is 1MB.
"""
# Parameters.
self.serverUrl = serverUrl
self.debug = debug
self.chunkSize = chunkSize
self.lastPushWasSuccessful = True
self.maxHttpRequestSize = maxHttpRequestSize
if maxDiskItems != 0:
# The queue directory is determined by the server url.
Expand All @@ -343,17 +342,22 @@ def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
path = None
queue = MemoryQueue(maxItems=maxMemoryItems)

if not 'waterLevel' in kwargs:
kwargs['waterLevel'] = 50
# Use the unbounded method.
StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
queue=queue, path=path, **kwargs)

def wasLastPushSuccessful(self):
return self.lastPushWasSuccessful

def popChunk(self):
"""Pops items from the pending list.
They must be queued back on failure."""
chunkSize = self.chunkSize
if self.wasLastPushSuccessful():
chunkSize = self.chunkSize
else:
chunkSize = 1

while True:
items = self.queue.popChunk(chunkSize)
if self.debug:
Expand Down Expand Up @@ -382,7 +386,8 @@ def pushHttp(self):
def Success(result):
"""Queue up next push."""
log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
self.queueNextServerPush()
self.lastPushWasSuccessful = True
return self.queueNextServerPush()

def Failure(result):
"""Insert back items not sent and queue up next push."""
Expand All @@ -395,7 +400,8 @@ def Failure(result):
# on us. Make sure the queue is saved since we just queued back
# items.
self.queue.save()
self.queueNextServerPush()
self.lastPushWasSuccessful = False
return self.queueNextServerPush()

# Trigger the HTTP POST request.
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
Expand Down

0 comments on commit 654e0fd

Please sign in to comment.