Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DatadogMonitor - IndexError: deque index out of range #113

Closed
pwilczynskiclearcode opened this issue Feb 25, 2021 · 3 comments
Closed

DatadogMonitor - IndexError: deque index out of range #113

pwilczynskiclearcode opened this issue Feb 25, 2021 · 3 comments

Comments

@pwilczynskiclearcode
Copy link
Contributor

pwilczynskiclearcode commented Feb 25, 2021

Steps to reproduce

import faust
from faust.sensors.datadog import DatadogMonitor

app = faust.App(..., monitor=DatadogMonitor(prefix="faustapp"))

process a stream with manual acknowledgement:

@app.agent(users_topic)
async def process_users(users_stream):
    async for event in users_stream.noack().events():
        ...
        await users_stream.ack(event)
        yield None

Expected behavior

stream to be processed without issues

Actual behavior

Agent crashes and restarts

Full traceback

[^----Agent*: __main__.process_users]: Crashed reason="IndexError('deque index out of range')"
IndexError: deque index out of range
  File "faust/agents/agent.py", line 688, in _execute_actor
    await coro
  File "faust/agents/agent.py", line 710, in _slurp
    async for value in it:
  File "faust.py", line 458, in process_users
    await process_users.ack(event)
  File "faust/streams.py", line 961, in ack
    self._on_stream_event_out(tp, offset, self, event)
  File "faust/sensors/base.py", line 196, in on_stream_event_out
    sensor.on_stream_event_out(
  File "faust/sensors/datadog.py", line 186, in on_stream_event_out
    self.secs_to_ms(self.events_runtime[-1]),

Versions

  • Python version 3.8
  • Faust version 0.5.2
  • Operating system alpine 3.12.1
  • Kafka version 2.4.1.1
  • RocksDB version (if applicable) -not used
pwilczynskiclearcode pushed a commit to pwilczynskiclearcode/faust that referenced this issue Mar 5, 2021
pwilczynskiclearcode pushed a commit to pwilczynskiclearcode/faust that referenced this issue Mar 5, 2021
@pwilczynskiclearcode
Copy link
Contributor Author

class Stream(StreamT[T_co], Service):
    ....

    async def _py_aiter(self) -> AsyncIterator[T_co]:
       ...

        try:
            while not self.should_stop:
                event = None
                do_ack = self.enable_acks  # set to False to not ack event.
                # wait for next message
                value: Any = None
                # we iterate until on_merge gives value.
                while value is None and event is None:
                    ...

                    if isinstance(channel_value, event_cls):
                       ...

                        # call Sensors
                        sensor_state = on_stream_event_in(tp, offset, self, event)
                       ...
                     ...
                if value is skipped_value:
                    continue
                self.events_total += 1
                try:
                    yield value
                finally:
                    self.current_event = None
                    if do_ack and event is not None:
                        # This inlines self.ack
                        last_stream_to_ack = event.ack()
                        message = event.message
                        tp = event.message.tp
                        offset = event.message.offset
                        on_stream_event_out(tp, offset, self, event, sensor_state)
                        ...

    ...

    async def ack(self, event: EventT) -> bool:
        ...
        # WARNING: This function is duplicated in __aiter__
        last_stream_to_ack = event.ack()
        message = event.message
        tp = message.tp
        offset = message.offset
        self._on_stream_event_out(tp, offset, self, event)
        ...

When calling Stream.ack() manually don't have access to sensor_state which is different to _py_aiter and _c_aiter which propagate sensor_state to on_stream_event_out.

@pwilczynskiclearcode
Copy link
Contributor Author

#119 partial fix

pwilczynskiclearcode pushed a commit to pwilczynskiclearcode/faust that referenced this issue Mar 5, 2021
patkivikram added a commit that referenced this issue Mar 12, 2021
…be called with no state (#119)

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
@pwilczynskiclearcode
Copy link
Contributor Author

I think that the issue in general still requires proper solution. #119 is a fix preventing exceptions but full solution is to write the code in a way that manually acknowledged events produce datadog/statsd/prometheus metrics correctly

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants