Skip to content

Commit

Permalink
Take with timestamp (#242)
Browse files Browse the repository at this point in the history
* add full_take

* remove processor remove

* try to add kafka_timestamp

* tweaks

* add

* add extra param

* add offset

* remove offset in this pr

* fix formatting

* add functional tests

* add another unit test and edge case handling

* change buffer type...

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
nicor88 and patkivikram committed Dec 17, 2021
1 parent 8d8971a commit a44fb6b
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 0 deletions.
99 changes: 99 additions & 0 deletions faust/streams.py
Expand Up @@ -392,6 +392,105 @@ async def add_to_buffer(value: T) -> T:
self.enable_acks = stream_enable_acks
self._processors.remove(add_to_buffer)

async def take_with_timestamp(
    self, max_: int, within: Seconds, timestamp_field_name: Optional[str]
) -> AsyncIterable[Sequence[T_co]]:
    """Buffer up to ``max_`` values and yield them as a list, with timestamps.

    Works like a batching "take": values are collected by a stream
    processor and yielded as a list once ``max_`` values are buffered or
    the ``within`` timeout expires.  Each buffered value that is a
    ``dict`` additionally gets ``event.message.timestamp`` (the timestamp
    of the message in Kafka) stored under ``timestamp_field_name``.

    Acks are disabled on the stream while this generator is active; the
    events of a batch are acked only after the consumer resumes the
    generator following the ``yield`` of that batch.

    Arguments:
        max_: Max number of messages to receive. When this number of
            messages has been buffered the buffer is flushed
            immediately, and the processor waits until it has been
            consumed before accepting more values.
        within: Timeout for when we give up waiting for another value,
            and process the values we have.
            Warning: If there's no timeout (i.e. ``within`` is falsy,
            such as ``None`` or ``0``), the agent is likely to stall
            and block buffered events for an unreasonable length of
            time(!).
        timestamp_field_name: Name of the key under which the message
            timestamp is stored in each ``dict`` value.  The dict is
            modified in place.  If this is falsy (``None`` or ``""``),
            or the value is not a ``dict``, the value is buffered
            unchanged.

    Yields:
        A list holding a copy of the currently buffered values.

    Note:
        Any exception raised while buffering (other than cancellation)
        is logged and passed to ``self.crash``.
    """
    buffer: List[T_co] = []
    events: List[EventT] = []
    buffer_add = buffer.append
    event_add = events.append
    buffer_size = buffer.__len__
    # NOTE(review): the ``loop`` argument of ``asyncio.Event`` was removed
    # in Python 3.10; kept as-is here -- confirm the supported versions.
    buffer_full = asyncio.Event(loop=self.loop)
    buffer_consumed = asyncio.Event(loop=self.loop)
    timeout = want_seconds(within) if within else None
    stream_enable_acks: bool = self.enable_acks

    buffer_consuming: Optional[asyncio.Future] = None

    channel_it = aiter(self.channel)

    # We add this processor to populate the buffer, and the stream
    # is passively consumed in the background (enable_passive below).
    async def add_to_buffer(value: T) -> T:
        # buffer_consuming is set when consuming buffer after timeout.
        nonlocal buffer_consuming
        try:
            if buffer_consuming is not None:
                try:
                    await buffer_consuming
                finally:
                    buffer_consuming = None
            event = self.current_event
            # Validate the event *before* touching it or the buffer, so a
            # missing event surfaces as the intended RuntimeError and the
            # value/event buffers never get out of step.
            if event is None:
                raise RuntimeError("Take buffer found current_event is None")
            if isinstance(value, dict) and timestamp_field_name:
                value[timestamp_field_name] = event.message.timestamp
            buffer_add(value)
            event_add(event)
            if buffer_size() >= max_:
                # signal that the buffer is full and should be emptied.
                buffer_full.set()
                # strict wait for buffer to be consumed after buffer full.
                # If max is 1000, we are not allowed to return 1001 values.
                buffer_consumed.clear()
                await self.wait(buffer_consumed)
        except CancelledError:  # pragma: no cover
            raise
        except Exception as exc:
            self.log.exception("Error adding to take buffer: %r", exc)
            await self.crash(exc)
        return value

    # Disable acks to ensure this method acks manually
    # events only after they are consumed by the user
    self.enable_acks = False

    self.add_processor(add_to_buffer)
    self._enable_passive(cast(ChannelT, channel_it))
    try:
        while not self.should_stop:
            # wait until buffer full, or timeout
            await self.wait_for_stopped(buffer_full, timeout=timeout)
            if buffer:
                # make sure the add_to_buffer processor does not add new
                # items to the buffer while we read.
                buffer_consuming = self.loop.create_future()
                try:
                    yield list(buffer)
                finally:
                    buffer.clear()
                    for event in events:
                        await self.ack(event)
                    events.clear()
                    # allow writing to buffer again
                    notify(buffer_consuming)
                    buffer_full.clear()
                    buffer_consumed.set()
            else:  # pragma: no cover
                pass
        else:  # pragma: no cover
            pass

    finally:
        # Restore last behaviour of "enable_acks"
        self.enable_acks = stream_enable_acks
        self._processors.remove(add_to_buffer)

def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
"""Enumerate values received on this stream.
Expand Down
117 changes: 117 additions & 0 deletions tests/functional/test_streams.py
Expand Up @@ -777,3 +777,120 @@ def current_event(self, event):
assert isinstance(s._crash_reason, RuntimeError)
print("RETURNING")
assert s.enable_acks is True


@pytest.mark.asyncio
async def test_take_wit_timestamp(app):
    """A dict value is returned carrying a float under the timestamp key."""
    field = "test_timestamp"
    async with new_stream(app) as s:
        assert s.enable_acks is True
        await s.channel.send(value={"id": 1})
        acked_event = None
        # NOTE(review): the generator is deliberately not bound to a local
        # name; the final enable_acks assertion presumably relies on it
        # being finalized once the loop is left -- confirm before changing.
        async for batch in s.take_with_timestamp(
            1, within=1, timestamp_field_name=field
        ):
            first = batch[0]
            assert field in first.keys()
            assert isinstance(first[field], float)
            # acks are switched off while batches are being taken
            assert s.enable_acks is False
            acked_event = mock_stream_event_ack(s)
            break

        assert acked_event
        # Yield to the event loop twice: one sleep is needed on
        # Python 3.6.0-3.6.6 + 3.7.0, two on Python 3.6.7 + 3.7.1 :-/
        for _ in range(2):
            await asyncio.sleep(0)

        if not acked_event.ack.called:
            assert acked_event.message.acked
            assert not acked_event.message.refcount
        # the stream's original ack setting is back in place
        assert s.enable_acks is True


@pytest.mark.asyncio
async def test_take_wit_timestamp_wit_simple_value(app):
    """A non-dict value is returned unchanged even when a field name is given."""
    async with new_stream(app) as s:
        assert s.enable_acks is True
        await s.channel.send(value=1)
        acked_event = None
        # NOTE(review): the generator is deliberately not bound to a local
        # name; the final enable_acks assertion presumably relies on it
        # being finalized once the loop is left -- confirm before changing.
        async for batch in s.take_with_timestamp(
            1, within=1, timestamp_field_name="test_timestamp"
        ):
            assert batch == [1]
            # acks are switched off while batches are being taken
            assert s.enable_acks is False
            acked_event = mock_stream_event_ack(s)
            break

        assert acked_event
        # Yield to the event loop twice: one sleep is needed on
        # Python 3.6.0-3.6.6 + 3.7.0, two on Python 3.6.7 + 3.7.1 :-/
        for _ in range(2):
            await asyncio.sleep(0)

        if not acked_event.ack.called:
            assert acked_event.message.acked
            assert not acked_event.message.refcount
        # the stream's original ack setting is back in place
        assert s.enable_acks is True


@pytest.mark.asyncio
async def test_take_wit_timestamp_without_timestamp_field(app):
    """Passing ``timestamp_field_name=None`` still yields the plain value."""
    async with new_stream(app) as s:
        assert s.enable_acks is True
        await s.channel.send(value=1)
        acked_event = None
        # NOTE(review): the generator is deliberately not bound to a local
        # name; the final enable_acks assertion presumably relies on it
        # being finalized once the loop is left -- confirm before changing.
        async for batch in s.take_with_timestamp(
            1, within=1, timestamp_field_name=None
        ):
            assert batch == [1]
            # acks are switched off while batches are being taken
            assert s.enable_acks is False
            acked_event = mock_stream_event_ack(s)
            break

        assert acked_event
        # Yield to the event loop twice: one sleep is needed on
        # Python 3.6.0-3.6.6 + 3.7.0, two on Python 3.6.7 + 3.7.1 :-/
        for _ in range(2):
            await asyncio.sleep(0)

        if not acked_event.ack.called:
            assert acked_event.message.acked
            assert not acked_event.message.refcount
        # the stream's original ack setting is back in place
        assert s.enable_acks is True


@pytest.mark.asyncio
async def test_take_wit_timestamp__5(app, loop):
    """Five dict values arrive as one batch, each carrying the timestamp key."""
    field = "test_timestamp"
    s = new_stream(app)
    async with s:
        assert s.enable_acks is True
        for ident in range(5):
            await s.channel.send(value={"id": ident})

        acked_event = None
        buffer_processor = s.take_with_timestamp(
            5, within=10.0, timestamp_field_name=field
        )
        async for batch in buffer_processor:
            assert len(batch) == 5
            for message in batch:
                assert field in message.keys()
            # acks are switched off while batches are being taken
            assert s.enable_acks is False

            acked_event = mock_stream_event_ack(s)
            break

        # Throw a cancellation into the generator; the CancelledError
        # coming back out of it is expected and ignored.
        try:
            await buffer_processor.athrow(asyncio.CancelledError())
        except asyncio.CancelledError:
            pass

        assert acked_event
        # Yield to the event loop three times ("needed for some reason"):
        # one sleep is needed on Python 3.6.0-3.6.6 + 3.7.0,
        # two on Python 3.6.7 + 3.7.1 :-/
        for _ in range(3):
            await asyncio.sleep(0)

        if not acked_event.ack.called:
            assert acked_event.message.acked
            assert not acked_event.message.refcount
        # the stream's original ack setting is back in place
        assert s.enable_acks is True

0 comments on commit a44fb6b

Please sign in to comment.