Skip to content

Commit

Permalink
0.2.9: robustify _LoggingPipe and _BatchingPipe against several call …
Browse files Browse the repository at this point in the history
…after exhaustion
  • Loading branch information
ebonnal committed Aug 1, 2023
1 parent 9be5f25 commit 80eaddb
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
18 changes: 13 additions & 5 deletions kioss/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def __init__(self, iterator: Iterator[T], what: str) -> None:
self.errors_count = 0
self.last_log_at_yields_count = 0
self.start_time = time.time()
self._is_exhausted = False

def _log(self) -> None:
logging.info(
Expand All @@ -353,9 +354,12 @@ def _log(self) -> None:
)

def __next__(self) -> T:
if self._is_exhausted:
raise StopIteration
try:
elem = super().__next__()
except StopIteration:
self._is_exhausted = True
if self.yields_count != self.last_log_at_yields_count:
self._log()
raise
Expand Down Expand Up @@ -425,12 +429,15 @@ def __init__(self, iterator: Iterator[T], size: int, secs: float) -> None:
super().__init__(iterator)
self.size = size
self.secs = secs
self.to_be_raised: Exception = None
self._to_be_raised: Exception = None
self._is_exhausted = False

def __next__(self) -> List[T]:
if self.to_be_raised:
e = self.to_be_raised
self.to_be_raised = None
if self._is_exhausted:
raise StopIteration
if self._to_be_raised:
e = self._to_be_raised
self._to_be_raised = None
raise e
start_time = time.time()
batch = None
Expand All @@ -440,12 +447,13 @@ def __next__(self) -> List[T]:
batch.append(super().__next__())
return batch
except StopIteration:
self._is_exhausted = True
if batch:
return batch
raise
except Exception as e:
if batch:
self.to_be_raised = e
self._to_be_raised = e
return batch
raise

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='kioss',
version='0.2.8',
version='0.2.9',
packages=['kioss'],
url='http://github.com/bonnal-enzo/kioss',
license='Apache 2.',
Expand Down

0 comments on commit 80eaddb

Please sign in to comment.