Skip to content

Commit

Permalink
Merge pull request #2444 from tardyp/SingletonService
Browse files Browse the repository at this point in the history
Singleton service
  • Loading branch information
tardyp committed Oct 13, 2016
2 parents f7be7bc + e3c6b05 commit 62dcf0c
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 69 deletions.
58 changes: 58 additions & 0 deletions master/buildbot/test/unit/test_util_service.py
Expand Up @@ -713,3 +713,61 @@ def testConfigDict(self):
'kwargs': {'a': 2},
'name': 'basic'}],
'name': 'services'})


class UnderTestSharedService(service.SharedService):
def __init__(self, arg1=None):
service.SharedService.__init__(self)


class UnderTestDependentService(service.AsyncService):
@defer.inlineCallbacks
def startService(self):
self.dependent = yield UnderTestSharedService.getService(self.parent)

def stopService(self):
assert self.dependent.running


class SharedService(unittest.SynchronousTestCase):
def test_bad_constructor(self):
parent = service.AsyncMultiService()
self.failureResultOf(UnderTestSharedService.getService(parent, arg2="foo"))

def test_creation(self):
parent = service.AsyncMultiService()
r = self.successResultOf(UnderTestSharedService.getService(parent))
r2 = self.successResultOf(UnderTestSharedService.getService(parent))
r3 = self.successResultOf(UnderTestSharedService.getService(parent, "arg1"))
r4 = self.successResultOf(UnderTestSharedService.getService(parent, "arg1"))
self.assertIdentical(r, r2)
self.assertNotIdentical(r, r3)
self.assertIdentical(r3, r4)
self.assertEqual(len(list(iter(parent))), 2)

def test_startup(self):
"""the service starts when parent starts and stop"""
parent = service.AsyncMultiService()
r = self.successResultOf(UnderTestSharedService.getService(parent))
self.assertEqual(r.running, 0)
self.successResultOf(parent.startService())
self.assertEqual(r.running, 1)
self.successResultOf(parent.stopService())
self.assertEqual(r.running, 0)

def test_already_started(self):
"""the service starts during the getService if parent already started"""
parent = service.AsyncMultiService()
self.successResultOf(parent.startService())
r = self.successResultOf(UnderTestSharedService.getService(parent))
self.assertEqual(r.running, 1)
# then we stop the parent, and the shared service stops
self.successResultOf(parent.stopService())
self.assertEqual(r.running, 0)

def test_already_stopped_last(self):
parent = service.AsyncMultiService()
o = UnderTestDependentService()
o.setServiceParent(parent)
self.successResultOf(parent.startService())
self.successResultOf(parent.stopService())
10 changes: 3 additions & 7 deletions master/buildbot/test/unit/test_worker_hyper.py
Expand Up @@ -58,7 +58,7 @@ def setUp(self):

def tearDown(self):
if self.worker is not None:
self.worker.stopService()
self.worker.master.stopService()
self.reactor.pump([.1])
self.assertIsNone(hyper.Client.instance)
_setReactor(None)
Expand All @@ -67,7 +67,6 @@ def test_constructor_normal(self):
worker = HyperLatentWorker('bot', 'pass', 'tcp://hyper.sh/', 'foo', 'bar', 'debian:wheezy')
# class instanciation configures nothing
self.assertEqual(worker.client, None)
self.assertEqual(worker.client_args, None)

def test_constructor_nohyper(self):
self.patch(workerhyper, 'Hyper', None)
Expand All @@ -85,15 +84,13 @@ def makeWorker(self, **kwargs):
master = fakemaster.make_master(testcase=self, wantData=True)
worker.setServiceParent(master)
worker.reactor = self.reactor
self.successResultOf(worker.startService())
self.successResultOf(master.startService())
return worker

def test_start_service(self):
worker = self.worker = self.makeWorker()
self.assertEqual(worker.client_args, {'clouds': {'tcp://hyper.sh/': {
'secretkey': 'bar', 'accesskey': 'foo'}}})
# client is lazily created on worker substantiation
self.assertEqual(worker.client, None)
self.assertNotEqual(worker.client, None)

