Skip to content

Commit

Permalink
first use of singleton service for hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
tardyp committed Oct 11, 2016
1 parent eb8833e commit eb489bb
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 59 deletions.
10 changes: 3 additions & 7 deletions master/buildbot/test/unit/test_worker_hyper.py
Expand Up @@ -58,7 +58,7 @@ def setUp(self):

def tearDown(self):
if self.worker is not None:
self.worker.stopService()
self.worker.master.stopService()
self.reactor.pump([.1])
self.assertIsNone(hyper.Client.instance)
_setReactor(None)
Expand All @@ -67,7 +67,6 @@ def test_constructor_normal(self):
worker = HyperLatentWorker('bot', 'pass', 'tcp://hyper.sh/', 'foo', 'bar', 'debian:wheezy')
# class instanciation configures nothing
self.assertEqual(worker.client, None)
self.assertEqual(worker.client_args, None)

def test_constructor_nohyper(self):
self.patch(workerhyper, 'Hyper', None)
Expand All @@ -85,15 +84,13 @@ def makeWorker(self, **kwargs):
master = fakemaster.make_master(testcase=self, wantData=True)
worker.setServiceParent(master)
worker.reactor = self.reactor
self.successResultOf(worker.startService())
self.successResultOf(master.startService())
return worker

def test_start_service(self):
worker = self.worker = self.makeWorker()
self.assertEqual(worker.client_args, {'clouds': {'tcp://hyper.sh/': {
'secretkey': 'bar', 'accesskey': 'foo'}}})
# client is lazily created on worker substantiation
self.assertEqual(worker.client, None)
self.assertNotEqual(worker.client, None)

def test_start_worker(self):
worker = self.makeWorker()
Expand Down Expand Up @@ -131,7 +128,6 @@ def test_start_worker_but_error(self):

def test_start_worker_but_already_created_with_same_name(self):
worker = self.makeWorker(image="cool")
worker.maybeCreateSingletons()
worker.client.create_container(image="foo", name=worker.getContainerName())
d = worker.substantiate(None, FakeBuild())
self.reactor.advance(.1)
Expand Down
107 changes: 55 additions & 52 deletions master/buildbot/worker/hyper.py
Expand Up @@ -28,6 +28,7 @@

from buildbot import config
from buildbot.interfaces import LatentWorkerFailedToSubstantiate
from buildbot.util import service
from buildbot.util.logger import Logger
from buildbot.worker import AbstractLatentWorker

Expand All @@ -41,16 +42,49 @@
log = Logger()


class HyperLatentManager(service.SingletonService):
"""A singleton class that manages all the connections to the hyper cloud
There is one instance of this manager per host, accesskey, secretkey tuple.
This manager manages its own thread pull, as Hyper_sh is blocking
"""
name = "HyperLatentManager"
MAX_THREADS = 5

def __init__(self, hyper_host, hyper_accesskey, hyper_secretkey):
service.SingletonService.__init__(self)
self.name = self.getName(hyper_host, hyper_accesskey, hyper_secretkey)
# Prepare the parameters for the Docker Client object.
self.client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}

def startService(self):
self.threadPool = threadpool.ThreadPool(
minthreads=1, maxthreads=self.MAX_THREADS, name='hyper')
self.threadPool.start()
self.client = Hyper(self.client_args)

def stopService(self):
self.client.close()
return self.threadPool.stop()

def runInThread(self, reactor, meth, *args, **kwargs):
return threads.deferToThreadPool(reactor, self.threadPool, meth, *args, **kwargs)


class HyperLatentWorker(AbstractLatentWorker):
"""hyper.sh is a docker CaaS company"""
instance = None
ALLOWED_SIZES = ['s1', 's2', 's3', 's4', 'm1', 'm2', 'm3', 'l1', 'l2', 'l3']
ALLOWED_SIZES = ['s1', 's2', 's3', 's4',
'm1', 'm2', 'm3', 'l1', 'l2', 'l3']
threadPool = None
client = None
image = None
reactor = global_reactor
client_args = None
class_singletons = {}

def checkConfig(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
Expand All @@ -68,33 +102,30 @@ def checkConfig(self, name, password, hyper_host,
" HyperLatentWorker")

if hyper_size not in self.ALLOWED_SIZES:
config.error("Size is not valid %s vs %r".format(hyper_size, self.ALLOWED_SIZES))
config.error("Size is not valid %s vs %r".format(
hyper_size, self.ALLOWED_SIZES))

@property
def client(self):
if self.manager is None:
return None
return self.manager.client

@defer.inlineCallbacks
def reconfigService(self, name, password, hyper_host,
hyper_accesskey, hyper_secretkey, image, hyper_size="s3", masterFQDN=None, **kwargs):
AbstractLatentWorker.reconfigService(self, name, password, **kwargs)
self.confighash = hashlib.sha1(hyper_host + hyper_accesskey + hyper_secretkey).hexdigest()[:6]
yield AbstractLatentWorker.reconfigService(self, name, password, **kwargs)

self.manager = yield HyperLatentManager.getService(self.master, hyper_host, hyper_accesskey,
hyper_secretkey)
self.masterhash = hashlib.sha1(self.master.name).hexdigest()[:6]
self.size = hyper_size

# Prepare the parameters for the Docker Client object.
self.client_args = {'clouds': {
hyper_host: {
"accesskey": hyper_accesskey,
"secretkey": hyper_secretkey
}
}}
self.image = image
if not masterFQDN: # also match empty string (for UI)
masterFQDN = socket.getfqdn()
self.masterFQDN = masterFQDN

@defer.inlineCallbacks
def stopService(self):
# stopService will call stop_instance if the worker was up.
yield AbstractLatentWorker.stopService(self)
yield self.maybeDeleteSingletons()

def createEnvironment(self):
result = {
"BUILDMASTER": self.masterFQDN,
Expand All @@ -104,41 +135,12 @@ def createEnvironment(self):
if self.registration is not None:
result["BUILDMASTER_PORT"] = str(self.registration.getPBPort())
if ":" in self.masterFQDN:
result["BUILDMASTER"], result["BUILDMASTER_PORT"] = self.masterFQDN.split(":")
result["BUILDMASTER"], result[
"BUILDMASTER_PORT"] = self.masterFQDN.split(":")
return result

def runInThread(self, meth, *args, **kwargs):
self.maybeCreateSingletons()
return threads.deferToThreadPool(self.reactor, self.threadPool, meth, *args, **kwargs)

def maybeCreateSingletons(self):
if self.threadPool is None:
key = self.confighash
if key not in HyperLatentWorker.class_singletons:
threadPool = threadpool.ThreadPool(minthreads=10, maxthreads=10, name='hyper')
threadPool.start()
# simple reference counter to stop the pool when the last latent worker is stopped
threadPool.refs = 0
client = Hyper(self.client_args)
HyperLatentWorker.class_singletons[key] = (threadPool, client)
else:
threadPool, client = HyperLatentWorker.class_singletons[key]
threadPool.refs += 1
self.threadPool = threadPool
self.client = client

@defer.inlineCallbacks
def maybeDeleteSingletons(self):
if self.threadPool is not None:
self.threadPool.refs -= 1
if self.threadPool.refs == 0:
self.client.close()
yield self.threadPool.stop()
key = self.confighash
# if key in not in the singleton, there is a bug somewhere so this will raise
del HyperLatentWorker.class_singletons[key]

self.threadPool = self.client = None
return self.manager.runInThread(self.reactor, meth, *args, **kwargs)

@defer.inlineCallbacks
def start_instance(self, build):
Expand All @@ -153,7 +155,8 @@ def getContainerName(self):
return ('%s-%s' % ('buildbot' + self.masterhash, self.workername)).replace("_", "-")

def _thd_cleanup_instance(self):
instances = self.client.containers(filters=dict(name=self.getContainerName()))
instances = self.client.containers(
filters=dict(name=self.getContainerName()))
for instance in instances:
self.client.remove_container(instance['Id'], v=True, force=True)

Expand Down

0 comments on commit eb489bb

Please sign in to comment.