Skip to content

Commit

Permalink
Made small stylistic changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr Plesovskih committed Nov 25, 2013
1 parent 33ac56d commit 187e466
Showing 1 changed file with 67 additions and 49 deletions.
116 changes: 67 additions & 49 deletions master/buildbot/changes/gerritchangesource.py
Expand Up @@ -13,15 +13,16 @@
#
# Copyright Buildbot Team Members

from collections import MutableMapping
from twisted.internet import reactor

from buildbot.changes import base
from buildbot.util import json
from buildbot import util
from twisted.python import log
from twisted.internet import defer
from twisted.internet.protocol import ProcessProtocol


class GerritChangeSource(base.ChangeSource):
"""This source will maintain a connection to gerrit ssh server
that will provide us gerrit events in json format."""
Expand All @@ -40,7 +41,11 @@ class GerritChangeSource(base.ChangeSource):
STREAM_BACKOFF_MAX = 60
"(seconds) maximum time to wait before retrying a failed connection"

def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
def __init__(self,
gerritserver,
username,
gerritport=29418,
identity_file=None):
"""
@type gerritserver: string
@param gerritserver: the dns or ip that host the gerrit ssh server,
Expand Down Expand Up @@ -75,7 +80,7 @@ def outReceived(self, data):
"""Do line buffering."""
self.data += data
lines = self.data.split("\n")
self.data = lines.pop(-1) # last line is either empty or incomplete
self.data = lines.pop(-1) # last line is either empty or incomplete
for line in lines:
log.msg("gerrit: %s" % (line,))
yield self.change_source.lineReceived(line)
Expand All @@ -93,62 +98,72 @@ def lineReceived(self, line):
log.msg("bad json line: %s" % (line,))
return defer.succeed(None)

if not(type(event) == type({}) and "type" in event):
if not(isinstance(event, MutableMapping) and "type" in event):
log.msg("no type in event %s" % (line,))
return defer.succeed(None)
func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None)
if func == None:
func_name = "eventReceived_%s" % event["type"].replace("-", "_")
func = getattr(self, func_name, None)
if func is None:
log.msg("unsupported event %s" % (event["type"],))
return defer.succeed(None)

# flatten the event dictionary, for easy access with WithProperties
def flatten(properties, base, event):
for k, v in event.items():
if type(v) == dict:
flatten(properties, base + "." + k, v)
else: # already there
properties[base + "." + k] = v
name = "{0}.{1}".format(base, k)
if isinstance(v, MutableMapping):
flatten(properties, name, v)
else: # already there
properties[name] = v

properties = {}
flatten(properties, "event", event)
return func(properties,event)
return func(properties, event)

def addChange(self, chdict):
d = self.master.addChange(**chdict)
# eat failures..
d.addErrback(log.err, 'error adding change from GerritChangeSource')
return d

def eventReceived_patchset_created(self, properties, event):
change = event["change"]
return self.addChange(dict(
author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]),
project=change["project"],
repository="ssh://%s@%s:%s/%s" % (
self.username, self.gerritserver, self.gerritport, change["project"]),
branch=change["branch"]+"/"+change["number"],
revision=event["patchSet"]["revision"],
revlink=change["url"],
comments=change["subject"],
files=["unknown"],
category=event["type"],
properties=properties))
author="%s <%s>" % (
change["owner"]["name"], change["owner"]["email"]),
project=change["project"],
repository="ssh://%s@%s:%s/%s" % (
self.username, self.gerritserver,
self.gerritport, change["project"]),
branch="{0}/{1}".format(
change["branch"], change["number"]),
revision=event["patchSet"]["revision"],
revlink=change["url"],
comments=change["subject"],
files=["unknown"],
category=event["type"],
properties=properties))

def eventReceived_ref_updated(self, properties, event):
ref = event["refUpdate"]
author = "gerrit"

if "submitter" in event:
author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"])
author = "%s <%s>" % (
event["submitter"]["name"], event["submitter"]["email"])

return self.addChange(dict(
author=author,
project=ref["project"],
repository="ssh://%s@%s:%s/%s" % (
self.username, self.gerritserver, self.gerritport, ref["project"]),
branch=ref["refName"],
revision=ref["newRev"],
comments="Gerrit: patchset(s) merged.",
files=["unknown"],
category=event["type"],
properties=properties))
author=author,
project=ref["project"],
repository="ssh://%s@%s:%s/%s" % (
self.username, self.gerritserver,
self.gerritport, ref["project"]),
branch=ref["refName"],
revision=ref["newRev"],
comments="Gerrit: patchset(s) merged.",
files=["unknown"],
category=event["type"],
properties=properties))

def streamProcessStopped(self):
self.process = None
Expand All @@ -160,27 +175,32 @@ def streamProcessStopped(self):

now = util.now()
if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME:
# bad startup; start the stream process again after a timeout, and then
# increase the timeout
log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout))
reactor.callLater(self.streamProcessTimeout, self.startStreamProcess)
# bad startup; start the stream process again after a timeout,
# and then increase the timeout
log.msg(
"'gerrit stream-events' failed; restarting after %ds"
% round(self.streamProcessTimeout))
reactor.callLater(
self.streamProcessTimeout, self.startStreamProcess)
self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT
if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX:
self.streamProcessTimeout = self.STREAM_BACKOFF_MAX
else:
# good startup, but lost connection; restart immediately, and set the timeout
# to its minimum
# good startup, but lost connection; restart immediately,
# and set the timeout to its minimum
self.startStreamProcess()
self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

def startStreamProcess(self):
log.msg("starting 'gerrit stream-events'")
self.lastStreamProcessStart = util.now()
args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)]
uri = "{0}@{1}".format(self.username, self.gerritserver)
args = [uri, "-p", str(self.gerritport)]
if self.identity_file is not None:
args = args + [ '-i', self.identity_file ]
self.process = reactor.spawnProcess(self.LocalPP(self), "ssh",
[ "ssh" ] + args + [ "gerrit", "stream-events" ])
args = args + ['-i', self.identity_file]
self.process = reactor.spawnProcess(
self.LocalPP(self), "ssh",
["ssh"] + args + ["gerrit", "stream-events"])

def startService(self):
self.wantProcess = True
Expand All @@ -190,15 +210,13 @@ def stopService(self):
self.wantProcess = False
if self.process:
self.process.signalProcess("KILL")
# TODO: if this occurs while the process is restarting, some exceptions may
# be logged, although things will settle down normally
# TODO: if this occurs while the process is restarting, some exceptions
# may be logged, although things will settle down normally
return base.ChangeSource.stopService(self)

def describe(self):
status = ""
if not self.process:
status = "[NOT CONNECTED - check log]"
str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
(self.username, self.gerritserver, status))
return str

return ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
(self.username, self.gerritserver, status))

0 comments on commit 187e466

Please sign in to comment.