Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Singleton service #2444

Merged
merged 8 commits into from Oct 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 58 additions & 0 deletions master/buildbot/test/unit/test_util_service.py
Expand Up @@ -713,3 +713,61 @@ def testConfigDict(self):
'kwargs': {'a': 2},
'name': 'basic'}],
'name': 'services'})


class UnderTestSharedService(service.SharedService):
def __init__(self, arg1=None):
service.SharedService.__init__(self)


class UnderTestDependentService(service.AsyncService):
@defer.inlineCallbacks
def startService(self):
self.dependent = yield UnderTestSharedService.getService(self.parent)

def stopService(self):
assert self.dependent.running


class SharedService(unittest.SynchronousTestCase):
def test_bad_constructor(self):
parent = service.AsyncMultiService()
self.failureResultOf(UnderTestSharedService.getService(parent, arg2="foo"))

def test_creation(self):
parent = service.AsyncMultiService()
r = self.successResultOf(UnderTestSharedService.getService(parent))
r2 = self.successResultOf(UnderTestSharedService.getService(parent))
r3 = self.successResultOf(UnderTestSharedService.getService(parent, "arg1"))
r4 = self.successResultOf(UnderTestSharedService.getService(parent, "arg1"))
self.assertIdentical(r, r2)
self.assertNotIdentical(r, r3)
self.assertIdentical(r3, r4)
self.assertEqual(len(list(iter(parent))), 2)

def test_startup(self):
"""the service starts when parent starts and stop"""
parent = service.AsyncMultiService()
r = self.successResultOf(UnderTestSharedService.getService(parent))
self.assertEqual(r.running, 0)
self.successResultOf(parent.startService())
self.assertEqual(r.running, 1)
self.successResultOf(parent.stopService())
self.assertEqual(r.running, 0)

def test_already_started(self):
"""the service starts during the getService if parent already started"""
parent = service.AsyncMultiService()
self.successResultOf(parent.startService())
r = self.successResultOf(UnderTestSharedService.getService(parent))
self.assertEqual(r.running, 1)
# then we stop the parent, and the shared service stops
self.successResultOf(parent.stopService())
self.assertEqual(r.running, 0)

def test_already_stopped_last(self):
parent = service.AsyncMultiService()
o = UnderTestDependentService()
o.setServiceParent(parent)
self.successResultOf(parent.startService())
self.successResultOf(parent.stopService())
10 changes: 3 additions & 7 deletions master/buildbot/test/unit/test_worker_hyper.py
Expand Up @@ -58,7 +58,7 @@ def setUp(self):

def tearDown(self):
if self.worker is not None:
self.worker.stopService()
self.worker.master.stopService()
self.reactor.pump([.1])
self.assertIsNone(hyper.Client.instance)
_setReactor(None)
Expand All @@ -67,7 +67,6 @@ def test_constructor_normal(self):
worker = HyperLatentWorker('bot', 'pass', 'tcp://hyper.sh/', 'foo', 'bar', 'debian:wheezy')
# class instanciation configures nothing
self.assertEqual(worker.client, None)
self.assertEqual(worker.client_args, None)

def test_constructor_nohyper(self):
self.patch(workerhyper, 'Hyper', None)
Expand All @@ -85,15 +84,13 @@ def makeWorker(self, **kwargs):
master = fakemaster.make_master(testcase=self, wantData=True)
worker.setServiceParent(master)
worker.reactor = self.reactor
self.successResultOf(worker.startService())
self.successResultOf(master.startService())
return worker

def test_start_service(self):
worker = self.worker = self.makeWorker()
self.assertEqual(worker.client_args, {'clouds': {'tcp://hyper.sh/': {
'secretkey': 'bar', 'accesskey': 'foo'}}})
# client is lazily created on worker substantiation
self.assertEqual(worker.client, None)
self.assertNotEqual(worker.client, None)

def test_start_worker(self):
worker = self.makeWorker()
Expand Down Expand Up @@ -131,7 +128,6 @@ def test_start_worker_but_error(self):

