Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for outdated sensors after rebalancing #72

Conversation

krzysieksulejczak
Copy link
Contributor

@krzysieksulejczak krzysieksulejczak commented Jan 13, 2021

Fix for improper sensors after rebalancing.

I use faust-streaming with prometheus sensors. When my workers get rebalanced they report metrics for topics and/or topic-partitions they do not handle anymore and I end up with multiple metrics in Prometheus for the same labels, some of them stalled.

@patkivikram patkivikram merged commit 0a4d059 into faust-streaming:master Jan 14, 2021
@austinnichols101
Copy link
Contributor

@krzysieksulejczak - two issues:

  1. This line in my code now throws an error:
    app.monitor = PrometheusMonitor(app, pattern='/metrics')
  File "/home/mdyer/test-faust//app.py", line 113, in <module>
    app.monitor = PrometheusMonitor(app, pattern='/metrics')
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/sensors/prometheus.py", line 308, in __init__
    super().__init__(**kwargs)
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/sensors/monitor.py", line 301, in __init__
    Service.__init__(self, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'pattern'

I'm able to work around the problem by removing pattern

  1. After removing the pattern I receive the following trace:
2021-01-15 15:41:49,947] [14716] [ERROR] [^-App]: Error while stopping child <Consumer: crashed >: AttributeError("'App' object has no attribute 'consumer_commit_latency'") 
Traceback (most recent call last):
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/mode/services.py", line 886, in _default_stop_children
    await asyncio.shield(child.stop())
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/mode/services.py", line 862, in stop
    await self.on_stop()
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 248, in on_stop
    await super().on_stop()
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/transport/consumer.py", line 817, in on_stop
    await self.commit_and_end_transactions()
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/transport/consumer.py", line 810, in commit_and_end_transactions
    await self.commit(start_new_transaction=False)
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/transport/consumer.py", line 865, in commit
    return await self.force_commit(
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/mode/services.py", line 460, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/transport/consumer.py", line 904, in force_commit
    self.app.sensors.on_commit_completed(self, sensor_state)
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/sensors/base.py", line 234, in on_commit_completed
    sensor.on_commit_completed(consumer, state[sensor])
  File "/home/mdyer/.virtualenvs/test-faust/lib/python3.8/site-packages/faust/sensors/prometheus.py", line 391, in on_commit_completed
    self._metrics.consumer_commit_latency.observe(
AttributeError: 'App' object has no attribute 'consumer_commit_latency'
[2021-01-15 15:41:49,948] [14716] [INFO] [^--Web]: Stopping... 

Works fine with 0.4.0

@krzysieksulejczak
Copy link
Contributor Author

@austinnichols101
Hi,

use the following code to setup the sensors:

from faust.sensors.prometheus import setup_prometheus_sensors
app = faust.App('example', broker='kafka://')
setup_prometheus_sensors(app, pattern='/metrics')

I should have prepared compatibility helpers between old and new code. Sorry for inconvenience.
I will fix it in separate PR.

cheers

@austinnichols101
Copy link
Contributor

@krzysieksulejczak - Using setup_prometheus_sensors worked for me. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants