Skip to content

Commit

Permalink
More documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Neoklosch committed Jun 14, 2017
1 parent 662b4a1 commit d99dc4f
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 5 deletions.
70 changes: 68 additions & 2 deletions motey/communication/communication_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,25 @@


class CommunicationManager(object):
"""
This class acts as an facade for the communication endpoints like the ``MQTT server``, the ``API server`` and the
``ZeroMQ server``.
It covers all method calls and can start and stop the mentioned components.
"""

capability_event_stream = Subject()

def __init__(self, api_server, mqtt_server, zeromq_server):
"""
Constructor of the class.
:param api_server: DI injected.
:type api_server: motey.communication.apiserver.APIServer
:param mqtt_server: DI injected.
:type mqtt_server: motey.communication.mqttserver.MQTTServer
:param zeromq_server: DI injected.
:type zeromq_server: motey.communication.zeromq_server.ZeroMQServer
"""
self.api_server = api_server
self.mqtt_server = mqtt_server
self.zeromq_server = zeromq_server
Expand All @@ -15,18 +31,37 @@ def __init__(self, api_server, mqtt_server, zeromq_server):

@property
def after_capabilities_request(self):
"""
Facades the ``ZeroMQServer.after_capabilities_request_handler()`` getter.
Returns the handler which will be executed after a capability request was received.
:return: the handler which will be executed after a capability request was received.
"""
return self.zeromq_server.after_capabilities_request_handler

@after_capabilities_request.setter
def after_capabilities_request(self, handler):
"""
Facades the ``ZeroMQServer.after_capabilities_request`` setter.
Will set the handler which will be executed after a capability request was received.
:param handler: the handler which will be executed after a capability request was received.
"""
self.zeromq_server.after_capabilities_request_handler = handler

def start(self):
"""
Start all the connected communication components.
"""
self.api_server.start()
self.mqtt_server.start()
self.zeromq_server.start()

def stop(self):
"""
Stop all the connected communication components.
Will send out a mqtt message to remove the current node.
"""
self.zeromq_server.stop()
self.mqtt_server.remove_node(network_utils.get_own_ip())
self.mqtt_server.stop()
Expand All @@ -51,13 +86,44 @@ def __nodes_request_callback(self, client, userdata, message):
self.mqtt_server.publish_new_node(network_utils.get_own_ip())

def deploy_image(self, image):
"""
Facades the ``ZeroMQServer.deploy_image()`` method.
Will deploy an image to the node stored in the ``Image.node`` attribute.
:param image: Image to be deployed.
:type image: motey.models.image.Image
:return: the id of the deployed image or None if something went wrong.
"""
return self.zeromq_server.deploy_image(image)

def request_image_status(self, image):
"""
Facades the ``ZeroMQServer.request_image_status()`` method.
Request the status of an specific image instance or None if something went wrong.
:param image: Image to be used to get the status.
:type image: motey.models.image.Image
:return: the status of the image or None if something went wrong
"""
return self.zeromq_server.request_image_status(image)

def request_capabilities(self, node):
return self.zeromq_server.request_capabilities(node)
def request_capabilities(self, ip):
"""
Facades the ``ZeroMQServer.request_capabilities()`` method.
Will fetch the capabilities of a specific node and will return them.
:param ip: The ip of the node to be requested.
:type ip: str
:return: the capabilities of a specific node
"""
return self.zeromq_server.request_capabilities(ip)

def terminate_image(self, image):
"""
Facades the ``ZeroMQServer.terminate_image()`` method.
Will terminate an image instance.
:param image: the image instance to be terminated
:type image: motey.models.image.Image
"""
self.zeromq_server.terminate_image(image)
48 changes: 48 additions & 0 deletions motey/communication/zeromq_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,20 @@ def __init__(self, logger, valmanager):

@property
def after_capabilities_request(self):
"""
Returns the handler which will be executed after a capability request was received.
:return: the handler which will be executed after a capability request was received.
"""
return self.after_capabilities_request_handler

@after_capabilities_request.setter
def after_capabilities_request(self, handler):
"""
Will set the handler which will be executed after a capability request was received.
:param handler: the handler which will be executed after a capability request was received.
"""
self.after_capabilities_request_handler = handler

def start(self):
Expand Down Expand Up @@ -119,6 +129,13 @@ def __run_capabilities_replier_thread(self):
self.capabilities_replier.send_string(json.dumps([]))

def __run_deploy_image_replier_thread(self):
"""
Private function which is be executed after the start method is called.
The method will wait for an event where it is subscribed on.
After receiving an event the data will be parsed as JSON and validated.
Afterwards it will be used to instantiate an image instance.
Finally it will send out the id of the instantiated instance or None if something went wrong.
"""
while not self.stopped:
result = self.deploy_image_replier.recv_string()
image_id = None
Expand All @@ -132,11 +149,21 @@ def __run_deploy_image_replier_thread(self):
self.deploy_image_replier.send_string(image_id if image_id else '')

def __run_image_status_replier_thread(self):
"""
Private function which is be executed after the start method is called.
The method will wait for an event where it is subscribed on.
After receiving an event the status of an image instance will be returned.
"""
while not self.stopped:
result = self.image_status_replier.recv_string()
# TODO: send image status