def test_start_worker(self):
worker = self.makeWorker()
Expand Down Expand Up @@ -131,7 +128,6 @@ def test_start_worker_but_error(self):

def test_start_worker_but_already_created_with_same_name(self):
worker = self.makeWorker(image="cool")
worker.maybeCreateSingletons()
worker.client.create_container(image="foo", name=worker.getContainerName())
d = worker.substantiate(None, FakeBuild())
self.reactor.advance(.1)
Expand Down
51 changes: 49 additions & 2 deletions master/buildbot/util/service.py
@@ -1,4 +1,4 @@
# This file is part of Buildbot. Buildbot is free software: you can
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
Expand All @@ -14,9 +14,12 @@
# Copyright Buildbot Team Members
from future.utils import itervalues

import hashlib

from twisted.application import service
from twisted.internet import defer
from twisted.internet import task
from twisted.python import failure
from twisted.python import log
from twisted.python import reflect

Expand Down Expand Up @@ -71,7 +74,10 @@ class AsyncMultiService(AsyncService, service.MultiService):
def startService(self):
service.Service.startService(self)
l = []
for svc in self:
# if a service attaches another service during the reconfiguration
# then the service will be started twice, so we dont use iter, but rather
# copy in a list
for svc in list(self):
# handle any deferreds, passing up errors and success
l.append(defer.maybeDeferred(svc.startService))
return defer.gatherResults(l, consumeErrors=True)
Expand Down Expand Up @@ -110,6 +116,47 @@ def master(self):
return self


class SharedService(AsyncMultiService):
"""a service that is created only once per parameter set in a parent service"""

@classmethod
def getService(cls, parent, *args, **kwargs):
name = cls.getName(*args, **kwargs)
if name in parent.namedServices:
return defer.succeed(parent.namedServices[name])
try:
instance = cls(*args, **kwargs)
except Exception:
# we transform all exceptions into failure
return defer.fail(failure.Failure())
# The class is not required to initialized its name
# but we use the name to identify the instance in the parent service
# so we force it with the name we used
instance.name = name
d = instance.setServiceParent(parent)

@d.addCallback
def returnInstance(res):
# we put the service on top of the list, so that it is stopped the last
# This make sense as the shared service is used as a dependency
# for other service
parent.services.remove(instance)
parent.services.insert(0, instance)
# hook the return value to the instance object
return instance
return d

@classmethod
def getName(cls, *args, **kwargs):
_hash = hashlib.sha1()
for arg in args:
_hash.update(str(arg))
for k, v in kwargs.items():
_hash.update(str(k))
_hash.update(str(v))
return cls.__name__ + "_" + _hash.hexdigest()


class BuildbotService(AsyncMultiService, config.ConfiguredMixin, util.ComparableMixin,
ReconfigurableServiceMixin):
compare_attrs = ('name', '_config_args', '_config_kwargs')
Expand Down
115 changes: 58 additions & 57 deletions master/buildbot/worker/hyper.py
Expand Up @@ -28,6 +28,7 @@

from buildbot import config
from buildbot.interfaces import LatentWorkerFailedToSubstantiate
from buildbot.util import service
from buildbot.util.logger import Logger
from buildbot.worker import AbstractLatentWorker

Expand All @@ -41,17 +42,55 @@
log = Logger()


class HyperLatentManager(service.SharedService):
"""A shared service class that manages all the connections to the hyper cloud
There is one instance of this manager per host, accesskey, secretkey tuple.
This manager manages its own thread pull, as Hyper_sh is blocking.
You can change the maximum number of concurent access to hyper using
import buildbot.worker.hyper
buildbot.worker.hyper.HyperLatentManager.MAX_THREADS = 1
This feature is undocumented for now, as we are not sure if this is ideal API.
"""
MAX_THREADS = 5

def __init__(self, hyper_host, hyper_accesskey, hyper_secretkey):
service.SharedService.__init__(self)
# Prepare the parameters for the Docker Client object.
self._client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}

def startService(self):
self._threadPool = threadpool.ThreadPool(
minthreads=1, maxthreads=self.MAX_THREADS, name='hyper')
self._threadPool.start()
self._client = Hyper(self._client_args)

@property
def client(self):
return self._client

