From b15feea02f87d2407982934edfb2b7110fcd8ddc Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Fri, 21 Jun 2024 17:06:12 +0200 Subject: [PATCH 01/17] workflows in one place working --- pyproject.toml | 1 + .../orchestrator/orchestrator_interface.py | 45 ++++- src/omotes_sdk/omotes_interface.py | 52 ++++- src/omotes_sdk/queue_names.py | 16 ++ src/omotes_sdk/workflow_type.py | 188 +++++++++++++++++- 5 files changed, 289 insertions(+), 13 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3f15a11..5971fb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "aio-pika ~= 9.3.1", "omotes-sdk-protocol ~= 0.0.8", "celery ~= 5.3.6", + "typing-extensions ~= 4.11.0" #"streamcapture ~= 1.2.3", TODO: See #4, this is reenabled once streamcapture removes AGPL dependency ] diff --git a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py index c1dbd3b..208a50a 100644 --- a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py +++ b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py @@ -10,6 +10,7 @@ JobResult, JobCancel, ) +from omotes_sdk_protocol.work_flow_pb2 import RequestAvailableWorkflows from omotes_sdk.internal.common.broker_interface import BrokerInterface from omotes_sdk.config import RabbitMQConfig from omotes_sdk.job import Job @@ -56,7 +57,7 @@ def callback_on_new_job_wrapped(self, message: bytes) -> None: @dataclass class JobCancellationHandler: - """Handler to setup callback for receiving job cancellations.""" + """Handler to set up callback for receiving job cancellations.""" callback_on_cancel_job: Callable[[JobCancel], None] """Callback to call when a cancellation is received.""" @@ -72,6 +73,24 @@ def callback_on_job_cancelled_wrapped(self, message: bytes) -> None: self.callback_on_cancel_job(cancelled_job) +@dataclass +class RequestWorkflowsHandler: + """Handler to set up callback for receiving available work flows requests.""" + + callback_on_request_work_flows: Callable[[RequestAvailableWorkflows], None] + """Callback to call when a request work flows is received.""" + + def callback_on_request_workflows_wrapped(self, message: bytes) -> None: + """Prepare the `RequestAvailableWorkflows` message before passing them to the callback. + + :param message: Serialized AMQP message containing a request work flow. + """ + request_available_work_flows = RequestAvailableWorkflows() + request_available_work_flows.ParseFromString(message) + + self.callback_on_request_work_flows(request_available_work_flows) + + class OrchestratorInterface: """RabbitMQ interface specifically for the orchestrator.""" @@ -126,6 +145,19 @@ def connect_to_job_cancellations( callback_on_message=callback_handler.callback_on_job_cancelled_wrapped, ) + def connect_to_request_available_workflows( + self, callback_on_request_workflows: Callable[[RequestAvailableWorkflows], None] + ) -> None: + """Connect to the request available workflows queue. + + :param callback_on_request_workflows: Callback to handle workflow updates. + """ + callback_handler = RequestWorkflowsHandler(callback_on_request_workflows) + self.broker_if.add_queue_subscription( + OmotesQueueNames.request_available_workflows_queue_name(), + callback_on_message=callback_handler.callback_on_request_workflows_wrapped, + ) + def send_job_progress_update(self, job: Job, progress_update: JobProgressUpdate) -> None: """Send a job progress update to the SDK. @@ -155,3 +187,14 @@ def send_job_result(self, job: Job, result: JobResult) -> None: self.broker_if.send_message_to( OmotesQueueNames.job_results_queue_name(job), result.SerializeToString() ) + + def send_available_workflows(self, workflow_type_manager: WorkflowTypeManager) -> None: + """Send the available workflows to the SDK. + + :param workflow_type_manager: Job to which the result belongs. + """ + work_type_manager_pb = workflow_type_manager.to_pb_message() + self.broker_if.send_message_to( + OmotesQueueNames.available_workflows_queue_name(), + work_type_manager_pb.SerializeToString(), + ) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 2785a92..6f76f05 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -14,10 +14,11 @@ JobSubmission, JobCancel, ) +from omotes_sdk_protocol.work_flow_pb2 import AvailableWorkflows, RequestAvailableWorkflows from omotes_sdk.job import Job from omotes_sdk.queue_names import OmotesQueueNames from omotes_sdk.types import ParamsDict -from omotes_sdk.workflow_type import WorkflowType, WorkflowTypeManager +from omotes_sdk.workflow_type import WorkflowType logger = logging.getLogger("omotes_sdk") @@ -82,23 +83,30 @@ class OmotesInterface: broker_if: BrokerInterface """Interface to RabbitMQ broker.""" - workflow_type_manager: WorkflowTypeManager - """Manager of all possible workflows.""" + callback_on_available_workflows_update: Callable[[AvailableWorkflows], None] + """Handler which is called when the available workflows are updated.""" - def __init__(self, rabbitmq_config: RabbitMQConfig, possible_workflows: WorkflowTypeManager): + def __init__( + self, + rabbitmq_config: RabbitMQConfig, + callback_on_available_workflows_update: Callable[[AvailableWorkflows], None], + ): """Create the OMOTES interface. NOTE: Needs to be started separately. :param rabbitmq_config: RabbitMQ configuration how to connect to OMOTES. - :param possible_workflows: Container for all workflows which are expected to exist. + :param callback_on_available_workflows_update: Handler which is called when the available + workflows are updated. """ self.broker_if = BrokerInterface(rabbitmq_config) - self.workflow_type_manager = possible_workflows + self.callback_on_available_workflows_update = callback_on_available_workflows_update def start(self) -> None: - """Start any other interfaces.""" + """Start any other interfaces and request available workflows.""" self.broker_if.start() + self.connect_to_available_workflows_updates() + self.request_available_workflows() def stop(self) -> None: """Stop any other interfaces.""" @@ -198,8 +206,10 @@ def submit_job( :return: The job handle which is created. This object needs to be saved persistently by the program using this SDK in order to resume listening to jobs in progress after a restart. """ - if not self.workflow_type_manager.workflow_exists(workflow_type): - raise UnknownWorkflowException() + # TODO workflow_type_manager no longer accessible here + # create instance here as well in 'callback_on_update_available_workflows_wrapped'? + # if not self.workflow_type_manager.workflow_exists(workflow_type): + # raise UnknownWorkflowException() job = Job(id=uuid.uuid4(), workflow_type=workflow_type) logger.info("Submitting job %s", job.id) @@ -243,3 +253,27 @@ def cancel_job(self, job: Job) -> None: self.broker_if.send_message_to( OmotesQueueNames.job_cancel_queue_name(), message=cancel_msg.SerializeToString() ) + + def callback_on_update_available_workflows_wrapped(self, message: bytes) -> None: + """Parse a serialized AvailableWorkflows message and call handler. + + :param message: Serialized message. + """ + available_workflows_pb = AvailableWorkflows() + available_workflows_pb.ParseFromString(message) + self.callback_on_available_workflows_update(available_workflows_pb) + + def connect_to_available_workflows_updates(self) -> None: + """Connect to updates of the available workflows.""" + self.broker_if.add_queue_subscription( + queue_name=OmotesQueueNames.available_workflows_queue_name(), + callback_on_message=self.callback_on_update_available_workflows_wrapped, + ) + + def request_available_workflows(self) -> None: + """Request the available workflows.""" + request_available_workflows_pb = RequestAvailableWorkflows() + self.broker_if.send_message_to( + OmotesQueueNames.request_available_workflows_queue_name(), + request_available_workflows_pb.SerializeToString(), + ) diff --git a/src/omotes_sdk/queue_names.py b/src/omotes_sdk/queue_names.py index e90a0d5..40065b6 100644 --- a/src/omotes_sdk/queue_names.py +++ b/src/omotes_sdk/queue_names.py @@ -48,3 +48,19 @@ def job_cancel_queue_name() -> str: :return: The queue name. """ return "job_cancellations" + + @staticmethod + def available_workflows_queue_name() -> str: + """Generate the available work flows queue name. + + :return: The queue name. + """ + return "available_workflows" + + @staticmethod + def request_available_workflows_queue_name() -> str: + """Generate the request available work flows queue name. + + :return: The queue name. + """ + return "request_available_workflows" diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index e2c25e5..3337a23 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -1,15 +1,108 @@ -from dataclasses import dataclass, field -from typing import List, Optional, Dict +from dataclasses import dataclass, field, fields, is_dataclass +from enum import Enum +from typing import List, Optional, Dict, Any +from typing_extensions import Self + +from omotes_sdk_protocol.work_flow_pb2 import ( + AvailableWorkflows, + Workflow, + WorkflowParameter as WorkflowParameterPb, + WorkflowParameterSchema as WorkflowParameterSchemaPb, +) + + +class ParameterType(Enum): + """Json forms supported types.""" + + STRING = "string" + INTEGER = "integer" + NUMBER = "number" + BOOLEAN = "boolean" + + +class ParameterStringFormat(Enum): + """Json forms supported string formats.""" + + TIME = "time" + DATE = "date" + DATETIME = "date-time" + + +@dataclass(eq=True, frozen=True) +class DataClassToDict: + """Enable dictionary creation.""" + + def to_dict(self) -> dict: + """Create dictionary from dataclass.""" + + def get_value(v: Any) -> Any: + """Get the proper value based on the type of data.""" + if v is None or v == [] or v == "": + return None + elif isinstance(v, list): + return [get_value(item) for item in v] + elif is_dataclass(v): + return v.to_dict() + elif isinstance(v, Enum): + return v.value + else: + return v + + return { + _field.name: get_value(getattr(self, _field.name)) + for _field in fields(self) + if get_value(getattr(self, _field.name)) is not None + } @dataclass(eq=True, frozen=True) -class WorkflowType: +class ParameterSchema(DataClassToDict): + """Define a json forms schema for a WorkflowParameter this SDK supports. + + This schema can be used directly by a frontend: https://jsonforms.io/. + If needed additional schema properties can be added. + """ + + type: ParameterType = field(hash=False, compare=False) + """Type of the parameter.""" + title: str | None = field(default=None, hash=True, compare=True) + """Optionally override the 'snake_case to text' 'key_name' (displayed above the input field).""" + description: str | None = field(default=None, hash=True, compare=True) + """Optional description (displayed below the input field).""" + default: str | None = field(default=None, hash=False, compare=False) + """Optional default value (number as string).""" + format: ParameterStringFormat | None = field(default=None, hash=False, compare=False) + """Optional format of a string type parameter (for date-time).""" + enum: list[str] | None = field(default=None, hash=False, compare=False) + """Optional multiple choice values of a string type parameter.""" + minimum: float | None = field(default=None, hash=False, compare=False) + """Optional minimum allowed value of an integer or number type parameter.""" + maximum: float | None = field(default=None, hash=False, compare=False) + """Optional maximum allowed value of an integer or number type parameter.""" + + +@dataclass(eq=True, frozen=True) +class WorkflowParameter(DataClassToDict): + """Define a workflow parameter this SDK supports.""" + + key_name: str = field(hash=True, compare=True) + """Key name for the parameter.""" + schema: ParameterSchema = field(hash=False, compare=False) + """json form schema for the parameter.""" + + +@dataclass(eq=True, frozen=True) +class WorkflowType(DataClassToDict): """Define a type of workflow this SDK supports.""" workflow_type_name: str = field(hash=True, compare=True) """Technical name for the workflow.""" workflow_type_description_name: str = field(hash=False, compare=False) """Human-readable name for the workflow.""" + workflow_parameters: List[WorkflowParameter] | None = field( + default=None, hash=False, compare=False + ) + """Optional list of non-ESDL workflow parameters.""" class WorkflowTypeManager: @@ -47,3 +140,92 @@ def workflow_exists(self, workflow: WorkflowType) -> bool: :return: If the workflow exists. """ return workflow.workflow_type_name in self._workflows + + def to_dict(self) -> dict: + """Generate a dictionary representation of the available workflows. + + :return: dictionary representation. + """ + available_work_flows_dict = {} + for workflow_name, workflow_type in self._workflows.items(): + available_work_flows_dict[workflow_name] = workflow_type.to_dict() + return available_work_flows_dict + + def to_pb_message(self) -> AvailableWorkflows: + """Generate a protobuf message containing the available workflows. + + :return: AvailableWorkflows protobuf message. + """ + available_work_flows_pb = AvailableWorkflows() + for _workflow in self._workflows.values(): + workflow_pb = Workflow( + type_name=_workflow.workflow_type_name, + type_description=_workflow.workflow_type_description_name, + ) + if _workflow.workflow_parameters: + for _parameter in _workflow.workflow_parameters: + parameter_pb = WorkflowParameterPb( + key_name=_parameter.key_name, + schema=WorkflowParameterSchemaPb( + type=_parameter.schema.type.value, + title=_parameter.schema.title, + description=_parameter.schema.description, + default=_parameter.schema.default, + format=( + _parameter.schema.format.value if _parameter.schema.format else None + ), + enum=_parameter.schema.enum, + minimum=_parameter.schema.minimum, + maximum=_parameter.schema.maximum, + ), + ) + workflow_pb.parameters.extend([parameter_pb]) + available_work_flows_pb.workflows.extend([workflow_pb]) + return available_work_flows_pb + + @classmethod + def from_pb_message(cls, available_work_flows_pb: AvailableWorkflows) -> Self: + """Create a WorkflowTypeManager instance from a protobuf message. + + :param available_work_flows_pb: protobuf message containing the available workflows. + :return: WorkflowTypeManager instance. + """ + workflow_types = [] + for workflow_pb in available_work_flows_pb.workflows: + workflow_parameters = [] + for parameter_pb in workflow_pb.parameters: + workflow_parameters.append( + WorkflowParameter( + key_name=parameter_pb.key_name, + schema=ParameterSchema( + type=ParameterType(parameter_pb.schema.type), + title=parameter_pb.schema.title, + description=parameter_pb.schema.description, + default=parameter_pb.schema.default, + format=( + ParameterStringFormat(parameter_pb.schema.format) + if parameter_pb.schema.HasField("format") + else None + ), + enum=parameter_pb.schema.enum, + minimum=( + parameter_pb.schema.minimum + if parameter_pb.schema.HasField("minimum") + else None # to deal with possible '0' value properly + ), + maximum=( + parameter_pb.schema.maximum + if parameter_pb.schema.HasField("maximum") + else None # to deal with possible '0' value properly + ), + ), + ) + ) + workflow_types.append( + WorkflowType( + workflow_type_name=workflow_pb.type_name, + workflow_type_description_name=workflow_pb.type_description, + workflow_parameters=workflow_parameters, + ) + ) + return cls(workflow_types) From 16a8c99d9b0d84b10502e95ab63131cde0f3b000 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 27 Jun 2024 15:19:17 +0200 Subject: [PATCH 02/17] add get_workflows for sdk, add tests --- pyproject.toml | 2 +- .../orchestrator/orchestrator_interface.py | 22 +- src/omotes_sdk/omotes_interface.py | 52 ++- src/omotes_sdk/workflow_type.py | 391 ++++++++++++------ .../test_config/workflow_config_happy.json | 49 +++ .../workflow_config_int_min_as_float.json | 49 +++ unit_test/test_workflow_type.py | 43 +- 7 files changed, 458 insertions(+), 150 deletions(-) create mode 100644 unit_test/test_config/workflow_config_happy.json create mode 100644 unit_test/test_config/workflow_config_int_min_as_float.json diff --git a/pyproject.toml b/pyproject.toml index 5971fb6..9f1167e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ requires = [ enabled = true [tool.pytest.ini_options] -addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 25" +addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 55" [tool.coverage.run] source = ["src"] diff --git a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py index 208a50a..c06e86f 100644 --- a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py +++ b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py @@ -10,7 +10,7 @@ JobResult, JobCancel, ) -from omotes_sdk_protocol.work_flow_pb2 import RequestAvailableWorkflows +from omotes_sdk_protocol.workflow_pb2 import RequestAvailableWorkflows from omotes_sdk.internal.common.broker_interface import BrokerInterface from omotes_sdk.config import RabbitMQConfig from omotes_sdk.job import Job @@ -77,7 +77,7 @@ def callback_on_job_cancelled_wrapped(self, message: bytes) -> None: class RequestWorkflowsHandler: """Handler to set up callback for receiving available work flows requests.""" - callback_on_request_work_flows: Callable[[RequestAvailableWorkflows], None] + callback_on_request_workflows: Callable[[RequestAvailableWorkflows], None] """Callback to call when a request work flows is received.""" def callback_on_request_workflows_wrapped(self, message: bytes) -> None: @@ -85,10 +85,10 @@ def callback_on_request_workflows_wrapped(self, message: bytes) -> None: :param message: Serialized AMQP message containing a request work flow. """ - request_available_work_flows = RequestAvailableWorkflows() - request_available_work_flows.ParseFromString(message) + request_available_workflows = RequestAvailableWorkflows() + request_available_workflows.ParseFromString(message) - self.callback_on_request_work_flows(request_available_work_flows) + self.callback_on_request_workflows(request_available_workflows) class OrchestratorInterface: @@ -113,6 +113,10 @@ def __init__( def start(self) -> None: """Start the orchestrator interface.""" self.broker_if.start() + self.connect_to_request_available_workflows( + callback_on_request_workflows=self.request_workflows_handler + ) + self.send_available_workflows(self.workflow_type_manager) def stop(self) -> None: """Stop the orchestrator interface.""" @@ -188,6 +192,14 @@ def send_job_result(self, job: Job, result: JobResult) -> None: OmotesQueueNames.job_results_queue_name(job), result.SerializeToString() ) + def request_workflows_handler(self, request_workflows: RequestAvailableWorkflows) -> None: + """When an available work flows request is received from the SDK. + + :param request_workflows: Request available work flows. + """ + logger.info("Received an available workflows request") + self.send_available_workflows(self.workflow_type_manager) + def send_available_workflows(self, workflow_type_manager: WorkflowTypeManager) -> None: """Send the available workflows to the SDK. diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 6f76f05..46be0fd 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -1,4 +1,5 @@ import logging +import time import uuid from dataclasses import dataclass from datetime import timedelta @@ -14,11 +15,11 @@ JobSubmission, JobCancel, ) -from omotes_sdk_protocol.work_flow_pb2 import AvailableWorkflows, RequestAvailableWorkflows +from omotes_sdk_protocol.workflow_pb2 import AvailableWorkflows, RequestAvailableWorkflows from omotes_sdk.job import Job from omotes_sdk.queue_names import OmotesQueueNames from omotes_sdk.types import ParamsDict -from omotes_sdk.workflow_type import WorkflowType +from omotes_sdk.workflow_type import WorkflowType, WorkflowTypeManager logger = logging.getLogger("omotes_sdk") @@ -83,24 +84,22 @@ class OmotesInterface: broker_if: BrokerInterface """Interface to RabbitMQ broker.""" - callback_on_available_workflows_update: Callable[[AvailableWorkflows], None] - """Handler which is called when the available workflows are updated.""" + workflow_type_manager: WorkflowTypeManager | None + """All available workflow types.""" def __init__( self, rabbitmq_config: RabbitMQConfig, - callback_on_available_workflows_update: Callable[[AvailableWorkflows], None], ): """Create the OMOTES interface. NOTE: Needs to be started separately. :param rabbitmq_config: RabbitMQ configuration how to connect to OMOTES. - :param callback_on_available_workflows_update: Handler which is called when the available workflows are updated. """ self.broker_if = BrokerInterface(rabbitmq_config) - self.callback_on_available_workflows_update = callback_on_available_workflows_update + self.workflow_type_manager = None def start(self) -> None: """Start any other interfaces and request available workflows.""" @@ -108,6 +107,10 @@ def start(self) -> None: self.connect_to_available_workflows_updates() self.request_available_workflows() + while not self.workflow_type_manager: + logger.info("Waiting for workflow definitions to be received from the orchestrator...") + time.sleep(5) + def stop(self) -> None: """Stop any other interfaces.""" self.broker_if.stop() @@ -206,10 +209,10 @@ def submit_job( :return: The job handle which is created. This object needs to be saved persistently by the program using this SDK in order to resume listening to jobs in progress after a restart. """ - # TODO workflow_type_manager no longer accessible here - # create instance here as well in 'callback_on_update_available_workflows_wrapped'? - # if not self.workflow_type_manager.workflow_exists(workflow_type): - # raise UnknownWorkflowException() + if not self.workflow_type_manager or not self.workflow_type_manager.workflow_exists( + workflow_type + ): + raise UnknownWorkflowException() job = Job(id=uuid.uuid4(), workflow_type=workflow_type) logger.info("Submitting job %s", job.id) @@ -254,26 +257,31 @@ def cancel_job(self, job: Job) -> None: OmotesQueueNames.job_cancel_queue_name(), message=cancel_msg.SerializeToString() ) - def callback_on_update_available_workflows_wrapped(self, message: bytes) -> None: - """Parse a serialized AvailableWorkflows message and call handler. - - :param message: Serialized message. - """ - available_workflows_pb = AvailableWorkflows() - available_workflows_pb.ParseFromString(message) - self.callback_on_available_workflows_update(available_workflows_pb) - def connect_to_available_workflows_updates(self) -> None: """Connect to updates of the available workflows.""" self.broker_if.add_queue_subscription( queue_name=OmotesQueueNames.available_workflows_queue_name(), - callback_on_message=self.callback_on_update_available_workflows_wrapped, + callback_on_message=self.callback_on_update_available_workflows, ) + def callback_on_update_available_workflows(self, message: bytes) -> None: + """Parse a serialized AvailableWorkflows message and update workflow type manager. + + :param message: Serialized message. + """ + available_workflows_pb = AvailableWorkflows() + available_workflows_pb.ParseFromString(message) + self.workflow_type_manager = WorkflowTypeManager.from_pb_message(available_workflows_pb) + logger.info("Updated the available workflows") + def request_available_workflows(self) -> None: - """Request the available workflows.""" + """Request the available workflows from the orchestrator.""" request_available_workflows_pb = RequestAvailableWorkflows() self.broker_if.send_message_to( OmotesQueueNames.request_available_workflows_queue_name(), request_available_workflows_pb.SerializeToString(), ) + + def get_workflow_type_manager(self) -> WorkflowTypeManager | None: + """Get the available workflows.""" + return self.workflow_type_manager diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index 3337a23..887fcc5 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -1,98 +1,120 @@ -from dataclasses import dataclass, field, fields, is_dataclass -from enum import Enum -from typing import List, Optional, Dict, Any +import json +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional, Dict, Union from typing_extensions import Self -from omotes_sdk_protocol.work_flow_pb2 import ( +from omotes_sdk_protocol.workflow_pb2 import ( AvailableWorkflows, Workflow, WorkflowParameter as WorkflowParameterPb, - WorkflowParameterSchema as WorkflowParameterSchemaPb, + StringParameter as StringParameterPb, + StringEnum as StringEnumPb, + BooleanParameter as BooleanParameterPb, + IntegerParameter as IntegerParameterPb, + FloatParameter as FloatParameterPb, + DateTimeParameter as DateTimeParameterPb, ) -class ParameterType(Enum): - """Json forms supported types.""" +@dataclass(eq=True, frozen=True) +class WorkflowParameter: + """Define a workflow parameter this SDK supports.""" + + key_name: str = field(hash=True, compare=True) + """Key name for the parameter.""" + title: str | None = field(default=None, hash=True, compare=True) + """Optionally override the 'snake_case to text' 'key_name' (displayed above the input field).""" + description: str | None = field(default=None, hash=True, compare=True) + """Optional description (displayed below the input field).""" + + def __post_init__(self) -> None: + """Check parameter format.""" + for name, field_type in self.__annotations__.items(): + # Do not check dataclasses (like KeyDisplayPair) + # TODO better way of checking that 'field_type' is dataclass? + if "KeyDisplayPair" not in str(field_type) and not isinstance( + self.__dict__[name], field_type + ): + current_type = type(self.__dict__[name]) + raise TypeError( + f"The field `{name}` was assigned by `{current_type}` ('{self.__dict__[name]}')" + f" instead of `{field_type}`" + ) + + +@dataclass(eq=True, frozen=True) +class KeyDisplayPair: + """Define a key display pair this SDK supports.""" - STRING = "string" - INTEGER = "integer" - NUMBER = "number" - BOOLEAN = "boolean" + key_name: str = field(hash=True, compare=True) + """Key name.""" + display_name: str = field(hash=True, compare=True) + """Display name.""" -class ParameterStringFormat(Enum): - """Json forms supported string formats.""" +@dataclass(eq=True, frozen=True) +class StringParameter(WorkflowParameter): + """Define a string parameter this SDK supports.""" - TIME = "time" - DATE = "date" - DATETIME = "date-time" + type_name: str = "string" + """Parameter type name.""" + default: str | None = field(default=None, hash=False, compare=False) + """Optional default value.""" + enum_options: list[KeyDisplayPair] | None = field(default=None, hash=False, compare=False) + """Optional multiple choice values.""" @dataclass(eq=True, frozen=True) -class DataClassToDict: - """Enable dictionary creation.""" - - def to_dict(self) -> dict: - """Create dictionary from dataclass.""" - - def get_value(v: Any) -> Any: - """Get the proper value based on the type of data.""" - if v is None or v == [] or v == "": - return None - elif isinstance(v, list): - return [get_value(item) for item in v] - elif is_dataclass(v): - return v.to_dict() - elif isinstance(v, Enum): - return v.value - else: - return v - - return { - _field.name: get_value(getattr(self, _field.name)) - for _field in fields(self) - if get_value(getattr(self, _field.name)) is not None - } +class BooleanParameter(WorkflowParameter): + """Define a boolean parameter this SDK supports.""" + + type_name: str = "boolean" + """Parameter type name.""" + default: bool | None = field(default=None, hash=False, compare=False) + """Optional default value.""" @dataclass(eq=True, frozen=True) -class ParameterSchema(DataClassToDict): - """Define a json forms schema for a WorkflowParameter this SDK supports. +class IntegerParameter(WorkflowParameter): + """Define an integer parameter this SDK supports.""" - This schema can be used directly by a frontend: https://jsonforms.io/. - If needed additional schema properties can be added. - """ + type_name: str = "integer" + """Parameter type name.""" + default: int | None = field(default=None, hash=False, compare=False) + """Optional default value.""" + minimum: int | None = field(default=None, hash=False, compare=False) + """Optional minimum allowed value.""" + maximum: int | None = field(default=None, hash=False, compare=False) + """Optional maximum allowed value.""" - type: ParameterType = field(hash=False, compare=False) - """Type of the parameter.""" - title: str | None = field(default=None, hash=True, compare=True) - """Optionally override the 'snake_case to text' 'key_name' (displayed above the input field).""" - description: str | None = field(default=None, hash=True, compare=True) - """Optional description (displayed below the input field).""" - default: str | None = field(default=None, hash=False, compare=False) - """Optional default value (number as string).""" - format: ParameterStringFormat | None = field(default=None, hash=False, compare=False) - """Optional format of a string type parameter (for date-time).""" - enum: list[str] | None = field(default=None, hash=False, compare=False) - """Optional multiple choice values of a string type parameter.""" + +@dataclass(eq=True, frozen=True) +class FloatParameter(WorkflowParameter): + """Define a float parameter this SDK supports.""" + + type_name: str = "float" + """Parameter type name.""" + default: float | None = field(default=None, hash=False, compare=False) + """Optional default value.""" minimum: float | None = field(default=None, hash=False, compare=False) - """Optional minimum allowed value of an integer or number type parameter.""" + """Optional minimum allowed value.""" maximum: float | None = field(default=None, hash=False, compare=False) - """Optional maximum allowed value of an integer or number type parameter.""" + """Optional maximum allowed value.""" @dataclass(eq=True, frozen=True) -class WorkflowParameter(DataClassToDict): - """Define a workflow parameter this SDK supports.""" +class DateTimeParameter(WorkflowParameter): + """Define a datetime parameter this SDK supports.""" - key_name: str = field(hash=True, compare=True) - """Key name for the parameter.""" - schema: ParameterSchema = field(hash=False, compare=False) - """json form schema for the parameter.""" + type_name: str = "datetime" + """Parameter type name.""" + default: datetime | None = field(default=None, hash=False, compare=False) + """Optional default value.""" @dataclass(eq=True, frozen=True) -class WorkflowType(DataClassToDict): +class WorkflowType: """Define a type of workflow this SDK supports.""" workflow_type_name: str = field(hash=True, compare=True) @@ -141,22 +163,12 @@ def workflow_exists(self, workflow: WorkflowType) -> bool: """ return workflow.workflow_type_name in self._workflows - def to_dict(self) -> dict: - """Generate a dictionary representation of the available workflows. - - :return: dictionary representation. - """ - available_work_flows_dict = {} - for workflow_name, workflow_type in self._workflows.items(): - available_work_flows_dict[workflow_name] = workflow_type.to_dict() - return available_work_flows_dict - def to_pb_message(self) -> AvailableWorkflows: """Generate a protobuf message containing the available workflows. :return: AvailableWorkflows protobuf message. """ - available_work_flows_pb = AvailableWorkflows() + available_workflows_pb = AvailableWorkflows() for _workflow in self._workflows.values(): workflow_pb = Workflow( type_name=_workflow.workflow_type_name, @@ -166,61 +178,146 @@ def to_pb_message(self) -> AvailableWorkflows: for _parameter in _workflow.workflow_parameters: parameter_pb = WorkflowParameterPb( key_name=_parameter.key_name, - schema=WorkflowParameterSchemaPb( - type=_parameter.schema.type.value, - title=_parameter.schema.title, - description=_parameter.schema.description, - default=_parameter.schema.default, - format=( - _parameter.schema.format.value if _parameter.schema.format else None - ), - enum=_parameter.schema.enum, - minimum=_parameter.schema.minimum, - maximum=_parameter.schema.maximum, - ), + title=_parameter.title, + description=_parameter.description, ) + if isinstance(_parameter, StringParameter): + string_parameter = StringParameterPb(default=_parameter.default) + if _parameter.enum_options: + for _string_enum in _parameter.enum_options: + string_parameter.enum_options.extend( + [ + StringEnumPb( + key_name=_string_enum.key_name, + display_name=_string_enum.display_name, + ) + ] + ) + parameter_pb.string_parameter.CopyFrom(string_parameter) + elif isinstance(_parameter, BooleanParameter): + parameter_pb.boolean_parameter.CopyFrom( + BooleanParameterPb( + default=_parameter.default, + ) + ) + elif isinstance(_parameter, IntegerParameter): + parameter_pb.integer_parameter.CopyFrom( + IntegerParameterPb( + default=_parameter.default, + minimum=_parameter.minimum, + maximum=_parameter.maximum, + ) + ) + elif isinstance(_parameter, FloatParameter): + parameter_pb.float_parameter.CopyFrom( + FloatParameterPb( + default=_parameter.default, + minimum=_parameter.minimum, + maximum=_parameter.maximum, + ) + ) + elif isinstance(_parameter, DateTimeParameter): + if _parameter.default is None: + default_value = None + else: + default_value = _parameter.default.isoformat() + parameter_pb.datetime_parameter.CopyFrom( + DateTimeParameterPb(default=default_value) + ) + else: + raise NotImplementedError( + f"Parameter type {type(_parameter)} not supported" + ) workflow_pb.parameters.extend([parameter_pb]) - available_work_flows_pb.workflows.extend([workflow_pb]) - return available_work_flows_pb + available_workflows_pb.workflows.extend([workflow_pb]) + return available_workflows_pb @classmethod - def from_pb_message(cls, available_work_flows_pb: AvailableWorkflows) -> Self: + def from_pb_message(cls, available_workflows_pb: AvailableWorkflows) -> Self: """Create a WorkflowTypeManager instance from a protobuf message. - :param available_work_flows_pb: protobuf message containing the available workflows. + :param available_workflows_pb: protobuf message containing the available workflows. :return: WorkflowTypeManager instance. """ workflow_types = [] - for workflow_pb in available_work_flows_pb.workflows: - workflow_parameters = [] + for workflow_pb in available_workflows_pb.workflows: + workflow_parameters: list[WorkflowParameter] = [] for parameter_pb in workflow_pb.parameters: - workflow_parameters.append( - WorkflowParameter( - key_name=parameter_pb.key_name, - schema=ParameterSchema( - type=ParameterType(parameter_pb.schema.type), - title=parameter_pb.schema.title, - description=parameter_pb.schema.description, - default=parameter_pb.schema.default, - format=( - ParameterStringFormat(parameter_pb.schema.format) - if parameter_pb.schema.HasField("format") - else None - ), - enum=parameter_pb.schema.enum, - minimum=( - parameter_pb.schema.minimum - if parameter_pb.schema.HasField("minimum") - else None # to deal with possible '0' value properly - ), - maximum=( - parameter_pb.schema.maximum - if parameter_pb.schema.HasField("maximum") - else None # to deal with possible '0' value properly - ), - ), - ) + base_args = dict( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, ) + parameter_type_name = parameter_pb.WhichOneof("parameter_type") + if parameter_type_name is None: + raise TypeError(f"Parameter protobuf message with invalid type: {parameter_pb}") + else: + parameter_type = getattr(parameter_pb, parameter_type_name) + + parameter: Union[ + StringParameter, + BooleanParameter, + IntegerParameter, + FloatParameter, + DateTimeParameter, + ] + if isinstance(parameter_type, StringParameterPb): + parameter = StringParameter( + **base_args, default=parameter_type.default, enum_options=[] + ) + for enum_option_pb in parameter_type.enum_options: + if parameter.enum_options: + parameter.enum_options.append( + KeyDisplayPair( + key_name=enum_option_pb.key_name, + display_name=enum_option_pb.display_name, + ) + ) + elif isinstance(parameter_type, BooleanParameterPb): + parameter = BooleanParameter(**base_args, default=parameter_type.default) + elif isinstance(parameter_type, IntegerParameterPb): + parameter = IntegerParameter( + **base_args, + default=parameter_type.default, + minimum=( + parameter_type.minimum if parameter_type.HasField("minimum") else None + ), # protobuf has '0' default value for int instead of None + maximum=( + parameter_type.maximum if parameter_type.HasField("maximum") else None + ), # protobuf has '0' default value for int instead of None + ) + elif isinstance(parameter_type, FloatParameterPb): + parameter = FloatParameter( + **base_args, + default=parameter_type.default, + minimum=( + parameter_type.minimum if parameter_type.HasField("minimum") else None + ), # protobuf has '0' default value for float instead of None + maximum=( + parameter_type.maximum if parameter_type.HasField("maximum") else None + ), # protobuf has '0' default value for float instead of None + ) + elif isinstance(parameter_type, DateTimeParameterPb): + if parameter_type.HasField("default"): + try: + default = datetime.fromisoformat(parameter_type.default) + except TypeError: + raise TypeError( + f"Invalid default datetime format, should be string:" + f" {parameter_type.default}" + ) + except ValueError: + raise ValueError( + f"Invalid default datetime value: {parameter_type.default}" + ) + else: + default = None + parameter = DateTimeParameter(**base_args, default=default) + else: + raise NotImplementedError( + f"Protobuf parameter type {type(parameter_pb)} not supported" + ) + workflow_parameters.append(parameter) workflow_types.append( WorkflowType( workflow_type_name=workflow_pb.type_name, @@ -229,3 +326,55 @@ def from_pb_message(cls, available_work_flows_pb: AvailableWorkflows) -> Self: ) ) return cls(workflow_types) + + @classmethod + def from_json_config_file(cls, json_config_file_path: str) -> Self: + """Create a WorkflowTypeManager instance from a json configuration file. + + :param json_config_file_path: path to the json workflow configuration file. + :return: WorkflowTypeManager instance. + """ + with open(json_config_file_path, "r") as f: + json_config_dict = json.load(f) + workflow_types = [] + for _workflow in json_config_dict: + workflow_parameters = [] + if "workflow_parameters" in _workflow: + for _parameter in _workflow["workflow_parameters"]: + parameter_type = _parameter["parameter_type"] + _parameter.pop("parameter_type") + + parameter: WorkflowParameter + if parameter_type == StringParameter.type_name: + if "enum_options" in _parameter: + enum_options = [] + for enum_option in _parameter["enum_options"]: + enum_options.append( + KeyDisplayPair( + key_name=enum_option["key_name"], + display_name=enum_option["display_name"], + ) + ) + _parameter.pop("enum_options") + parameter = StringParameter(**_parameter, enum_options=enum_options) + else: + parameter = StringParameter(**_parameter) + elif parameter_type == BooleanParameter.type_name: + parameter = BooleanParameter(**_parameter) + elif parameter_type == IntegerParameter.type_name: + parameter = IntegerParameter(**_parameter) + elif parameter_type == FloatParameter.type_name: + parameter = FloatParameter(**_parameter) + elif parameter_type == DateTimeParameter.type_name: + parameter = DateTimeParameter(**_parameter) + else: + raise NotImplementedError(f"Parameter type {parameter_type} not supported") + workflow_parameters.append(parameter) + workflow_types.append( + WorkflowType( + workflow_type_name=_workflow["workflow_type_name"], + workflow_type_description_name=_workflow["workflow_type_description_name"], + workflow_parameters=workflow_parameters, + ) + ) + return cls(workflow_types) diff --git a/unit_test/test_config/workflow_config_happy.json b/unit_test/test_config/workflow_config_happy.json new file mode 100644 index 0000000..10404c7 --- /dev/null +++ b/unit_test/test_config/workflow_config_happy.json @@ -0,0 +1,49 @@ +[ + { + "workflow_type_name": "workflow_1", + "workflow_type_description_name": "High fidelity simulator" + }, + { + "workflow_type_name": "workflow_2", + "workflow_type_description_name": "Used for testing purposes. Should not be used in production environments.", + "workflow_parameters": [ + { + "parameter_type": "datetime", + "key_name": "start_time" + }, + { + "parameter_type": "datetime", + "key_name": "end_time" + }, + { + "parameter_type": "integer", + "key_name": "step_size_in_minutes", + "title": "This will override the 'capitalized, underscore to space', key_name", + "description": "Description/explanation.", + "default": 900, + "minimum": 1, + "maximum": 525600 + }, + { + "parameter_type": "integer", + "key_name": "number_of_steps", + "minimum": 2 + }, + { + "parameter_type": "string", + "key_name": "test_enum", + "description": "How to use enum options", + "enum_options": [ + { + "key_name": "key1", + "display_name": "Display name 1" + }, + { + "key_name": "key2", + "display_name": "Display name 2" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/unit_test/test_config/workflow_config_int_min_as_float.json b/unit_test/test_config/workflow_config_int_min_as_float.json new file mode 100644 index 0000000..da8fc9f --- /dev/null +++ b/unit_test/test_config/workflow_config_int_min_as_float.json @@ -0,0 +1,49 @@ +[ + { + "workflow_type_name": "workflow_1", + "workflow_type_description_name": "High fidelity simulator" + }, + { + "workflow_type_name": "workflow_2", + "workflow_type_description_name": "Used for testing purposes. Should not be used in production environments.", + "workflow_parameters": [ + { + "parameter_type": "datetime", + "key_name": "start_time" + }, + { + "parameter_type": "datetime", + "key_name": "end_time" + }, + { + "parameter_type": "integer", + "key_name": "step_size_in_minutes", + "title": "This will override the 'capitalized, underscore to space', key_name", + "description": "Description/explanation.", + "default": 900, + "minimum": 1.5, + "maximum": 525600 + }, + { + "parameter_type": "integer", + "key_name": "number_of_steps", + "minimum": 2 + }, + { + "parameter_type": "string", + "key_name": "test_enum", + "description": "How to use enum options", + "enum_options": [ + { + "key_name": "key1", + "display_name": "Display name 1" + }, + { + "key_name": "key2", + "display_name": "Display name 2" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/unit_test/test_workflow_type.py b/unit_test/test_workflow_type.py index eb0a9ab..b59f562 100644 --- a/unit_test/test_workflow_type.py +++ b/unit_test/test_workflow_type.py @@ -1,6 +1,6 @@ import unittest -from omotes_sdk.workflow_type import WorkflowType +from omotes_sdk.workflow_type import WorkflowType, WorkflowTypeManager class WorkflowTypeTest(unittest.TestCase): @@ -49,3 +49,44 @@ def test__hash__same_descript_but_different_id_should_have_different_hash(self) # Assert self.assertNotEqual(hash_1, hash_2) + + def test__from_json_config_file__happy(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_happy.json" + + # Act + workflow_type_manager = WorkflowTypeManager.from_json_config_file(workflow_json_file) + + # Assert + self.assertEqual(len(workflow_type_manager.get_all_workflows()), 2) + + def test__from_json_config_file__integer_minimum_as_float(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_int_min_as_float.json" + + # Act / Assert + with self.assertRaises(TypeError): + WorkflowTypeManager.from_json_config_file(workflow_json_file) + + def test__to_pb_message__happy(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_happy.json" + + # Act + workflow_type_manager = WorkflowTypeManager.from_json_config_file(workflow_json_file) + pb_message = workflow_type_manager.to_pb_message() + + # Assert + self.assertEqual(pb_message.workflows[0].type_name, "workflow_1") + + def test__from_pb_message__happy(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_happy.json" + + # Act + workflow_type_manager = WorkflowTypeManager.from_json_config_file(workflow_json_file) + pb_message = workflow_type_manager.to_pb_message() + workflow_type_manager_from_pb = WorkflowTypeManager.from_pb_message(pb_message) + + # Assert + self.assertEqual(len(workflow_type_manager_from_pb.get_all_workflows()), 2) From 1220eb23588ae9c50d8d317c5ad63f59891b2ba6 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 27 Jun 2024 15:38:15 +0200 Subject: [PATCH 03/17] add UndefinedWorkflowsException --- src/omotes_sdk/omotes_interface.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 46be0fd..4a564f3 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -73,6 +73,12 @@ def callback_on_status_update_wrapped(self, message: bytes) -> None: self.callback_on_status_update(self.job, status_update) +class UndefinedWorkflowsException(Exception): + """Thrown if the workflows are needed but not defined yet.""" + + ... + + class UnknownWorkflowException(Exception): """Thrown if a job is submitted using an unknown workflow type.""" @@ -282,6 +288,9 @@ def request_available_workflows(self) -> None: request_available_workflows_pb.SerializeToString(), ) - def get_workflow_type_manager(self) -> WorkflowTypeManager | None: + def get_workflow_type_manager(self) -> WorkflowTypeManager: """Get the available workflows.""" - return self.workflow_type_manager + if self.workflow_type_manager: + return self.workflow_type_manager + else: + raise UndefinedWorkflowsException() From 91e0d0c97e85cfff1e1cd3d836c23312ff19b5a6 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Tue, 2 Jul 2024 15:14:18 +0200 Subject: [PATCH 04/17] use workflows received event, add data conversion to workflow parameter classes --- src/omotes_sdk/omotes_interface.py | 10 +- src/omotes_sdk/workflow_type.py | 499 ++++++++++++++++++++--------- 2 files changed, 351 insertions(+), 158 deletions(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 4a564f3..63d3211 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -1,5 +1,5 @@ import logging -import time +import threading import uuid from dataclasses import dataclass from datetime import timedelta @@ -92,6 +92,8 @@ class OmotesInterface: """Interface to RabbitMQ broker.""" workflow_type_manager: WorkflowTypeManager | None """All available workflow types.""" + _workflow_config_received: threading.Event + """Event triggered when workflow configuration is received.""" def __init__( self, @@ -106,6 +108,7 @@ def __init__( """ self.broker_if = BrokerInterface(rabbitmq_config) self.workflow_type_manager = None + self._workflow_config_received = threading.Event() def start(self) -> None: """Start any other interfaces and request available workflows.""" @@ -113,9 +116,9 @@ def start(self) -> None: self.connect_to_available_workflows_updates() self.request_available_workflows() - while not self.workflow_type_manager: + while not self._workflow_config_received.is_set(): logger.info("Waiting for workflow definitions to be received from the orchestrator...") - time.sleep(5) + self._workflow_config_received.wait(timeout=5) def stop(self) -> None: """Stop any other interfaces.""" @@ -278,6 +281,7 @@ def callback_on_update_available_workflows(self, message: bytes) -> None: available_workflows_pb = AvailableWorkflows() available_workflows_pb.ParseFromString(message) self.workflow_type_manager = WorkflowTypeManager.from_pb_message(available_workflows_pb) + self._workflow_config_received.set() logger.info("Updated the available workflows") def request_available_workflows(self) -> None: diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index 887fcc5..b3a03d0 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -1,8 +1,9 @@ import json +from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime -from typing import List, Optional, Dict, Union -from typing_extensions import Self +from typing import List, Optional, Dict, Union, Any +from typing_extensions import Self, override from omotes_sdk_protocol.workflow_pb2 import ( AvailableWorkflows, @@ -18,7 +19,7 @@ @dataclass(eq=True, frozen=True) -class WorkflowParameter: +class WorkflowParameter(ABC): """Define a workflow parameter this SDK supports.""" key_name: str = field(hash=True, compare=True) @@ -27,13 +28,54 @@ class WorkflowParameter: """Optionally override the 'snake_case to text' 'key_name' (displayed above the input field).""" description: str | None = field(default=None, hash=True, compare=True) """Optional description (displayed below the input field).""" + type_name: str = "" + """Parameter type name, set in child class.""" + + @abstractmethod + def to_pb_message(self) -> Union[ + StringParameterPb, + BooleanParameterPb, + IntegerParameterPb, + FloatParameterPb, + DateTimeParameterPb, + ]: + """Abstract function to generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + pass + + @classmethod + @abstractmethod + def from_pb_message( + cls, + parameter_pb: WorkflowParameterPb, + parameter_type_pb: Any, + ) -> Self: + """Abstract function to create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + pass + + @classmethod + @abstractmethod + def from_json_config(cls, json_config: dict) -> Self: + """Abstract function to create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + pass def __post_init__(self) -> None: """Check parameter format.""" for name, field_type in self.__annotations__.items(): - # Do not check dataclasses (like KeyDisplayPair) + # Do not check dataclasses (like StringEnumOption) # TODO better way of checking that 'field_type' is dataclass? - if "KeyDisplayPair" not in str(field_type) and not isinstance( + if "StringEnumOption" not in str(field_type) and not isinstance( self.__dict__[name], field_type ): current_type = type(self.__dict__[name]) @@ -44,7 +86,7 @@ def __post_init__(self) -> None: @dataclass(eq=True, frozen=True) -class KeyDisplayPair: +class StringEnumOption: """Define a key display pair this SDK supports.""" key_name: str = field(hash=True, compare=True) @@ -61,9 +103,78 @@ class StringParameter(WorkflowParameter): """Parameter type name.""" default: str | None = field(default=None, hash=False, compare=False) """Optional default value.""" - enum_options: list[KeyDisplayPair] | None = field(default=None, hash=False, compare=False) + enum_options: list[StringEnumOption] | None = field(default=None, hash=False, compare=False) """Optional multiple choice values.""" + @override + def to_pb_message(self) -> StringParameterPb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + parameter_type_pb = StringParameterPb(default=self.default) + if self.enum_options: + for _string_enum in self.enum_options: + parameter_type_pb.enum_options.extend( + [ + StringEnumPb( + key_name=_string_enum.key_name, + display_name=_string_enum.display_name, + ) + ] + ) + return parameter_type_pb + + @classmethod + @override + def from_pb_message( + cls, parameter_pb: WorkflowParameterPb, parameter_type_pb: StringParameterPb + ) -> Self: + """Create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + parameter = cls( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, + default=parameter_type_pb.default, + enum_options=[], + ) + for enum_option_pb in parameter_type_pb.enum_options: + if parameter_type_pb.enum_options and parameter.enum_options is not None: + parameter.enum_options.append( + StringEnumOption( + key_name=enum_option_pb.key_name, + display_name=enum_option_pb.display_name, + ) + ) + return parameter + + @classmethod + @override + def from_json_config(cls, json_config: dict) -> Self: + """Create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + if "enum_options" in json_config: + enum_options = [] + for enum_option in json_config["enum_options"]: + enum_options.append( + StringEnumOption( + key_name=enum_option["key_name"], + display_name=enum_option["display_name"], + ) + ) + json_config.pop("enum_options") + return cls(**json_config, enum_options=enum_options) + else: + return cls(**json_config) + @dataclass(eq=True, frozen=True) class BooleanParameter(WorkflowParameter): @@ -74,6 +185,42 @@ class BooleanParameter(WorkflowParameter): default: bool | None = field(default=None, hash=False, compare=False) """Optional default value.""" + @override + def to_pb_message(self) -> BooleanParameterPb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + return BooleanParameterPb(default=self.default) + + @classmethod + @override + def from_pb_message( + cls, parameter_pb: WorkflowParameterPb, parameter_type_pb: BooleanParameterPb + ) -> Self: + """Create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + return cls( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, + default=parameter_type_pb.default, + ) + + @classmethod + @override + def from_json_config(cls, json_config: dict) -> Self: + """Create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + return cls(**json_config) + @dataclass(eq=True, frozen=True) class IntegerParameter(WorkflowParameter): @@ -88,6 +235,48 @@ class IntegerParameter(WorkflowParameter): maximum: int | None = field(default=None, hash=False, compare=False) """Optional maximum allowed value.""" + @override + def to_pb_message(self) -> IntegerParameterPb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + return IntegerParameterPb(default=self.default, minimum=self.minimum, maximum=self.maximum) + + @classmethod + @override + def from_pb_message( + cls, parameter_pb: WorkflowParameterPb, parameter_type_pb: IntegerParameterPb + ) -> Self: + """Create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + return cls( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, + default=parameter_type_pb.default, + minimum=( + parameter_type_pb.minimum if parameter_type_pb.HasField("minimum") else None + ), # protobuf has '0' default value for int instead of None + maximum=( + parameter_type_pb.maximum if parameter_type_pb.HasField("maximum") else None + ), # protobuf has '0' default value for int instead of None + ) + + @classmethod + @override + def from_json_config(cls, json_config: dict) -> Self: + """Create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + return cls(**json_config) + @dataclass(eq=True, frozen=True) class FloatParameter(WorkflowParameter): @@ -102,6 +291,48 @@ class FloatParameter(WorkflowParameter): maximum: float | None = field(default=None, hash=False, compare=False) """Optional maximum allowed value.""" + @override + def to_pb_message(self) -> FloatParameterPb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + return FloatParameterPb(default=self.default, minimum=self.minimum, maximum=self.maximum) + + @classmethod + @override + def from_pb_message( + cls, parameter_pb: WorkflowParameterPb, parameter_type_pb: FloatParameterPb + ) -> Self: + """Create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + return cls( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, + default=parameter_type_pb.default, + minimum=( + parameter_type_pb.minimum if parameter_type_pb.HasField("minimum") else None + ), # protobuf has '0' default value for int instead of None + maximum=( + parameter_type_pb.maximum if parameter_type_pb.HasField("maximum") else None + ), # protobuf has '0' default value for int instead of None + ) + + @classmethod + @override + def from_json_config(cls, json_config: dict) -> Self: + """Create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + return cls(**json_config) + @dataclass(eq=True, frozen=True) class DateTimeParameter(WorkflowParameter): @@ -112,6 +343,76 @@ class DateTimeParameter(WorkflowParameter): default: datetime | None = field(default=None, hash=False, compare=False) """Optional default value.""" + @override + def to_pb_message(self) -> DateTimeParameterPb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + if self.default is None: + default_value = None + else: + default_value = self.default.isoformat() + return DateTimeParameterPb(default=default_value) + + @classmethod + @override + def from_pb_message( + cls, parameter_pb: WorkflowParameterPb, parameter_type_pb: DateTimeParameterPb + ) -> Self: + """Create a class instance from a protobuf message. + + :param parameter_pb: protobuf message containing the base parameters. + :param parameter_type_pb: protobuf message containing the parameter type parameters. + :return: class instance. + """ + if parameter_type_pb.HasField("default"): + try: + default = datetime.fromisoformat(parameter_type_pb.default) + except TypeError: + raise TypeError( + f"Invalid default datetime format, should be string:" + f" {parameter_type_pb.default}" + ) + except ValueError: + raise ValueError(f"Invalid default datetime value: {parameter_type_pb.default}") + else: + default = None + return cls( + key_name=parameter_pb.key_name, + title=parameter_pb.title, + description=parameter_pb.description, + default=default, + ) + + @classmethod + @override + def from_json_config(cls, json_config: dict) -> Self: + """Create a class instance from json configuration. + + :param json_config: dictionary with configuration. + :return: class instance. + """ + return cls(**json_config) + + +PARAMETER_CLASS_TO_PB_CLASS: dict[ + type[WorkflowParameter], + Union[ + type[StringParameterPb], + type[BooleanParameterPb], + type[IntegerParameterPb], + type[FloatParameterPb], + type[DateTimeParameterPb], + ], +] = { + StringParameter: StringParameterPb, + BooleanParameter: BooleanParameterPb, + IntegerParameter: IntegerParameterPb, + FloatParameter: FloatParameterPb, + DateTimeParameter: DateTimeParameterPb, +} + @dataclass(eq=True, frozen=True) class WorkflowType: @@ -181,53 +482,20 @@ def to_pb_message(self) -> AvailableWorkflows: title=_parameter.title, description=_parameter.description, ) - if isinstance(_parameter, StringParameter): - string_parameter = StringParameterPb(default=_parameter.default) - if _parameter.enum_options: - for _string_enum in _parameter.enum_options: - string_parameter.enum_options.extend( - [ - StringEnumPb( - key_name=_string_enum.key_name, - display_name=_string_enum.display_name, - ) - ] - ) - parameter_pb.string_parameter.CopyFrom(string_parameter) - elif isinstance(_parameter, BooleanParameter): - parameter_pb.boolean_parameter.CopyFrom( - BooleanParameterPb( - default=_parameter.default, - ) - ) - elif isinstance(_parameter, IntegerParameter): - parameter_pb.integer_parameter.CopyFrom( - IntegerParameterPb( - default=_parameter.default, - minimum=_parameter.minimum, - maximum=_parameter.maximum, - ) - ) - elif isinstance(_parameter, FloatParameter): - parameter_pb.float_parameter.CopyFrom( - FloatParameterPb( - default=_parameter.default, - minimum=_parameter.minimum, - maximum=_parameter.maximum, - ) - ) - elif isinstance(_parameter, DateTimeParameter): - if _parameter.default is None: - default_value = None - else: - default_value = _parameter.default.isoformat() - parameter_pb.datetime_parameter.CopyFrom( - DateTimeParameterPb(default=default_value) - ) - else: - raise NotImplementedError( - f"Parameter type {type(_parameter)} not supported" - ) + parameter_type_to_pb_type_oneof = { + StringParameter: parameter_pb.string_parameter, + BooleanParameter: parameter_pb.boolean_parameter, + IntegerParameter: parameter_pb.integer_parameter, + FloatParameter: parameter_pb.float_parameter, + DateTimeParameter: parameter_pb.datetime_parameter, + } + for ( + parameter_type_class, + parameter_type_oneof, + ) in parameter_type_to_pb_type_oneof.items(): + if isinstance(_parameter, parameter_type_class): + parameter_type_oneof.CopyFrom(_parameter.to_pb_message()) + break workflow_pb.parameters.extend([parameter_pb]) available_workflows_pb.workflows.extend([workflow_pb]) return available_workflows_pb @@ -243,81 +511,21 @@ def from_pb_message(cls, available_workflows_pb: AvailableWorkflows) -> Self: for workflow_pb in available_workflows_pb.workflows: workflow_parameters: list[WorkflowParameter] = [] for parameter_pb in workflow_pb.parameters: - base_args = dict( - key_name=parameter_pb.key_name, - title=parameter_pb.title, - description=parameter_pb.description, - ) parameter_type_name = parameter_pb.WhichOneof("parameter_type") if parameter_type_name is None: raise TypeError(f"Parameter protobuf message with invalid type: {parameter_pb}") else: - parameter_type = getattr(parameter_pb, parameter_type_name) - - parameter: Union[ - StringParameter, - BooleanParameter, - IntegerParameter, - FloatParameter, - DateTimeParameter, - ] - if isinstance(parameter_type, StringParameterPb): - parameter = StringParameter( - **base_args, default=parameter_type.default, enum_options=[] - ) - for enum_option_pb in parameter_type.enum_options: - if parameter.enum_options: - parameter.enum_options.append( - KeyDisplayPair( - key_name=enum_option_pb.key_name, - display_name=enum_option_pb.display_name, - ) - ) - elif isinstance(parameter_type, BooleanParameterPb): - parameter = BooleanParameter(**base_args, default=parameter_type.default) - elif isinstance(parameter_type, IntegerParameterPb): - parameter = IntegerParameter( - **base_args, - default=parameter_type.default, - minimum=( - parameter_type.minimum if parameter_type.HasField("minimum") else None - ), # protobuf has '0' default value for int instead of None - maximum=( - parameter_type.maximum if parameter_type.HasField("maximum") else None - ), # protobuf has '0' default value for int instead of None - ) - elif isinstance(parameter_type, FloatParameterPb): - parameter = FloatParameter( - **base_args, - default=parameter_type.default, - minimum=( - parameter_type.minimum if parameter_type.HasField("minimum") else None - ), # protobuf has '0' default value for float instead of None - maximum=( - parameter_type.maximum if parameter_type.HasField("maximum") else None - ), # protobuf has '0' default value for float instead of None - ) - elif isinstance(parameter_type, DateTimeParameterPb): - if parameter_type.HasField("default"): - try: - default = datetime.fromisoformat(parameter_type.default) - except TypeError: - raise TypeError( - f"Invalid default datetime format, should be string:" - f" {parameter_type.default}" - ) - except ValueError: - raise ValueError( - f"Invalid default datetime value: {parameter_type.default}" - ) - else: - default = None - parameter = DateTimeParameter(**base_args, default=default) - else: - raise NotImplementedError( - f"Protobuf parameter type {type(parameter_pb)} not supported" - ) - workflow_parameters.append(parameter) + one_of_parameter_type_pb = getattr(parameter_pb, parameter_type_name) + + parameter = None + for parameter_class, parameter_pb_class in PARAMETER_CLASS_TO_PB_CLASS.items(): + if isinstance(one_of_parameter_type_pb, parameter_pb_class): + parameter = parameter_class.from_pb_message( + parameter_pb, one_of_parameter_type_pb + ) + break + if parameter: + workflow_parameters.append(parameter) workflow_types.append( WorkflowType( workflow_type_name=workflow_pb.type_name, @@ -340,36 +548,17 @@ def from_json_config_file(cls, json_config_file_path: str) -> Self: for _workflow in json_config_dict: workflow_parameters = [] if "workflow_parameters" in _workflow: - for _parameter in _workflow["workflow_parameters"]: - parameter_type = _parameter["parameter_type"] - _parameter.pop("parameter_type") - - parameter: WorkflowParameter - if parameter_type == StringParameter.type_name: - if "enum_options" in _parameter: - enum_options = [] - for enum_option in _parameter["enum_options"]: - enum_options.append( - KeyDisplayPair( - key_name=enum_option["key_name"], - display_name=enum_option["display_name"], - ) - ) - _parameter.pop("enum_options") - parameter = StringParameter(**_parameter, enum_options=enum_options) - else: - parameter = StringParameter(**_parameter) - elif parameter_type == BooleanParameter.type_name: - parameter = BooleanParameter(**_parameter) - elif parameter_type == IntegerParameter.type_name: - parameter = IntegerParameter(**_parameter) - elif parameter_type == FloatParameter.type_name: - parameter = FloatParameter(**_parameter) - elif parameter_type == DateTimeParameter.type_name: - parameter = DateTimeParameter(**_parameter) - else: - raise NotImplementedError(f"Parameter type {parameter_type} not supported") - workflow_parameters.append(parameter) + for parameter_config in _workflow["workflow_parameters"]: + parameter_type_name = parameter_config["parameter_type"] + parameter_config.pop("parameter_type") + + for parameter_type_class in PARAMETER_CLASS_TO_PB_CLASS: + if parameter_type_class.type_name == parameter_type_name: + workflow_parameters.append( + parameter_type_class.from_json_config(parameter_config) + ) + break + workflow_types.append( WorkflowType( workflow_type_name=_workflow["workflow_type_name"], From 782db3edc1d371470770baf8e3bd11ff73e5ad8e Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Tue, 2 Jul 2024 15:15:56 +0200 Subject: [PATCH 05/17] remove left over dangling comment --- src/omotes_sdk/omotes_interface.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 63d3211..fcfe723 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -104,7 +104,6 @@ def __init__( NOTE: Needs to be started separately. :param rabbitmq_config: RabbitMQ configuration how to connect to OMOTES. - workflows are updated. """ self.broker_if = BrokerInterface(rabbitmq_config) self.workflow_type_manager = None From 6ffc370b492ccee0f821ded95f77f9dcbbf76a4e Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 12:36:23 +0200 Subject: [PATCH 06/17] remove unnecessary class variable argument --- .../internal/orchestrator/orchestrator_interface.py | 13 +++++-------- src/omotes_sdk/workflow_type.py | 1 - 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py index c06e86f..b3c0157 100644 --- a/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py +++ b/src/omotes_sdk/internal/orchestrator/orchestrator_interface.py @@ -116,7 +116,7 @@ def start(self) -> None: self.connect_to_request_available_workflows( callback_on_request_workflows=self.request_workflows_handler ) - self.send_available_workflows(self.workflow_type_manager) + self.send_available_workflows() def stop(self) -> None: """Stop the orchestrator interface.""" @@ -198,14 +198,11 @@ def request_workflows_handler(self, request_workflows: RequestAvailableWorkflows :param request_workflows: Request available work flows. """ logger.info("Received an available workflows request") - self.send_available_workflows(self.workflow_type_manager) + self.send_available_workflows() - def send_available_workflows(self, workflow_type_manager: WorkflowTypeManager) -> None: - """Send the available workflows to the SDK. - - :param workflow_type_manager: Job to which the result belongs. - """ - work_type_manager_pb = workflow_type_manager.to_pb_message() + def send_available_workflows(self) -> None: + """Send the available workflows to the SDK.""" + work_type_manager_pb = self.workflow_type_manager.to_pb_message() self.broker_if.send_message_to( OmotesQueueNames.available_workflows_queue_name(), work_type_manager_pb.SerializeToString(), diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index b3a03d0..d7c994d 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -74,7 +74,6 @@ def __post_init__(self) -> None: """Check parameter format.""" for name, field_type in self.__annotations__.items(): # Do not check dataclasses (like StringEnumOption) - # TODO better way of checking that 'field_type' is dataclass? if "StringEnumOption" not in str(field_type) and not isinstance( self.__dict__[name], field_type ): From e5586e4f6a66c0c5b572adeaa0675c54e6c63ea7 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 17:24:21 +0200 Subject: [PATCH 07/17] update to omotes-sdk-protobuf 0.1.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9f1167e..20ce038 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.3.1", - "omotes-sdk-protocol ~= 0.0.8", + "omotes-sdk-protocol ~= 0.1.1", "celery ~= 5.3.6", "typing-extensions ~= 4.11.0" #"streamcapture ~= 1.2.3", TODO: See #4, this is reenabled once streamcapture removes AGPL dependency From f3b4581f0b65d611182472cb8cd1d51257ab1d9a Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 17:37:59 +0200 Subject: [PATCH 08/17] use typing union for python 3.8 --- src/omotes_sdk/omotes_interface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index fcfe723..4991699 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -3,7 +3,7 @@ import uuid from dataclasses import dataclass from datetime import timedelta -from typing import Callable, Optional +from typing import Callable, Optional, Union from google.protobuf.struct_pb2 import Struct from omotes_sdk.internal.common.broker_interface import BrokerInterface @@ -90,7 +90,7 @@ class OmotesInterface: broker_if: BrokerInterface """Interface to RabbitMQ broker.""" - workflow_type_manager: WorkflowTypeManager | None + workflow_type_manager: Union[WorkflowTypeManager, None] """All available workflow types.""" _workflow_config_received: threading.Event """Event triggered when workflow configuration is received.""" From cfdb32cdfa0f97ef5a5f662f44870b27d8da3ce7 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 17:44:25 +0200 Subject: [PATCH 09/17] use typing union for python 3.8 --- src/omotes_sdk/workflow_type.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index d7c994d..2c9e3ac 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -24,9 +24,9 @@ class WorkflowParameter(ABC): key_name: str = field(hash=True, compare=True) """Key name for the parameter.""" - title: str | None = field(default=None, hash=True, compare=True) + title: Union[str, None] = field(default=None, hash=True, compare=True) """Optionally override the 'snake_case to text' 'key_name' (displayed above the input field).""" - description: str | None = field(default=None, hash=True, compare=True) + description: Union[str, None] = field(default=None, hash=True, compare=True) """Optional description (displayed below the input field).""" type_name: str = "" """Parameter type name, set in child class.""" @@ -100,9 +100,11 @@ class StringParameter(WorkflowParameter): type_name: str = "string" """Parameter type name.""" - default: str | None = field(default=None, hash=False, compare=False) + default: Union[str, None] = field(default=None, hash=False, compare=False) """Optional default value.""" - enum_options: list[StringEnumOption] | None = field(default=None, hash=False, compare=False) + enum_options: Union[list[StringEnumOption], None] = field( + default=None, hash=False, compare=False + ) """Optional multiple choice values.""" @override @@ -181,7 +183,7 @@ class BooleanParameter(WorkflowParameter): type_name: str = "boolean" """Parameter type name.""" - default: bool | None = field(default=None, hash=False, compare=False) + default: Union[bool, None] = field(default=None, hash=False, compare=False) """Optional default value.""" @override @@ -227,11 +229,11 @@ class IntegerParameter(WorkflowParameter): type_name: str = "integer" """Parameter type name.""" - default: int | None = field(default=None, hash=False, compare=False) + default: Union[int, None] = field(default=None, hash=False, compare=False) """Optional default value.""" - minimum: int | None = field(default=None, hash=False, compare=False) + minimum: Union[int, None] = field(default=None, hash=False, compare=False) """Optional minimum allowed value.""" - maximum: int | None = field(default=None, hash=False, compare=False) + maximum: Union[int, None] = field(default=None, hash=False, compare=False) """Optional maximum allowed value.""" @override @@ -283,11 +285,11 @@ class FloatParameter(WorkflowParameter): type_name: str = "float" """Parameter type name.""" - default: float | None = field(default=None, hash=False, compare=False) + default: Union[float, None] = field(default=None, hash=False, compare=False) """Optional default value.""" - minimum: float | None = field(default=None, hash=False, compare=False) + minimum: Union[float, None] = field(default=None, hash=False, compare=False) """Optional minimum allowed value.""" - maximum: float | None = field(default=None, hash=False, compare=False) + maximum: Union[float, None] = field(default=None, hash=False, compare=False) """Optional maximum allowed value.""" @override @@ -339,7 +341,7 @@ class DateTimeParameter(WorkflowParameter): type_name: str = "datetime" """Parameter type name.""" - default: datetime | None = field(default=None, hash=False, compare=False) + default: Union[datetime, None] = field(default=None, hash=False, compare=False) """Optional default value.""" @override @@ -421,7 +423,7 @@ class WorkflowType: """Technical name for the workflow.""" workflow_type_description_name: str = field(hash=False, compare=False) """Human-readable name for the workflow.""" - workflow_parameters: List[WorkflowParameter] | None = field( + workflow_parameters: Union[List[WorkflowParameter], None] = field( default=None, hash=False, compare=False ) """Optional list of non-ESDL workflow parameters.""" From 7ed9bdd4f94eab7b4f666901a0e476981fd52e5f Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 17:53:29 +0200 Subject: [PATCH 10/17] use typing List for python 3.8 --- src/omotes_sdk/workflow_type.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index 2c9e3ac..b04cfea 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -102,7 +102,7 @@ class StringParameter(WorkflowParameter): """Parameter type name.""" default: Union[str, None] = field(default=None, hash=False, compare=False) """Optional default value.""" - enum_options: Union[list[StringEnumOption], None] = field( + enum_options: Union[List[StringEnumOption], None] = field( default=None, hash=False, compare=False ) """Optional multiple choice values.""" @@ -510,7 +510,7 @@ def from_pb_message(cls, available_workflows_pb: AvailableWorkflows) -> Self: """ workflow_types = [] for workflow_pb in available_workflows_pb.workflows: - workflow_parameters: list[WorkflowParameter] = [] + workflow_parameters: List[WorkflowParameter] = [] for parameter_pb in workflow_pb.parameters: parameter_type_name = parameter_pb.WhichOneof("parameter_type") if parameter_type_name is None: From 4fb37adfc374133ae4eb28fb59f4c66c9d189aa6 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 17:57:17 +0200 Subject: [PATCH 11/17] use typing Type for python 3.8 --- src/omotes_sdk/workflow_type.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index b04cfea..09046dc 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime -from typing import List, Optional, Dict, Union, Any +from typing import List, Optional, Dict, Union, Any, Type from typing_extensions import Self, override from omotes_sdk_protocol.workflow_pb2 import ( @@ -398,13 +398,13 @@ def from_json_config(cls, json_config: dict) -> Self: PARAMETER_CLASS_TO_PB_CLASS: dict[ - type[WorkflowParameter], + Type[WorkflowParameter], Union[ - type[StringParameterPb], - type[BooleanParameterPb], - type[IntegerParameterPb], - type[FloatParameterPb], - type[DateTimeParameterPb], + Type[StringParameterPb], + Type[BooleanParameterPb], + Type[IntegerParameterPb], + Type[FloatParameterPb], + Type[DateTimeParameterPb], ], ] = { StringParameter: StringParameterPb, From bc4e4551943c11756eeb018c66bee78ee3a2145e Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 18:03:07 +0200 Subject: [PATCH 12/17] use typing Dict for python 3.8 --- src/omotes_sdk/workflow_type.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index 09046dc..a59b60d 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -62,7 +62,7 @@ def from_pb_message( @classmethod @abstractmethod - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Abstract function to create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -156,7 +156,7 @@ def from_pb_message( @classmethod @override - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -214,7 +214,7 @@ def from_pb_message( @classmethod @override - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -270,7 +270,7 @@ def from_pb_message( @classmethod @override - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -326,7 +326,7 @@ def from_pb_message( @classmethod @override - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -388,7 +388,7 @@ def from_pb_message( @classmethod @override - def from_json_config(cls, json_config: dict) -> Self: + def from_json_config(cls, json_config: Dict) -> Self: """Create a class instance from json configuration. :param json_config: dictionary with configuration. @@ -397,7 +397,7 @@ def from_json_config(cls, json_config: dict) -> Self: return cls(**json_config) -PARAMETER_CLASS_TO_PB_CLASS: dict[ +PARAMETER_CLASS_TO_PB_CLASS: Dict[ Type[WorkflowParameter], Union[ Type[StringParameterPb], From f915c2a62e9144248470eac1a8364e244e6b5e60 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 18:11:56 +0200 Subject: [PATCH 13/17] use typing Dict for python 3.8 --- src/omotes_sdk/workflow_type.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index a59b60d..3e402ef 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -1,3 +1,4 @@ +from __future__ import annotations import json from abc import ABC, abstractmethod from dataclasses import dataclass, field From a37a413e036264484aa43a043a1ac57e520cce2d Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 18:15:25 +0200 Subject: [PATCH 14/17] fix dict typing --- src/omotes_sdk/workflow_type.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index 3e402ef..a0f587a 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -1,4 +1,3 @@ -from __future__ import annotations import json from abc import ABC, abstractmethod from dataclasses import dataclass, field @@ -398,7 +397,7 @@ def from_json_config(cls, json_config: Dict) -> Self: return cls(**json_config) -PARAMETER_CLASS_TO_PB_CLASS: Dict[ +PARAMETER_CLASS_TO_PB_CLASS: dict[ Type[WorkflowParameter], Union[ Type[StringParameterPb], From bc352161c38b62ad9fbec2410e972245ec6f6195 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 18:21:34 +0200 Subject: [PATCH 15/17] fix dict typing --- src/omotes_sdk/workflow_type.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index a0f587a..a59b60d 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -397,7 +397,7 @@ def from_json_config(cls, json_config: Dict) -> Self: return cls(**json_config) -PARAMETER_CLASS_TO_PB_CLASS: dict[ +PARAMETER_CLASS_TO_PB_CLASS: Dict[ Type[WorkflowParameter], Union[ Type[StringParameterPb], From 3ac03733f238dbbec3728973ed3db78134ca6a17 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 4 Jul 2024 21:02:58 +0200 Subject: [PATCH 16/17] replace post init for workflow params to checking in from_json_config functions for p38 --- .gitignore | 2 +- src/omotes_sdk/workflow_type.py | 66 ++++++++++++++----- ...rkflow_config_enum_option_missing_key.json | 48 ++++++++++++++ ...kflow_config_enum_options_not_as_list.json | 43 ++++++++++++ .../test_config/workflow_config_happy.json | 3 +- ...workflow_config_wrong_datetime_format.json | 50 ++++++++++++++ unit_test/test_workflow_type.py | 39 ++++++++++- 7 files changed, 232 insertions(+), 19 deletions(-) create mode 100644 unit_test/test_config/workflow_config_enum_option_missing_key.json create mode 100644 unit_test/test_config/workflow_config_enum_options_not_as_list.json create mode 100644 unit_test/test_config/workflow_config_wrong_datetime_format.json diff --git a/.gitignore b/.gitignore index 341f74d..765df51 100644 --- a/.gitignore +++ b/.gitignore @@ -105,7 +105,7 @@ celerybeat.pid # Environments .env -.venv +.venv* env/ venv/ ENV/ diff --git a/src/omotes_sdk/workflow_type.py b/src/omotes_sdk/workflow_type.py index a59b60d..a1c5c5f 100644 --- a/src/omotes_sdk/workflow_type.py +++ b/src/omotes_sdk/workflow_type.py @@ -70,19 +70,6 @@ def from_json_config(cls, json_config: Dict) -> Self: """ pass - def __post_init__(self) -> None: - """Check parameter format.""" - for name, field_type in self.__annotations__.items(): - # Do not check dataclasses (like StringEnumOption) - if "StringEnumOption" not in str(field_type) and not isinstance( - self.__dict__[name], field_type - ): - current_type = type(self.__dict__[name]) - raise TypeError( - f"The field `{name}` was assigned by `{current_type}` ('{self.__dict__[name]}')" - f" instead of `{field_type}`" - ) - @dataclass(eq=True, frozen=True) class StringEnumOption: @@ -162,9 +149,24 @@ def from_json_config(cls, json_config: Dict) -> Self: :param json_config: dictionary with configuration. :return: class instance. """ + if "default" in json_config and not isinstance(json_config["default"], str): + raise TypeError("'default' for StringParameter must be in 'str' format") + + if "enum_options" in json_config and not isinstance(json_config["enum_options"], List): + raise TypeError("'enum_options' for StringParameter must be a 'list'") + if "enum_options" in json_config: enum_options = [] for enum_option in json_config["enum_options"]: + enum_keys = ["key_name", "display_name"] + for enum_key in enum_keys: + if enum_key not in enum_option: + raise TypeError(f"A string enum option must contain a '{enum_key}'") + if enum_key in json_config and not isinstance(json_config[enum_key], str): + raise TypeError( + f"'{enum_key}' for a string enum option must be in 'str' format:" + f" '{json_config[enum_key]}" + ) enum_options.append( StringEnumOption( key_name=enum_option["key_name"], @@ -220,6 +222,11 @@ def from_json_config(cls, json_config: Dict) -> Self: :param json_config: dictionary with configuration. :return: class instance. """ + if "default" in json_config and not isinstance(json_config["default"], bool): + raise TypeError( + f"'default' for BooleanParameter must be in 'bool' format:" + f" '{json_config['default']}'" + ) return cls(**json_config) @@ -276,6 +283,13 @@ def from_json_config(cls, json_config: Dict) -> Self: :param json_config: dictionary with configuration. :return: class instance. """ + int_params = ["default", "minimum", "maximum"] + for int_param in int_params: + if int_param in json_config and not isinstance(json_config[int_param], int): + raise TypeError( + f"'{int_param}' for IntegerParameter must be in 'int' format:" + f" '{json_config[int_param]}'" + ) return cls(**json_config) @@ -332,6 +346,18 @@ def from_json_config(cls, json_config: Dict) -> Self: :param json_config: dictionary with configuration. :return: class instance. """ + float_params = ["default", "minimum", "maximum"] + for float_param in float_params: + if ( + float_param in json_config + and not isinstance(json_config[float_param], float) + and not isinstance(json_config[float_param], int) + ): + raise TypeError( + f"'{float_param}' for FloatParameter must be in 'float' format:" + f" '{json_config[float_param]}'" + ) + return cls(**json_config) @@ -372,11 +398,9 @@ def from_pb_message( default = datetime.fromisoformat(parameter_type_pb.default) except TypeError: raise TypeError( - f"Invalid default datetime format, should be string:" + f"Invalid default datetime format, should be a string in ISO format:" f" {parameter_type_pb.default}" ) - except ValueError: - raise ValueError(f"Invalid default datetime value: {parameter_type_pb.default}") else: default = None return cls( @@ -394,6 +418,16 @@ def from_json_config(cls, json_config: Dict) -> Self: :param json_config: dictionary with configuration. :return: class instance. """ + if "default" in json_config: + try: + default = datetime.fromisoformat(json_config["default"]) + except TypeError: + raise TypeError( + f"Invalid default datetime format, should be a string in ISO format:" + f" '{json_config['default']}'" + ) + json_config["default"] = default + return cls(**json_config) diff --git a/unit_test/test_config/workflow_config_enum_option_missing_key.json b/unit_test/test_config/workflow_config_enum_option_missing_key.json new file mode 100644 index 0000000..0d8dafe --- /dev/null +++ b/unit_test/test_config/workflow_config_enum_option_missing_key.json @@ -0,0 +1,48 @@ +[ + { + "workflow_type_name": "workflow_1", + "workflow_type_description_name": "High fidelity simulator" + }, + { + "workflow_type_name": "workflow_2", + "workflow_type_description_name": "Used for testing purposes. Should not be used in production environments.", + "workflow_parameters": [ + { + "parameter_type": "datetime", + "key_name": "start_time" + }, + { + "parameter_type": "datetime", + "key_name": "end_time" + }, + { + "parameter_type": "integer", + "key_name": "step_size_in_minutes", + "title": "This will override the 'capitalized, underscore to space', key_name", + "description": "Description/explanation.", + "default": 900, + "minimum": 1, + "maximum": 525600 + }, + { + "parameter_type": "integer", + "key_name": "number_of_steps", + "minimum": 2 + }, + { + "parameter_type": "string", + "key_name": "test_enum", + "description": "How to use enum options", + "enum_options": [ + { + "key_name": "key1", + "display_name": "Display name 1" + }, + { + "key_name": "key2" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/unit_test/test_config/workflow_config_enum_options_not_as_list.json b/unit_test/test_config/workflow_config_enum_options_not_as_list.json new file mode 100644 index 0000000..7e3d5c8 --- /dev/null +++ b/unit_test/test_config/workflow_config_enum_options_not_as_list.json @@ -0,0 +1,43 @@ +[ + { + "workflow_type_name": "workflow_1", + "workflow_type_description_name": "High fidelity simulator" + }, + { + "workflow_type_name": "workflow_2", + "workflow_type_description_name": "Used for testing purposes. Should not be used in production environments.", + "workflow_parameters": [ + { + "parameter_type": "datetime", + "key_name": "start_time" + }, + { + "parameter_type": "datetime", + "key_name": "end_time" + }, + { + "parameter_type": "integer", + "key_name": "step_size_in_minutes", + "title": "This will override the 'capitalized, underscore to space', key_name", + "description": "Description/explanation.", + "default": 900, + "minimum": 1, + "maximum": 525600 + }, + { + "parameter_type": "integer", + "key_name": "number_of_steps", + "minimum": 2 + }, + { + "parameter_type": "string", + "key_name": "test_enum", + "description": "How to use enum options", + "enum_options": { + "key_name": "key1", + "display_name": "Display name 1" + } + } + ] + } +] \ No newline at end of file diff --git a/unit_test/test_config/workflow_config_happy.json b/unit_test/test_config/workflow_config_happy.json index 10404c7..4e8b727 100644 --- a/unit_test/test_config/workflow_config_happy.json +++ b/unit_test/test_config/workflow_config_happy.json @@ -9,7 +9,8 @@ "workflow_parameters": [ { "parameter_type": "datetime", - "key_name": "start_time" + "key_name": "start_time", + "default": "2023-12-31T20:00:00" }, { "parameter_type": "datetime", diff --git a/unit_test/test_config/workflow_config_wrong_datetime_format.json b/unit_test/test_config/workflow_config_wrong_datetime_format.json new file mode 100644 index 0000000..7840c77 --- /dev/null +++ b/unit_test/test_config/workflow_config_wrong_datetime_format.json @@ -0,0 +1,50 @@ +[ + { + "workflow_type_name": "workflow_1", + "workflow_type_description_name": "High fidelity simulator" + }, + { + "workflow_type_name": "workflow_2", + "workflow_type_description_name": "Used for testing purposes. Should not be used in production environments.", + "workflow_parameters": [ + { + "parameter_type": "datetime", + "key_name": "start_time", + "default": "2023-12-31T0:00:00" + }, + { + "parameter_type": "datetime", + "key_name": "end_time" + }, + { + "parameter_type": "integer", + "key_name": "step_size_in_minutes", + "title": "This will override the 'capitalized, underscore to space', key_name", + "description": "Description/explanation.", + "default": 900, + "minimum": 1, + "maximum": 525600 + }, + { + "parameter_type": "integer", + "key_name": "number_of_steps", + "minimum": 2 + }, + { + "parameter_type": "string", + "key_name": "test_enum", + "description": "How to use enum options", + "enum_options": [ + { + "key_name": "key1", + "display_name": "Display name 1" + }, + { + "key_name": "key2", + "display_name": "Display name 2" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/unit_test/test_workflow_type.py b/unit_test/test_workflow_type.py index b59f562..577c61e 100644 --- a/unit_test/test_workflow_type.py +++ b/unit_test/test_workflow_type.py @@ -65,8 +65,45 @@ def test__from_json_config_file__integer_minimum_as_float(self) -> None: workflow_json_file = "./unit_test/test_config/workflow_config_int_min_as_float.json" # Act / Assert - with self.assertRaises(TypeError): + with self.assertRaises(TypeError) as error_context: WorkflowTypeManager.from_json_config_file(workflow_json_file) + self.assertEqual( + str(error_context.exception), + "'minimum' for IntegerParameter must be in 'int' format: '1.5'", + ) + + def test__from_json_config_file__enum_option_missing_key(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_enum_option_missing_key.json" + + # Act / Assert + with self.assertRaises(TypeError) as error_context: + WorkflowTypeManager.from_json_config_file(workflow_json_file) + self.assertEqual( + str(error_context.exception), "A string enum option must contain a 'display_name'" + ) + + def test__from_json_config_file__enum_options_not_as_list(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_enum_options_not_as_list.json" + + # Act / Assert + with self.assertRaises(TypeError) as error_context: + WorkflowTypeManager.from_json_config_file(workflow_json_file) + self.assertEqual( + str(error_context.exception), "'enum_options' for StringParameter must be a 'list'" + ) + + def test__from_json_config_file__wrong_datetime_format(self) -> None: + # Arrange + workflow_json_file = "./unit_test/test_config/workflow_config_wrong_datetime_format.json" + + # Act / Assert + with self.assertRaises(ValueError) as error_context: + WorkflowTypeManager.from_json_config_file(workflow_json_file) + self.assertEqual( + str(error_context.exception), "Invalid isoformat string: '2023-12-31T0:00:00'" + ) def test__to_pb_message__happy(self) -> None: # Arrange From 4ee26535dd1304bb02a5c2779b70c7f32e8d7130 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Fri, 5 Jul 2024 11:44:56 +0200 Subject: [PATCH 17/17] to omotes-sdk-protocol 0.1.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fd091ba..c47b46c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4.2", - "omotes-sdk-protocol ~= 0.0.8", + "omotes-sdk-protocol ~= 0.1.1", "celery ~= 5.3.6", "typing-extensions ~= 4.11.0" #"streamcapture ~= 1.2.3", TODO: See #4, this is reenabled once streamcapture removes AGPL dependency