def test_start_worker_but_already_created_with_same_name(self):
worker = self.makeWorker(image="cool")
worker.maybeCreateSingletons()
worker.client.create_container(image="foo", name=worker.getContainerName())
d = worker.substantiate(None, FakeBuild())
self.reactor.advance(.1)
Expand Down
51 changes: 49 additions & 2 deletions master/buildbot/util/service.py
@@ -1,4 +1,4 @@
# This file is part of Buildbot. Buildbot is free software: you can
# This file is part of Buildbot. Buildbot is free software: you can
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line is probably removed by accident

# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
Expand All @@ -14,9 +14,12 @@
# Copyright Buildbot Team Members
from future.utils import itervalues

import hashlib

from twisted.application import service
from twisted.internet import defer
from twisted.internet import task
from twisted.python import failure
from twisted.python import log
from twisted.python import reflect

Expand Down Expand Up @@ -71,7 +74,10 @@ class AsyncMultiService(AsyncService, service.MultiService):
def startService(self):
service.Service.startService(self)
l = []
for svc in self:
# if a service attaches another service during the reconfiguration
# then the service will be started twice, so we dont use iter, but rather
# copy in a list
for svc in list(self):
# handle any deferreds, passing up errors and success
l.append(defer.maybeDeferred(svc.startService))
return defer.gatherResults(l, consumeErrors=True)
Expand Down Expand Up @@ -110,6 +116,47 @@ def master(self):
return self


class SharedService(AsyncMultiService):
"""a service that is created only once per parameter set in a parent service"""

@classmethod
def getService(cls, parent, *args, **kwargs):
name = cls.getName(*args, **kwargs)
if name in parent.namedServices:
return defer.succeed(parent.namedServices[name])
try:
instance = cls(*args, **kwargs)
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want catch all Exceptions here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we transforms all exceptions into failures

# we transform all exceptions into failure
return defer.fail(failure.Failure())
# The class is not required to initialized its name
# but we use the name to identify the instance in the parent service
# so we force it with the name we used
instance.name = name
d = instance.setServiceParent(parent)

@d.addCallback
def returnInstance(res):
# we put the service on top of the list, so that it is stopped the last
# This make sense as the shared service is used as a dependency
# for other service
parent.services.remove(instance)
parent.services.insert(0, instance)
# hook the return value to the instance object
return instance
return d

@classmethod
def getName(cls, *args, **kwargs):
_hash = hashlib.sha1()
for arg in args:
_hash.update(str(arg))
for k, v in kwargs.items():
_hash.update(str(k))
_hash.update(str(v))
return cls.__name__ + "_" + _hash.hexdigest()


class BuildbotService(AsyncMultiService, config.ConfiguredMixin, util.ComparableMixin,
ReconfigurableServiceMixin):
compare_attrs = ('name', '_config_args', '_config_kwargs')
Expand Down
115 changes: 58 additions & 57 deletions master/buildbot/worker/hyper.py
Expand Up @@ -28,6 +28,7 @@

from buildbot import config
from buildbot.interfaces import LatentWorkerFailedToSubstantiate
from buildbot.util import service
from buildbot.util.logger import Logger
from buildbot.worker import AbstractLatentWorker

Expand All @@ -41,17 +42,55 @@
log = Logger()


class HyperLatentManager(service.SharedService):
"""A shared service class that manages all the connections to the hyper cloud

There is one instance of this manager per host, accesskey, secretkey tuple.
This manager manages its own thread pull, as Hyper_sh is blocking.

You can change the maximum number of concurent access to hyper using

import buildbot.worker.hyper
buildbot.worker.hyper.HyperLatentManager.MAX_THREADS = 1
This feature is undocumented for now, as we are not sure if this is ideal API.
"""
MAX_THREADS = 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we use this value outside? if not _MAX_THREADS

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is eventually designed to be overriden


def __init__(self, hyper_host, hyper_accesskey, hyper_secretkey):
service.SharedService.__init__(self)
# Prepare the parameters for the Docker Client object.
self._client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}

def startService(self):
self._threadPool = threadpool.ThreadPool(
minthreads=1, maxthreads=self.MAX_THREADS, name='hyper')
self._threadPool.start()
self._client = Hyper(self._client_args)

@property
def client(self):
return self._client