def stopService(self):
self.client.close()
return self._threadPool.stop()

def deferToThread(self, reactor, meth, *args, **kwargs):
return threads.deferToThreadPool(reactor, self._threadPool, meth, *args, **kwargs)


class HyperLatentWorker(AbstractLatentWorker):
"""hyper.sh is a docker CaaS company"""
instance = None
ALLOWED_SIZES = ['s1', 's2', 's3', 's4',
'm1', 'm2', 'm3', 'l1', 'l2', 'l3']
threadPool = None
client = None
image = None
reactor = global_reactor
client_args = None
class_singletons = {}

def checkConfig(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
Expand All @@ -72,32 +111,27 @@ def checkConfig(self, name, password, hyper_host,
config.error("Size is not valid %s vs %r".format(
hyper_size, self.ALLOWED_SIZES))

@property
def client(self):
if self.manager is None:
return None
return self.manager.client

@defer.inlineCallbacks
def reconfigService(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
AbstractLatentWorker.reconfigService(self, name, password, **kwargs)
self.confighash = hashlib.sha1(
hyper_host + hyper_accesskey + hyper_secretkey).hexdigest()[:6]
yield AbstractLatentWorker.reconfigService(self, name, password, **kwargs)

self.manager = yield HyperLatentManager.getService(self.master, hyper_host, hyper_accesskey,
hyper_secretkey)
self.masterhash = hashlib.sha1(self.master.name).hexdigest()[:6]
self.size = hyper_size

# Prepare the parameters for the Docker Client object.
self.client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}
self.image = image
if not masterFQDN: # also match empty string (for UI)
masterFQDN = socket.getfqdn()
self.masterFQDN = masterFQDN

@defer.inlineCallbacks
def stopService(self):
# stopService will call stop_instance if the worker was up.
yield AbstractLatentWorker.stopService(self)
yield self.maybeDeleteSingletons()

def createEnvironment(self):
result = {
"BUILDMASTER": self.masterFQDN,
Expand All @@ -111,49 +145,16 @@ def createEnvironment(self):
"BUILDMASTER_PORT"] = self.masterFQDN.split(":")
return result

def runInThread(self, meth, *args, **kwargs):
self.maybeCreateSingletons()
return threads.deferToThreadPool(self.reactor, self.threadPool, meth, *args, **kwargs)

def maybeCreateSingletons(self):
if self.threadPool is None:
key = self.confighash
if key not in HyperLatentWorker.class_singletons:
threadPool = threadpool.ThreadPool(
minthreads=10, maxthreads=10, name='hyper')
threadPool.start()
# simple reference counter to stop the pool when the last
# latent worker is stopped
threadPool.refs = 0
client = Hyper(self.client_args)
HyperLatentWorker.class_singletons[key] = (threadPool, client)
else:
threadPool, client = HyperLatentWorker.class_singletons[key]
threadPool.refs += 1
self.threadPool = threadPool
self.client = client

@defer.inlineCallbacks
def maybeDeleteSingletons(self):
if self.threadPool is not None:
self.threadPool.refs -= 1
if self.threadPool.refs == 0:
self.client.close()
yield self.threadPool.stop()
key = self.confighash
# if key in not in the singleton, there is a bug somewhere so
# this will raise
del HyperLatentWorker.class_singletons[key]

self.threadPool = self.client = None
def deferToThread(self, meth, *args, **kwargs):
return self.manager.deferToThread(self.reactor, meth, *args, **kwargs)

@defer.inlineCallbacks
def start_instance(self, build):
if self.instance is not None:
raise ValueError('instance active')

image = yield build.render(self.image)
yield self.runInThread(self._thd_start_instance, image)
yield self.deferToThread(self._thd_start_instance, image)
defer.returnValue(True)

def getContainerName(self):
Expand Down Expand Up @@ -198,7 +199,7 @@ def stop_instance(self, fast=False):
# instance never attached, and it's because, somehow, we never
# started.
return defer.succeed(None)
return self.runInThread(self._thd_stop_instance, fast)
return self.deferToThread(self._thd_stop_instance, fast)

@property
def shortid(self):
Expand Down

0 comments on commit 62dcf0c

Please sign in to comment.