diff --git a/pyzeebe/common/exceptions.py b/pyzeebe/common/exceptions.py index ae0af194..96b6896e 100644 --- a/pyzeebe/common/exceptions.py +++ b/pyzeebe/common/exceptions.py @@ -1,8 +1,2 @@ -# TODO: improve these - class TaskNotFoundException(Exception): pass - - -class NotEnoughTasksException(Exception): - pass diff --git a/pyzeebe/grpc_internals/zeebe_adapter.py b/pyzeebe/grpc_internals/zeebe_adapter.py index ae16b2e4..0b6f3a59 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_adapter.py @@ -1,4 +1,5 @@ import json +import logging import os.path from typing import List, Generator, Dict @@ -27,13 +28,17 @@ def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel self.gateway_stub = GatewayStub(self._channel) def _check_connectivity(self, value: grpc.ChannelConnectivity) -> None: + logging.debug(f'Grpc channel connectivity changed to: {value}') if value in [grpc.ChannelConnectivity.READY, grpc.ChannelConnectivity.IDLE]: + logging.debug('Connected to Zeebe') self.connected = True self.retrying_connection = False elif value in [grpc.ChannelConnectivity.CONNECTING, grpc.ChannelConnectivity.TRANSIENT_FAILURE]: + logging.warning('No connection to Zeebe, recoverable. Reconnecting...') self.connected = False self.retrying_connection = True elif value == grpc.ChannelConnectivity.SHUTDOWN: + logging.error('Failed to establish connection to Zeebe. Non recoverable') self.connected = False self.retrying_connection = False raise ConnectionAbortedError(f'Lost connection to {self._connection_uri}') @@ -45,7 +50,9 @@ def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_a maxJobsToActivate=max_jobs_to_activate, fetchVariable=variables_to_fetch, requestTimeout=request_timeout)): for job in response.jobs: - yield self._create_task_context_from_job(job) + context = self._create_task_context_from_job(job) + logging.debug(f'Got job: {context} from zeebe') + yield context @staticmethod def _create_task_context_from_job(job) -> TaskContext: diff --git a/pyzeebe/task/task.py b/pyzeebe/task/task.py index 1e544d27..0181a885 100644 --- a/pyzeebe/task/task.py +++ b/pyzeebe/task/task.py @@ -19,3 +19,7 @@ def __init__(self, task_type: str, task_handler: Callable[..., Dict], self.max_jobs_to_activate = max_jobs_to_activate self.variables_to_fetch = variables_to_fetch or [] self.handler: Callable[[TaskContext], TaskContext] = None + + def __repr__(self): + return str({'type': self.type, 'timeout': self.timeout, 'max_jobs_to_activate': self.max_jobs_to_activate, + 'variables_to_fetch': self.variables_to_fetch}) diff --git a/pyzeebe/task/task_context.py b/pyzeebe/task/task_context.py index 2e822cd3..748451e3 100644 --- a/pyzeebe/task/task_context.py +++ b/pyzeebe/task/task_context.py @@ -7,7 +7,7 @@ def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_proces custom_headers: Dict, worker: str, retries: int, deadline: int, variables: Dict): self.key = key self.type = _type - self.wokflow_instance_key = workflow_instance_key + self.workflow_instance_key = workflow_instance_key self.bpmn_process_id = bpmn_process_id self.workflow_definition_version = workflow_definition_version self.workflow_key = workflow_key @@ -18,3 +18,11 @@ def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_proces self.retries = retries self.deadline = deadline self.variables = variables + + def __repr__(self): + return str({'jobKey': self.key, 'taskType': self.type, 'workflowInstanceKey': self.workflow_instance_key, + 'bpmnProcessId': self.bpmn_process_id, + 'workflowDefinitionVersion': self.workflow_definition_version, 'workflowKey': self.workflow_key, + 'elementId': self.element_id, 'elementInstanceKey': self.element_instance_key, + 'customHeaders': self.custom_headers, 'worker': self.worker, 'retries': self.retries, + 'deadline': self.deadline, 'variables': self.variables}) diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 79cb7542..2d15c9b4 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -1,8 +1,9 @@ +import logging import socket from concurrent.futures import ThreadPoolExecutor -from typing import List, Callable, Generator, Dict, Tuple +from typing import List, Callable, Generator, Tuple -from pyzeebe.common.exceptions import TaskNotFoundException, NotEnoughTasksException +from pyzeebe.common.exceptions import TaskNotFoundException from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.task.task import Task @@ -32,26 +33,28 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N self.tasks = [] def work(self): - if len(self.tasks) > 0: - executor = ThreadPoolExecutor(max_workers=len(self.tasks), thread_name_prefix='TASK_HANDLER_THREAD') - executor.map(self._handle_task, self.tasks) - executor.shutdown(wait=True) - else: - raise NotEnoughTasksException('Worker needs tasks in order to work') + with ThreadPoolExecutor(thread_name_prefix='zeebe-task') as executor: + for task in self.tasks: + executor.submit(self._handle_task, task) def _handle_task(self, task: Task): + logging.debug(f'Handling task {task}') while self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection: if self.zeebe_adapter.retrying_connection: + logging.debug(f'Retrying connection to {self.zeebe_adapter._connection_uri}') continue self._handle_task_contexts(task) def _handle_task_contexts(self, task: Task): - executor = ThreadPoolExecutor(thread_name_prefix='JOB_HANDLER_THREAD') - executor.map(task.handler, self._get_task_contexts(task)) - executor.shutdown(wait=False) # Do not wait for tasks to finish + executor = ThreadPoolExecutor(thread_name_prefix=f'zeebe-job-{task.type}') + for task_context in self._get_task_contexts(task): + logging.debug(f'Running job: {task_context}') + executor.submit(task.handler, task_context) + executor.shutdown(wait=False) def _get_task_contexts(self, task: Task) -> Generator[TaskContext, None, None]: + logging.debug(f'Activating jobs for task: {task}') return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.timeout, max_jobs_to_activate=task.max_jobs_to_activate, variables_to_fetch=task.variables_to_fetch, @@ -61,7 +64,7 @@ def add_task(self, task: Task) -> None: task.handler = self._create_zeebe_task_handler(task) self.tasks.append(task) - def _create_zeebe_task_handler(self, task: Task) -> Callable[[TaskContext], Dict]: + def _create_zeebe_task_handler(self, task: Task) -> Callable[[TaskContext], TaskContext]: before_decorator_runner = self._create_before_decorator_runner(task) after_decorator_runner = self._create_after_decorator_runner(task) @@ -70,9 +73,11 @@ def task_handler(context: TaskContext): context = before_decorator_runner(context) context.variables = task.inner_function(**context.variables) context = after_decorator_runner(context) + logging.debug(f'Completing job: {context}') self.zeebe_adapter.complete_job(job_key=context.key, variables=context.variables) - return context.variables + return context except Exception as e: + logging.debug(f'Failed job: {context}. Error: {e}.') task.exception_handler(e, context, TaskStatusController(context, self.zeebe_adapter)) return e diff --git a/pyzeebe/worker/worker_test.py b/pyzeebe/worker/worker_test.py index 9baf1263..0ad19e44 100644 --- a/pyzeebe/worker/worker_test.py +++ b/pyzeebe/worker/worker_test.py @@ -4,7 +4,7 @@ import pytest -from pyzeebe.common.exceptions import TaskNotFoundException, NotEnoughTasksException +from pyzeebe.common.exceptions import TaskNotFoundException from pyzeebe.common.random_utils import random_task_context from pyzeebe.task.task import Task from pyzeebe.task.task_context import TaskContext @@ -40,7 +40,7 @@ def test_add_task(): context = random_task_context(task) context.variables = {'x': str(uuid4())} with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as mock: - assert isinstance(task.handler(context), dict) + assert isinstance(task.handler(context), TaskContext) mock.assert_called_with(job_key=context.key, variables=context.variables) @@ -54,7 +54,7 @@ def test_before_task_decorator_called(): task.before(decorator) zeebe_worker.add_task(task) with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as grpc_mock: - assert isinstance(task.handler(context), dict) + assert isinstance(task.handler(context), TaskContext) grpc_mock.assert_called_with(job_key=context.key, variables=context.variables) mock.assert_called_with(context) @@ -70,7 +70,7 @@ def test_after_task_decorator_called(): zeebe_worker.add_task(task) with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as grpc_mock: - assert isinstance(task.handler(context), dict) + assert isinstance(task.handler(context), TaskContext) grpc_mock.assert_called_with(job_key=context.key, variables=context.variables) mock.assert_called_with(context) @@ -206,8 +206,3 @@ def test_handle_many_jobs(): task_handler_mock.return_value = {'x': str(uuid4())} zeebe_worker._handle_task_contexts(task) task_handler_mock.assert_called_with(context) - - -def test_work_without_tasks(): - with pytest.raises(NotEnoughTasksException): - zeebe_worker.work() diff --git a/setup.py b/setup.py index 772617a7..68a99f1c 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pyzeebe", - version="1.0.0", + version="1.0.1", author="Jonatan Martens", author_email="jonatanmartenstav@gmail.com", description="Zeebe client api",