def __run_image_termiate_thread(self):
"""
Private function which is be executed after the start method is called.
The method will wait for an event where it is subscribed on.
After receiving an event the image instance which matches the send id will be terminated.
"""
while not self.stopped:
image_id = self.image_terminate_replier.recv_string()
self.valmanager.terminate(instance_name=image_id, plugin_type='docker')
Expand Down Expand Up @@ -166,6 +193,14 @@ def request_capabilities(self, ip):
return json_capabilities

def deploy_image(self, image):
"""
Will deploy an image to the node stored in the ``Image.node`` attribute.
:param image: Image to be deployed.
:type image: motey.models.image.Image
:return: the id of the deployed image or None if something went wrong.
"""

if not image or not image.node:
return None

Expand All @@ -176,6 +211,13 @@ def deploy_image(self, image):
return external_image_id

def request_image_status(self, image):
"""
Request the status of an specific image instance or None if something went wrong.
:param image: Image to be used to get the status.
:type image: motey.models.image.Image
:return: the status of the image or None if something went wrong
"""
if not image or not image.id or not image.node:
return None

Expand All @@ -186,6 +228,12 @@ def request_image_status(self, image):
return external_image_status

def terminate_image(self, image):
"""
Will terminate an image instance.
:param image: the image instance to be terminated
:type image: motey.models.image.Image
"""
if not image or not image.id or not image.node:
return None

Expand Down
43 changes: 40 additions & 3 deletions motey/orchestrator/inter_node_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,28 @@

class InterNodeOrchestrator(object):
"""
This class orchestpassrates yaml blueprints.
It will start and stop virtual instances of images defined in the blueprint.
This class orchestrates services.
It will start and stop virtual instances of images defined in the service.
It also can communicate with other nodes to start instances there if the requirements does not fit with the
possibilities of the current node.
"""
def __init__(self, logger, valmanager, service_repository, labeling_repository, node_repository,
communication_manager):
"""
Instantiates the ``Logger``, the ``VALManagger``, ``ServiceRepository`` and subscribe to the blueprint endpoint.
Constructor of the class.
:param logger: DI injected
:type logger: motey.utils.logger.Logger
:param valmanager: DI injected
:type valmanager: motey.val.valmanager.VALManager
:param service_repository: DI injected
:type service_repository: motey.repositories.service_repository.ServiceRepository
:param labeling_repository: DI injected
:type labeling_repository: motey.labelingengine.LabelingEngine
:param node_repository: DI injected
:type node_repository: motey.repositories.node_repository.NodeRepository
:param communication_manager: DI injected
:type communication_manager: motey.communication.communication_manager.CommunicationManager
"""
self.logger = logger
self.valmanager = valmanager
Expand All @@ -34,6 +47,7 @@ def parse_local_blueprint_file(self, file_path):
Parse a local yaml file and start the virtual images defined in the blueprint.
:param file_path: Path to the local blueprint file.
:type file_path: str
"""
with open(file_path, 'r') as stream:
self.handle_blueprint(stream)
Expand All @@ -43,6 +57,7 @@ def instantiate_service(self, service):
Instantiate a service.
:param service: the service to be used.
:type service: motey.models.service.Service
"""
if service.action == Service.ServiceAction.ADD:
self.service_repository.add(dict(service))
Expand Down Expand Up @@ -79,12 +94,25 @@ def instantiate_service(self, service):
self.deploy_service(service=service)

def deploy_service(self, service):
"""
Deploy all images of a service to the related nodes.
:param service: the service which should be deployed
:type service: motey.models.service.Service
"""
for image in service.images:
image.id = self.communication_manager.deploy_image(image)
# store new image id
self.service_repository.update(dict(service))

def get_service_status(self, service):
"""
Retruns the service status.
:param service: the service which should be used
:type service: motey.models.service.Service
:return: the status of the service
"""
for image in service.images:
image_status = self.communication_manager.request_image_status(image)
# TODO: calculate service state based on instance states
Expand All @@ -94,7 +122,9 @@ def compare_capabilities(self, needed_capabilities, node_capabilities):
Compares two dicts with capabilities.
:param needed_capabilities: the capabilities to compare with
:type needed_capabilities: dict
:param node_capabilities: the capabilties to check
:type node_capabilities: dict
:return: True if all capabilities are fulfilled, otherwiese False
"""
for capability in needed_capabilities:
Expand All @@ -108,6 +138,13 @@ def compare_capabilities(self, needed_capabilities, node_capabilities):
return True

def find_node(self, image):
"""
Try to find a node in the cluster which can be used to deploy the given image.
:param image: the image to be used
:type image: motey.models.image.Image
:return: the IP of the node to be used or None if it does not found a node which fulfill all capabilities
"""
for node in self.node_repository.all():
capabilities = self.communication_manager.request_capabilities(node['ip'])
if self.compare_capabilities(needed_capabilities=image.capabilities, node_capabilities=capabilities):
Expand Down

0 comments on commit d99dc4f

Please sign in to comment.