Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ celerybeat.pid

# Environments
.env
.venv
.venv*
env/
venv/
ENV/
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ classifiers = [

dependencies = [
"aio-pika ~= 9.4.2",
"omotes-sdk-protocol ~= 0.0.8",
"omotes-sdk-protocol ~= 0.1.1",
"celery ~= 5.3.6",
"typing-extensions ~= 4.11.0"
#"streamcapture ~= 1.2.3", TODO: See #4, this is reenabled once streamcapture removes AGPL dependency
]

Expand Down Expand Up @@ -69,7 +70,7 @@ requires = [
enabled = true

[tool.pytest.ini_options]
addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 25"
addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 55"

[tool.coverage.run]
source = ["src"]
Expand Down
54 changes: 53 additions & 1 deletion src/omotes_sdk/internal/orchestrator/orchestrator_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
JobResult,
JobCancel,
)
from omotes_sdk_protocol.workflow_pb2 import RequestAvailableWorkflows
from omotes_sdk.internal.common.broker_interface import BrokerInterface
from omotes_sdk.config import RabbitMQConfig
from omotes_sdk.job import Job
Expand Down Expand Up @@ -56,7 +57,7 @@ def callback_on_new_job_wrapped(self, message: bytes) -> None:

@dataclass
class JobCancellationHandler:
"""Handler to setup callback for receiving job cancellations."""
"""Handler to set up callback for receiving job cancellations."""

callback_on_cancel_job: Callable[[JobCancel], None]
"""Callback to call when a cancellation is received."""
Expand All @@ -72,6 +73,24 @@ def callback_on_job_cancelled_wrapped(self, message: bytes) -> None:
self.callback_on_cancel_job(cancelled_job)


@dataclass
class RequestWorkflowsHandler:
"""Handler to set up callback for receiving available work flows requests."""

callback_on_request_workflows: Callable[[RequestAvailableWorkflows], None]
"""Callback to call when a request work flows is received."""

def callback_on_request_workflows_wrapped(self, message: bytes) -> None:
"""Prepare the `RequestAvailableWorkflows` message before passing them to the callback.

:param message: Serialized AMQP message containing a request work flow.
"""
request_available_workflows = RequestAvailableWorkflows()
request_available_workflows.ParseFromString(message)

self.callback_on_request_workflows(request_available_workflows)


class OrchestratorInterface:
"""RabbitMQ interface specifically for the orchestrator."""

Expand All @@ -94,6 +113,10 @@ def __init__(
def start(self) -> None:
"""Start the orchestrator interface."""
self.broker_if.start()
self.connect_to_request_available_workflows(
callback_on_request_workflows=self.request_workflows_handler
)
self.send_available_workflows()

def stop(self) -> None:
"""Stop the orchestrator interface."""
Expand Down Expand Up @@ -126,6 +149,19 @@ def connect_to_job_cancellations(
callback_on_message=callback_handler.callback_on_job_cancelled_wrapped,
)

def connect_to_request_available_workflows(
self, callback_on_request_workflows: Callable[[RequestAvailableWorkflows], None]
) -> None:
"""Connect to the request available workflows queue.

:param callback_on_request_workflows: Callback to handle workflow updates.
"""
callback_handler = RequestWorkflowsHandler(callback_on_request_workflows)
self.broker_if.add_queue_subscription(
OmotesQueueNames.request_available_workflows_queue_name(),
callback_on_message=callback_handler.callback_on_request_workflows_wrapped,
)

def send_job_progress_update(self, job: Job, progress_update: JobProgressUpdate) -> None:
"""Send a job progress update to the SDK.

Expand Down Expand Up @@ -155,3 +191,19 @@ def send_job_result(self, job: Job, result: JobResult) -> None:
self.broker_if.send_message_to(
OmotesQueueNames.job_results_queue_name(job), result.SerializeToString()
)

def request_workflows_handler(self, request_workflows: RequestAvailableWorkflows) -> None:
"""When an available work flows request is received from the SDK.

:param request_workflows: Request available work flows.
"""
logger.info("Received an available workflows request")
self.send_available_workflows()

def send_available_workflows(self) -> None:
"""Send the available workflows to the SDK."""
work_type_manager_pb = self.workflow_type_manager.to_pb_message()
self.broker_if.send_message_to(
OmotesQueueNames.available_workflows_queue_name(),
work_type_manager_pb.SerializeToString(),
)
70 changes: 62 additions & 8 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import threading
import uuid
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, Optional
from typing import Callable, Optional, Union
from google.protobuf.struct_pb2 import Struct

from omotes_sdk.internal.common.broker_interface import BrokerInterface
Expand All @@ -14,6 +15,7 @@
JobSubmission,
JobCancel,
)
from omotes_sdk_protocol.workflow_pb2 import AvailableWorkflows, RequestAvailableWorkflows
from omotes_sdk.job import Job
from omotes_sdk.queue_names import OmotesQueueNames
from omotes_sdk.types import ParamsDict
Expand Down Expand Up @@ -71,6 +73,12 @@ def callback_on_status_update_wrapped(self, message: bytes) -> None:
self.callback_on_status_update(self.job, status_update)


class UndefinedWorkflowsException(Exception):
"""Thrown if the workflows are needed but not defined yet."""

...


class UnknownWorkflowException(Exception):
"""Thrown if a job is submitted using an unknown workflow type."""

Expand All @@ -82,23 +90,34 @@ class OmotesInterface:

broker_if: BrokerInterface
"""Interface to RabbitMQ broker."""
workflow_type_manager: WorkflowTypeManager
"""Manager of all possible workflows."""
workflow_type_manager: Union[WorkflowTypeManager, None]
"""All available workflow types."""
_workflow_config_received: threading.Event
"""Event triggered when workflow configuration is received."""

def __init__(self, rabbitmq_config: RabbitMQConfig, possible_workflows: WorkflowTypeManager):
def __init__(
self,
rabbitmq_config: RabbitMQConfig,
):
"""Create the OMOTES interface.

NOTE: Needs to be started separately.

:param rabbitmq_config: RabbitMQ configuration how to connect to OMOTES.
:param possible_workflows: Container for all workflows which are expected to exist.
"""
self.broker_if = BrokerInterface(rabbitmq_config)
self.workflow_type_manager = possible_workflows
self.workflow_type_manager = None
self._workflow_config_received = threading.Event()

def start(self) -> None:
"""Start any other interfaces."""
"""Start any other interfaces and request available workflows."""
self.broker_if.start()
self.connect_to_available_workflows_updates()
self.request_available_workflows()

while not self._workflow_config_received.is_set():
logger.info("Waiting for workflow definitions to be received from the orchestrator...")
self._workflow_config_received.wait(timeout=5)

def stop(self) -> None:
"""Stop any other interfaces."""
Expand Down Expand Up @@ -198,7 +217,9 @@ def submit_job(
:return: The job handle which is created. This object needs to be saved persistently by the
program using this SDK in order to resume listening to jobs in progress after a restart.
"""
if not self.workflow_type_manager.workflow_exists(workflow_type):
if not self.workflow_type_manager or not self.workflow_type_manager.workflow_exists(
workflow_type
):
raise UnknownWorkflowException()

job = Job(id=uuid.uuid4(), workflow_type=workflow_type)
Expand Down Expand Up @@ -243,3 +264,36 @@ def cancel_job(self, job: Job) -> None:
self.broker_if.send_message_to(
OmotesQueueNames.job_cancel_queue_name(), message=cancel_msg.SerializeToString()
)

def connect_to_available_workflows_updates(self) -> None:
"""Connect to updates of the available workflows."""
self.broker_if.add_queue_subscription(
queue_name=OmotesQueueNames.available_workflows_queue_name(),
callback_on_message=self.callback_on_update_available_workflows,
)

def callback_on_update_available_workflows(self, message: bytes) -> None:
"""Parse a serialized AvailableWorkflows message and update workflow type manager.

:param message: Serialized message.
"""
available_workflows_pb = AvailableWorkflows()
available_workflows_pb.ParseFromString(message)
self.workflow_type_manager = WorkflowTypeManager.from_pb_message(available_workflows_pb)
self._workflow_config_received.set()
logger.info("Updated the available workflows")

def request_available_workflows(self) -> None:
"""Request the available workflows from the orchestrator."""
request_available_workflows_pb = RequestAvailableWorkflows()
self.broker_if.send_message_to(
OmotesQueueNames.request_available_workflows_queue_name(),
request_available_workflows_pb.SerializeToString(),
)

def get_workflow_type_manager(self) -> WorkflowTypeManager:
"""Get the available workflows."""
if self.workflow_type_manager:
return self.workflow_type_manager
else:
raise UndefinedWorkflowsException()
16 changes: 16 additions & 0 deletions src/omotes_sdk/queue_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,19 @@ def job_cancel_queue_name() -> str:
:return: The queue name.
"""
return "job_cancellations"

@staticmethod
def available_workflows_queue_name() -> str:
"""Generate the available work flows queue name.

:return: The queue name.
"""
return "available_workflows"

@staticmethod
def request_available_workflows_queue_name() -> str:
"""Generate the request available work flows queue name.

:return: The queue name.
"""
return "request_available_workflows"
Loading