Skip to content

Commit

Permalink
Merge pull request #7 from JonatanMartens/bugfix/worker
Browse files Browse the repository at this point in the history
Fixed some things
  • Loading branch information
JonatanMartens committed Aug 23, 2020
2 parents dfbb202 + d464570 commit bc64f94
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Test pyzeebe

on:
push:
branches: [ master, development, feature/* ]
branches: [ master, development, feature/*, bugfix/* ]
pull_request:
branches: [ master, development, feature/* ]
branches: [ master, development, feature/*, bugfix/* ]

jobs:
build:
Expand Down
10 changes: 9 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
class ZeebeAdapter:
def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel = None, **kwargs):
self._connection_uri = f'{hostname}:{port}' or os.getenv('ZEEBE_ADDRESS') or 'localhost:26500'
self._channel = channel or grpc.insecure_channel(self._connection_uri)
if channel:
self._channel = channel
else:
if hostname or port:
self._connection_uri = f'{hostname or "localhost"}:{port or 26500}'
else:
self._connection_uri = os.getenv('ZEEBE_ADDRESS') or 'localhost:26500'
self._channel = grpc.insecure_channel(self._connection_uri)

self.connected = False
self.retrying_connection = True
self._channel.subscribe(self._check_connectivity, try_to_connect=True)
Expand Down
3 changes: 1 addition & 2 deletions pyzeebe/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from pyzeebe.task.task_status_controller import TaskStatusController


# TODO: Add support for async tasks
class Task(ZeebeDecoratorBase):
def __init__(self, task_type: str, task_handler: Callable[..., Dict],
exception_handler: Callable[[Exception, TaskContext, TaskStatusController], None],
timeout: int = 0, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None,
timeout: int = 10000, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None,
before: List = None, after: List = None):
super().__init__(before=before, after=after)

Expand Down
5 changes: 2 additions & 3 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from pyzeebe.task.task_status_controller import TaskStatusController


# TODO: Add support for async tasks
class ZeebeWorker(ZeebeDecoratorBase):
"""A zeebe worker that can connect to a zeebe instance and perform tasks."""

Expand All @@ -34,7 +33,7 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N

def work(self):
if len(self.tasks) > 0:
executor = ThreadPoolExecutor(max_workers=len(self.tasks))
executor = ThreadPoolExecutor(max_workers=len(self.tasks), thread_name_prefix='TASK_HANDLER_THREAD')
executor.map(self._handle_task, self.tasks)
executor.shutdown(wait=True)
else:
Expand All @@ -48,7 +47,7 @@ def _handle_task(self, task: Task):
self._handle_task_contexts(task)

def _handle_task_contexts(self, task: Task):
executor = ThreadPoolExecutor()
executor = ThreadPoolExecutor(thread_name_prefix='JOB_HANDLER_THREAD')
executor.map(task.handler, self._get_task_contexts(task))
executor.shutdown(wait=False) # Do not wait for tasks to finish

Expand Down

0 comments on commit bc64f94

Please sign in to comment.