Skip to content

Commit

Permalink
0.2.8: clarify _LoggingPipe logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Aug 1, 2023
1 parent 21fd769 commit 9be5f25
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
12 changes: 7 additions & 5 deletions kioss/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,20 @@ def __next__(self) -> T:
try:
elem = super().__next__()
except StopIteration:
if self.yields_count % 2:
if self.yields_count != self.last_log_at_yields_count:
self._log()
raise

self.yields_count += 1
if isinstance(elem, Exception):
self.errors_count += 1

if self.yields_count >= 2 * self.last_log_at_yields_count:
if (
self.yields_count == 1
or self.yields_count == 2 * self.last_log_at_yields_count
):
self._log()
self.last_log_at_yields_count = self.yields_count + self.errors_count
self.last_log_at_yields_count = self.yields_count

return elem

Expand Down Expand Up @@ -417,6 +420,7 @@ class _BatchingPipe(Pipe[List[T]]):
- the time elapsed between the first next() call on input iterator and last received elements is grater than secs
- the next element reception thrown an exception (it is stored in self.to_be_raised and will be raised during the next call to self.__next__)
"""

def __init__(self, iterator: Iterator[T], size: int, secs: float) -> None:
super().__init__(iterator)
self.size = size
Expand Down Expand Up @@ -446,8 +450,6 @@ def __next__(self) -> List[T]:
raise




class _ConcurrentlyMergingPipe(Pipe[T]):
MAX_NUM_WAITING_ELEMS_PER_THREAD = 16

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='kioss',
version='0.2.7',
version='0.2.8',
packages=['kioss'],
url='http://github.com/bonnal-enzo/kioss',
license='Apache 2.',
Expand Down
31 changes: 25 additions & 6 deletions test/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,35 @@ def test_batch(self):
self.assertEqual(len(Pipe(range(8)).slow(10).batch(secs=0.09).collect()), 7)
# assert batch gracefully yields if next elem throw exception
self.assertListEqual(
Pipe("01234-56789").map(int).batch(2).catch(ValueError, ignore=True).collect(),
[[0, 1], [2, 3], [4], [5, 6], [7, 8], [9]]
Pipe("01234-56789")
.map(int)
.batch(2)
.catch(ValueError, ignore=True)
.collect(),
[[0, 1], [2, 3], [4], [5, 6], [7, 8], [9]],
)
self.assertListEqual(
Pipe("0123-56789").map(int).batch(2).catch(ValueError, ignore=True).collect(),
[[0, 1], [2, 3], [5, 6], [7, 8], [9]]
Pipe("0123-56789")
.map(int)
.batch(2)
.catch(ValueError, ignore=True)
.collect(),
[[0, 1], [2, 3], [5, 6], [7, 8], [9]],
)
self.assertListEqual(
Pipe("0123-56789").map(int).batch(2).catch(ValueError, ignore=False).map(lambda potential_error: [potential_error] if isinstance(potential_error, Exception) else potential_error).flatten().map(type).collect(),
[int, int, int, int, ValueError, int, int, int, int, int]
Pipe("0123-56789")
.map(int)
.batch(2)
.catch(ValueError, ignore=False)
.map(
lambda potential_error: [potential_error]
if isinstance(potential_error, Exception)
else potential_error
)
.flatten()
.map(type)
.collect(),
[int, int, int, int, ValueError, int, int, int, int, int],
)

@parameterized.expand(
Expand Down

0 comments on commit 9be5f25

Please sign in to comment.