Skip to content

Commit

Permalink
Merge branch 'dj_10_0' of git://github.com/maruel/buildbot
Browse files Browse the repository at this point in the history
* 'dj_10_0' of git://github.com/maruel/buildbot:
  Add blacklist to StatusPush to not send unneeded events
  • Loading branch information
Dustin J. Mitchell committed Mar 16, 2010
2 parents e8a8dc5 + e02bccf commit 1b371f0
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
2 changes: 2 additions & 0 deletions buildbot/broken_test/runs/test_status_push.py
Expand Up @@ -194,6 +194,7 @@ def doNothing(self):
'who': 'bob'
},
],
'hasPatch': False,
'project': '',
'repository': '',
# BUG!!
Expand Down Expand Up @@ -333,6 +334,7 @@ def doNothing(self):
'who': 'bob'
},
],
'hasPatch': False,
'project': '',
'repository': '',
# BUG!!
Expand Down
3 changes: 3 additions & 0 deletions buildbot/broken_test/unit/test_sourcestamp.py
Expand Up @@ -10,6 +10,7 @@ def testAsDictEmpty(self):
EXPECTED = {
'revision': None,
'branch': None,
'hasPatch': False,
'changes': [],
'project': '',
'repository': '',
Expand All @@ -20,6 +21,7 @@ def testAsDictBranch(self):
EXPECTED = {
'revision': 'Rev',
'branch': 'Br',
'hasPatch': False,
'changes': [],
'project': '',
'repository': '',
Expand All @@ -41,6 +43,7 @@ def testAsDictChanges(self):
EXPECTED = {
'revision': 'rev3',
'branch': 'br3',
'hasPatch': True,
'project': '',
'repository':'',
'changes': [
Expand Down
1 change: 1 addition & 0 deletions buildbot/sourcestamp.py
Expand Up @@ -158,6 +158,7 @@ def asDict(self):
# Constant
result['revision'] = self.revision
# TODO(maruel): Make the patch content a suburl.
result['hasPatch']= self.patch is not None
result['branch'] = self.branch
result['changes'] = [c.asDict() for c in getattr(self, 'changes', [])]
result['project'] = self.project
Expand Down
6 changes: 4 additions & 2 deletions buildbot/status/persistent_queue.py
Expand Up @@ -406,13 +406,15 @@ def getIndex(self):

def popChunk(self, *args, **kwargs):
items = self.queue.popChunk(*args, **kwargs)
self._index += len(items)
if items:
self._index += len(items)
return items

def insertBackChunk(self, items):
self._index -= len(items)
ret = self.queue.insertBackChunk(items)
self._index += len(ret)
if ret:
self._index += len(ret)
return ret


Expand Down
43 changes: 33 additions & 10 deletions buildbot/status/status_push.py
Expand Up @@ -35,7 +35,8 @@ class StatusPush(StatusReceiverMultiService):
"""

def __init__(self, serverPushCb, queue=None, path=None, filter=True,
bufferDelay=0.1, retryDelay=1, waterLevel=0):
bufferDelay=0.5, retryDelay=5, waterLevel=None,
blackList=None):
"""
@serverPushCb: callback to be used. It receives 'self' as parameter. It
should call self.queueNextServerPush() when it's done to queue the next
Expand All @@ -51,7 +52,8 @@ def __init__(self, serverPushCb, queue=None, path=None, filter=True,
@retryDelay: amount of time between retries when no items were pushed on
last serverPushCb call.
@waterLevel: number of items that will trigger an immediate retry when
the server is up. Disabled if 0.
the server is up. Disabled if None.
@blackList: events that shouldn't be sent.
"""
StatusReceiverMultiService.__init__(self)

Expand All @@ -75,6 +77,7 @@ def hookPushCb(self):
self.lastIndex = self.queue.getIndex()
return serverPushCb(self)
self.serverPushCb = hookPushCb
self.blackList = blackList

# Other defaults.
# IDelayedCall object that represents the next queued push.
Expand Down Expand Up @@ -116,7 +119,7 @@ def queueNextServerPush(self):
# Determine the delay.
if self.wasLastPushSuccessful():
if (self.stopped or
(self.waterLevel != 0 and
(self.waterLevel and
self.queue.nbItems() > self.waterLevel)):
# Already at high water level so don't wait, we already have
# enough items to push.
Expand Down Expand Up @@ -200,6 +203,8 @@ def push(self, event, **objs):
- Queued to disk when the sink server is down
- Pushed (along the other queued items) to the server
"""
if self.blackList and event in self.blackList:
return
# First, generate the packet.
packet = {}
packet['id'] = self.state['next_id']
Expand Down Expand Up @@ -311,19 +316,22 @@ class HttpStatusPush(StatusPush):
"""Event streamer to a HTTP server."""

def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
maxDiskItems=None, chunkSize=200, **kwargs):
maxDiskItems=None, chunkSize=200, maxHttpRequestSize=None,
**kwargs):
"""
@serverUrl: Base URL to be used to push events notifications.
@maxMemoryItems: Maximum number of items to keep queued in memory.
@maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
use disk at all.
@debug: Save the json with nice formatting.
@chunkSize: maximum number of items to send in each at each HTTP POST.
@maxHttpRequestSize: limits the size of encoded data for AE.
"""
# Parameters.
self.serverUrl = serverUrl
self.debug = debug
self.chunkSize = chunkSize
self.maxHttpRequestSize = maxHttpRequestSize
if maxDiskItems != 0:
# The queue directory is determined by the server url.
path = ('events_' +
Expand All @@ -345,12 +353,27 @@ def popChunk(self):
"""Pops items from the pending list.
They must be queued back on failure."""
items = self.queue.popChunk(self.chunkSize)
if self.debug:
packets = json.dumps(items, indent=2, sort_keys=True)
else:
packets = json.dumps(items, separators=(',',':'))
return (urllib.urlencode({'packets': packets}), items)
chunkSize = self.chunkSize
while True:
items = self.queue.popChunk(chunkSize)
if self.debug:
packets = json.dumps(items, indent=2, sort_keys=True)
else:
packets = json.dumps(items, separators=(',',':'))
data = urllib.urlencode({'packets': packets})
if (not self.maxHttpRequestSize or
len(data) < self.maxHttpRequestSize):
return (data, items)

if chunkSize == 1:
# This packet is just too large. Drop this packet.
log.msg("ERROR: packet %s was dropped, too large: %d > %d" %
(items[0]['id'], len(data), self.maxHttpRequestSize))
chunkSize = self.chunkSize
else:
# Try with half the packets.
chunkSize /= 2
self.queue.insertBackChunk(items)

def pushHttp(self):
"""Do the HTTP POST to the server."""
Expand Down

0 comments on commit 1b371f0

Please sign in to comment.