Skip to content

Commit

Permalink
chore(parallel_collect): Allow importing ParallelCollectStep (#43)
Browse files Browse the repository at this point in the history
Since the multi storage consumer has its own factory we need to export
ParallelCollectStep. Also addressed the comments from the previous
PR #41
  • Loading branch information
nikhars committed Mar 4, 2022
1 parent 2e7ce26 commit 4bc2e95
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
3 changes: 2 additions & 1 deletion arroyo/processing/strategies/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from .collect import CollectStep
from .collect import CollectStep, ParallelCollectStep
from .factory import KafkaConsumerStrategyFactory
from .filter import FilterStep
from .transform import ParallelTransformStep, TransformStep

__all__ = [
"CollectStep",
"ParallelCollectStep",
"FilterStep",
"ParallelTransformStep",
"TransformStep",
Expand Down
5 changes: 4 additions & 1 deletion arroyo/processing/strategies/streaming/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,13 @@ def __finish_batch(batch: Batch[TPayload]) -> None:
logger.info("Completed processing %r.", batch)

def join(self, timeout: Optional[float] = None) -> None:
work_time = 0.0
# We should finish the previous batch before proceeding to the finish the existing one.
if self.future is not None:
previous_time = time.time()
self.future.result(timeout)
work_time = time.time() - previous_time

self.__threadpool.shutdown()

super().join(timeout)
super().join((timeout - work_time) if timeout else None)
6 changes: 3 additions & 3 deletions arroyo/processing/strategies/streaming/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
input_block_size: Optional[int],
output_block_size: Optional[int],
initialize_parallel_transform: Optional[Callable[[], None]] = None,
parallel_collect_step: Optional[bool] = False,
parallel_collect: bool = False,
) -> None:
self.__prefilter = prefilter
self.__process_message = process_message
Expand All @@ -82,7 +82,7 @@ def __init__(
self.__input_block_size = input_block_size
self.__output_block_size = output_block_size
self.__initialize_parallel_transform = initialize_parallel_transform
self.__parallel_collect_step = parallel_collect_step
self.__parallel_collect = parallel_collect

def __should_accept(self, message: Message[TPayload]) -> bool:
assert self.__prefilter is not None
Expand All @@ -95,7 +95,7 @@ def create(
ParallelCollectStep(
self.__collector, commit, self.__max_batch_size, self.__max_batch_time
)
if self.__parallel_collect_step
if self.__parallel_collect
else CollectStep(
self.__collector,
commit,
Expand Down
2 changes: 1 addition & 1 deletion tests/processing/strategies/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def create_step_factory() -> ProcessingStrategy[int]:

class ByPassProcessingStep(ProcessingStrategy[int]):
"""
ProcessingStep implementation that acquires a lock when join is called to mimic a long wait.
ProcessingStep implementation that does nothing
"""

def submit(self, message: Message[int]) -> None:
Expand Down

0 comments on commit 4bc2e95

Please sign in to comment.