Skip to content

Commit

Permalink
Merge pull request #9 from JonatanMartens/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
JonatanMartens committed Aug 24, 2020
2 parents a40f58a + 61bf6a1 commit b0daea8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 37 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Test pyzeebe

on:
push:
branches: [ master, development, feature/* ]
branches: [ master, development, feature/*, bugfix/* ]
pull_request:
branches: [ master, development, feature/* ]
branches: [ master, development, feature/*, bugfix/* ]

jobs:
build:
Expand Down
6 changes: 0 additions & 6 deletions pyzeebe/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,2 @@
# TODO: improve these

class TaskNotFoundException(Exception):
pass


class NotEnoughTasksException(Exception):
pass
19 changes: 17 additions & 2 deletions pyzeebe/grpc_internals/zeebe_adapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import os.path
from typing import List, Generator, Dict

Expand All @@ -12,20 +13,32 @@
class ZeebeAdapter:
def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel = None, **kwargs):
self._connection_uri = f'{hostname}:{port}' or os.getenv('ZEEBE_ADDRESS') or 'localhost:26500'
self._channel = channel or grpc.insecure_channel(self._connection_uri)
if channel:
self._channel = channel
else:
if hostname or port:
self._connection_uri = f'{hostname or "localhost"}:{port or 26500}'
else:
self._connection_uri = os.getenv('ZEEBE_ADDRESS') or 'localhost:26500'
self._channel = grpc.insecure_channel(self._connection_uri)

self.connected = False
self.retrying_connection = True
self._channel.subscribe(self._check_connectivity, try_to_connect=True)
self.gateway_stub = GatewayStub(self._channel)

def _check_connectivity(self, value: grpc.ChannelConnectivity) -> None:
logging.debug(f'Grpc channel connectivity changed to: {value}')
if value in [grpc.ChannelConnectivity.READY, grpc.ChannelConnectivity.IDLE]:
logging.debug('Connected to Zeebe')
self.connected = True
self.retrying_connection = False
elif value in [grpc.ChannelConnectivity.CONNECTING, grpc.ChannelConnectivity.TRANSIENT_FAILURE]:
logging.warning('No connection to Zeebe, recoverable. Reconnecting...')
self.connected = False
self.retrying_connection = True
elif value == grpc.ChannelConnectivity.SHUTDOWN:
logging.error('Failed to establish connection to Zeebe. Non recoverable')
self.connected = False
self.retrying_connection = False
raise ConnectionAbortedError(f'Lost connection to {self._connection_uri}')
Expand All @@ -37,7 +50,9 @@ def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_a
maxJobsToActivate=max_jobs_to_activate,
fetchVariable=variables_to_fetch, requestTimeout=request_timeout)):
for job in response.jobs:
yield self._create_task_context_from_job(job)
context = self._create_task_context_from_job(job)
logging.debug(f'Got job: {context} from zeebe')
yield context

@staticmethod
def _create_task_context_from_job(job) -> TaskContext:
Expand Down
7 changes: 5 additions & 2 deletions pyzeebe/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from pyzeebe.task.task_status_controller import TaskStatusController


# TODO: Add support for async tasks
class Task(ZeebeDecoratorBase):
def __init__(self, task_type: str, task_handler: Callable[..., Dict],
exception_handler: Callable[[Exception, TaskContext, TaskStatusController], None],
timeout: int = 0, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None,
timeout: int = 10000, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None,
before: List = None, after: List = None):
super().__init__(before=before, after=after)

Expand All @@ -20,3 +19,7 @@ def __init__(self, task_type: str, task_handler: Callable[..., Dict],
self.max_jobs_to_activate = max_jobs_to_activate
self.variables_to_fetch = variables_to_fetch or []
self.handler: Callable[[TaskContext], TaskContext] = None

def __repr__(self):
return str({'type': self.type, 'timeout': self.timeout, 'max_jobs_to_activate': self.max_jobs_to_activate,
'variables_to_fetch': self.variables_to_fetch})
10 changes: 9 additions & 1 deletion pyzeebe/task/task_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_proces
custom_headers: Dict, worker: str, retries: int, deadline: int, variables: Dict):
self.key = key
self.type = _type
self.wokflow_instance_key = workflow_instance_key
self.workflow_instance_key = workflow_instance_key
self.bpmn_process_id = bpmn_process_id
self.workflow_definition_version = workflow_definition_version
self.workflow_key = workflow_key
Expand All @@ -18,3 +18,11 @@ def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_proces
self.retries = retries
self.deadline = deadline
self.variables = variables

def __repr__(self):
return str({'jobKey': self.key, 'taskType': self.type, 'workflowInstanceKey': self.workflow_instance_key,
'bpmnProcessId': self.bpmn_process_id,
'workflowDefinitionVersion': self.workflow_definition_version, 'workflowKey': self.workflow_key,
'elementId': self.element_id, 'elementInstanceKey': self.element_instance_key,
'customHeaders': self.custom_headers, 'worker': self.worker, 'retries': self.retries,
'deadline': self.deadline, 'variables': self.variables})
32 changes: 18 additions & 14 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import List, Callable, Generator, Dict, Tuple
from typing import List, Callable, Generator, Tuple

from pyzeebe.common.exceptions import TaskNotFoundException, NotEnoughTasksException
from pyzeebe.common.exceptions import TaskNotFoundException
from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.task.task import Task
Expand All @@ -11,7 +12,6 @@
from pyzeebe.task.task_status_controller import TaskStatusController


# TODO: Add support for async tasks
class ZeebeWorker(ZeebeDecoratorBase):
"""A zeebe worker that can connect to a zeebe instance and perform tasks."""

Expand All @@ -33,26 +33,28 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N
self.tasks = []

def work(self):
if len(self.tasks) > 0:
executor = ThreadPoolExecutor(max_workers=len(self.tasks))
executor.map(self._handle_task, self.tasks)
executor.shutdown(wait=True)
else:
raise NotEnoughTasksException('Worker needs tasks in order to work')
with ThreadPoolExecutor(thread_name_prefix='zeebe-task') as executor:
for task in self.tasks:
executor.submit(self._handle_task, task)

def _handle_task(self, task: Task):
logging.debug(f'Handling task {task}')
while self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection:
if self.zeebe_adapter.retrying_connection:
logging.debug(f'Retrying connection to {self.zeebe_adapter._connection_uri}')
continue

self._handle_task_contexts(task)

def _handle_task_contexts(self, task: Task):
executor = ThreadPoolExecutor()
executor.map(task.handler, self._get_task_contexts(task))
executor.shutdown(wait=False) # Do not wait for tasks to finish
executor = ThreadPoolExecutor(thread_name_prefix=f'zeebe-job-{task.type}')
for task_context in self._get_task_contexts(task):
logging.debug(f'Running job: {task_context}')
executor.submit(task.handler, task_context)
executor.shutdown(wait=False)

def _get_task_contexts(self, task: Task) -> Generator[TaskContext, None, None]:
logging.debug(f'Activating jobs for task: {task}')
return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.timeout,
max_jobs_to_activate=task.max_jobs_to_activate,
variables_to_fetch=task.variables_to_fetch,
Expand All @@ -62,7 +64,7 @@ def add_task(self, task: Task) -> None:
task.handler = self._create_zeebe_task_handler(task)
self.tasks.append(task)

def _create_zeebe_task_handler(self, task: Task) -> Callable[[TaskContext], Dict]:
def _create_zeebe_task_handler(self, task: Task) -> Callable[[TaskContext], TaskContext]:
before_decorator_runner = self._create_before_decorator_runner(task)
after_decorator_runner = self._create_after_decorator_runner(task)

Expand All @@ -71,9 +73,11 @@ def task_handler(context: TaskContext):
context = before_decorator_runner(context)
context.variables = task.inner_function(**context.variables)
context = after_decorator_runner(context)
logging.debug(f'Completing job: {context}')
self.zeebe_adapter.complete_job(job_key=context.key, variables=context.variables)
return context.variables
return context
except Exception as e:
logging.debug(f'Failed job: {context}. Error: {e}.')
task.exception_handler(e, context, TaskStatusController(context, self.zeebe_adapter))
return e

Expand Down
13 changes: 4 additions & 9 deletions pyzeebe/worker/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from pyzeebe.common.exceptions import TaskNotFoundException, NotEnoughTasksException
from pyzeebe.common.exceptions import TaskNotFoundException
from pyzeebe.common.random_utils import random_task_context
from pyzeebe.task.task import Task
from pyzeebe.task.task_context import TaskContext
Expand Down Expand Up @@ -40,7 +40,7 @@ def test_add_task():
context = random_task_context(task)
context.variables = {'x': str(uuid4())}
with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as mock:
assert isinstance(task.handler(context), dict)
assert isinstance(task.handler(context), TaskContext)
mock.assert_called_with(job_key=context.key, variables=context.variables)


Expand All @@ -54,7 +54,7 @@ def test_before_task_decorator_called():
task.before(decorator)
zeebe_worker.add_task(task)
with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as grpc_mock:
assert isinstance(task.handler(context), dict)
assert isinstance(task.handler(context), TaskContext)
grpc_mock.assert_called_with(job_key=context.key, variables=context.variables)
mock.assert_called_with(context)

Expand All @@ -70,7 +70,7 @@ def test_after_task_decorator_called():
zeebe_worker.add_task(task)

with patch('pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job') as grpc_mock:
assert isinstance(task.handler(context), dict)
assert isinstance(task.handler(context), TaskContext)
grpc_mock.assert_called_with(job_key=context.key, variables=context.variables)
mock.assert_called_with(context)

Expand Down Expand Up @@ -206,8 +206,3 @@ def test_handle_many_jobs():
task_handler_mock.return_value = {'x': str(uuid4())}
zeebe_worker._handle_task_contexts(task)
task_handler_mock.assert_called_with(context)


def test_work_without_tasks():
with pytest.raises(NotEnoughTasksException):
zeebe_worker.work()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyzeebe",
version="1.0.0",
version="1.0.1",
author="Jonatan Martens",
author_email="jonatanmartenstav@gmail.com",
description="Zeebe client api",
Expand Down

0 comments on commit b0daea8

Please sign in to comment.