Skip to content

Commit

Permalink
fix(reduce): Add missing call to next_step.terminate() (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker authored Jul 11, 2023
1 parent 0d690f0 commit 73bb12d
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 5 deletions.
3 changes: 1 addition & 2 deletions arroyo/processing/strategies/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
TStrategyPayload,
Value,
)

from arroyo.utils.metrics import get_metrics

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -118,4 +117,4 @@ def terminate(self) -> None:
def join(self, timeout: Optional[float] = None) -> None:
self.__flush_uncommitted_offsets(time.time())
self.__next_step.close()
self.__next_step.join(timeout)
self.__next_step.join(timeout=timeout)
2 changes: 1 addition & 1 deletion arroyo/processing/strategies/guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def poll(self) -> None:
self.__next_step.poll()

def join(self, timeout: Optional[float] = None) -> None:
self.__next_step.join(timeout)
self.__next_step.join(timeout=timeout)

def close(self) -> None:
self.__next_step.close()
Expand Down
1 change: 1 addition & 0 deletions arroyo/processing/strategies/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def close(self) -> None:
def terminate(self) -> None:
self.__closed = True
self.__batch_builder = None
self.__next_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion arroyo/processing/strategies/run_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def poll(self) -> None:
self.__next_step.poll()

def join(self, timeout: Optional[float] = None) -> None:
self.__next_step.join(timeout)
self.__next_step.join(timeout=timeout)

def close(self) -> None:
self.__next_step.close()
Expand Down
2 changes: 1 addition & 1 deletion arroyo/processing/strategies/run_task_in_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def join(self, timeout: Optional[float] = None) -> None:

self.__next_step.close()
self.__executor.shutdown()
self.__next_step.join(timeout)
self.__next_step.join(timeout=timeout)

def close(self) -> None:
self.__closed = True
Expand Down
23 changes: 23 additions & 0 deletions tests/processing/strategies/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,26 @@ def protected_call(method_name: str, *args: Any) -> bool:
)
for partition, offset, type_ in message_pattern
]


@pytest.mark.parametrize("strategy_factory", FACTORIES)
def test_terminate(strategy_factory: StrategyFactory) -> None:
next_step = Mock()

step = strategy_factory(next_step)
step.terminate()

assert next_step.terminate.call_args_list == [call()]


@pytest.mark.parametrize("strategy_factory", FACTORIES)
def test_join(strategy_factory: StrategyFactory) -> None:
next_step = Mock()

step = strategy_factory(next_step)
step.close()
step.join()

assert next_step.close.call_args_list == [call()]

assert next_step.join.call_args_list == [call(timeout=None)]

0 comments on commit 73bb12d

Please sign in to comment.