def stopService(self):
self.client.close()
return self._threadPool.stop()

def deferToThread(self, reactor, meth, *args, **kwargs):
return threads.deferToThreadPool(reactor, self._threadPool, meth, *args, **kwargs)


class HyperLatentWorker(AbstractLatentWorker):
"""hyper.sh is a docker CaaS company"""
instance = None
ALLOWED_SIZES = ['s1', 's2', 's3', 's4',
'm1', 'm2', 'm3', 'l1', 'l2', 'l3']
threadPool = None
client = None
image = None
reactor = global_reactor
client_args = None
class_singletons = {}

def checkConfig(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
Expand All @@ -72,32 +111,27 @@ def checkConfig(self, name, password, hyper_host,
config.error("Size is not valid %s vs %r".format(
hyper_size, self.ALLOWED_SIZES))

@property
def client(self):
if self.manager is None:
return None
return self.manager.client

@defer.inlineCallbacks
def reconfigService(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
AbstractLatentWorker.reconfigService(self, name, password, **kwargs)
self.confighash = hashlib.sha1(
hyper_host + hyper_accesskey + hyper_secretkey).hexdigest()[:6]
yield AbstractLatentWorker.reconfigService(self, name, password, **kwargs)

self.manager = yield HyperLatentManager.getService(self.master, hyper_host, hyper_accesskey,
hyper_secretkey)
self.masterhash = hashlib.sha1(self.master.name).hexdigest()[:6]
self.size = hyper_size

# Prepare the parameters for the Docker Client object.
self.client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}
self.image = image
if not masterFQDN: # also match empty string (for UI)
masterFQDN = socket.getfqdn()
self.masterFQDN = masterFQDN

@defer.inlineCallbacks
def stopService(self):
# stopService will call stop_instance if the worker was up.
yield AbstractLatentWorker.stopService(self)
yield self.maybeDeleteSingletons()

def createEnvironment(self):
result = {
"BUILDMASTER": self.masterFQDN,
Expand All @@ -111,49 +145,16 @@ def createEnvironment(self):
"BUILDMASTER_PORT"] = self.masterFQDN.split(":")
return result

def runInThread(self, meth, *args, **kwargs):
self.maybeCreateSingletons()
return threads.deferToThreadPool(self.reactor, self.threadPool, meth, *args, **kwargs)

def maybeCreateSingletons(self):
if self.threadPool is None:
key = self.confighash
if key not in HyperLatentWorker.class_singletons:
threadPool = threadpool.ThreadPool(
minthreads=10, maxthreads=10, name='hyper')
threadPool.start()
# simple reference counter to stop the pool when the last
# latent worker is stopped
threadPool.refs = 0
client = Hyper(self.client_args)
HyperLatentWorker.class_singletons[key] = (threadPool, client)
else:
threadPool, client = HyperLatentWorker.class_singletons[key]
threadPool.refs += 1
self.threadPool = threadPool
self.client = client

@defer.inlineCallbacks
def maybeDeleteSingletons(self):
if self.threadPool is not None:
self.threadPool.refs -= 1
if self.threadPool.refs == 0:
self.client.close()
yield self.threadPool.stop()
key = self.confighash
# if key in not in the singleton, there is a bug somewhere so
# this will raise
del HyperLatentWorker.class_singletons[key]

self.threadPool = self.client = None
def deferToThread(self, meth, *args, **kwargs):
return self.manager.deferToThread(self.reactor, meth, *args, **kwargs)

@defer.inlineCallbacks
def start_instance(self, build):
if self.instance is not None:
raise ValueError('instance active')

image = yield build.render(self.image)
yield self.runInThread(self._thd_start_instance, image)
yield self.deferToThread(self._thd_start_instance, image)
defer.returnValue(True)

def getContainerName(self):
Expand Down Expand Up @@ -198,7 +199,7 @@ def stop_instance(self, fast=False):
# instance never attached, and it's because, somehow, we never
# started.
return defer.succeed(None)
return self.runInThread(self._thd_stop_instance, fast)
return self.deferToThread(self._thd_stop_instance, fast)

@property
def shortid(self):
Expand Down