Faust commits the wrong offset in case of a gap in acks #312

@ekerstens

Description

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

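  • Create an agent that consumes from a topic, defined like this:
from typing import AsyncIterable

from faust import Stream

counter = 0

# `app` is the faust.App instance this agent is registered on (not shown).
async def test_event_loop2(stream: Stream) -> AsyncIterable[None]:
    global counter  # rebind the module-level counter, not a new local
    async for event in stream.noack().events():
        counter += 1
        print(event.message.refcount)
        print(event.message.offset)
        print(app.consumer._acked)
        if event.message.offset % 5 != 0:
            # Ack everything except every 5th offset.
            event.ack()
        elif event.message.offset % 10 == 0:
            # Pause on every 10th offset to inspect the commits.
            breakpoint()

        if counter == 30:
            return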
  • Acked offsets should appear in app.consumer._acked, with a gap at every 5th offset, which is never acked.
  • The consumer continues to commit offsets past the skipped messages.

Expected behavior

Faust should identify a gap between the committed offset and the first acked message, so that unacked messages are not skipped. Per the comments in Consumer._new_offset:

    # We iterate over it until we find a gap
    # then return the offset before that.
    # For example if acked[tp] is:
    #   1 2 3 4 5 6 7 8 9
    # the return value will be: 10
    # If acked[tp] is:
    #  34 35 36 40 41 42 43 44
    #          ^--- gap
    # the return value will be: 37

In the scenario above, every 5th message is never acked, so the committed offset should never advance past the first unacked message: there is always a gap.
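The rule quoted from the comments can be sketched as a small standalone function. This only illustrates the documented behavior and is not Faust's implementation; the name `offset_before_gap` is hypothetical, and the input is assumed to be a sorted, non-empty list of acked offsets.

```python
from typing import List


def offset_before_gap(acked: List[int]) -> int:
    """Return the offset to commit for a sorted, non-empty list of acked offsets.

    Hypothetical helper: walks the leading run of consecutive offsets and
    returns the offset right after it (the first offset that was not acked).
    """
    last = acked[0]
    for offset in acked[1:]:
        if offset != last + 1:
            break  # gap found: stop at the end of the consecutive run
        last = offset
    return last + 1


print(offset_before_gap([1, 2, 3, 4, 5, 6, 7, 8, 9]))       # 10
print(offset_before_gap([34, 35, 36, 40, 41, 42, 43, 44]))  # 37
```

Note that this rule only looks inside the acked list. It has no notion of where the previously committed offset sits relative to the first acked entry.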

Actual behavior

  • Suppose app.consumer._acked == [1,2,3,4,6,7]
  • When Consumer._new_offset is called for the first time, a gap is identified at 5 and so offset 5 is committed.
  • After this, app.consumer._acked == [6,7]
  • Even if no more messages are consumed or acked, the next call to Consumer._new_offset finds no gap inside [6,7], so the consumer commits offset 8.
  • This is incorrect: relative to the current committed offset (5), there is still a gap, because offset 5 was never acked.
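The sequence above can be reproduced with a simplified model of the commit logic. This is a sketch under stated assumptions, not Faust's code: `naive_new_offset` models the reported behavior, and `gap_aware_new_offset` is a hypothetical variant that also compares the first acked offset with the committed offset (taken here to mean the next offset expected to be processed). Both function names are invented for illustration.

```python
from typing import List, Optional


def naive_new_offset(acked: List[int]) -> Optional[int]:
    """Model of the reported behavior: only gaps inside `acked` are seen."""
    if not acked:
        return None
    acked.sort()
    end = 0
    while end + 1 < len(acked) and acked[end + 1] == acked[end] + 1:
        end += 1
    new_offset = acked[end] + 1
    del acked[: end + 1]  # acks covered by the commit are dropped
    return new_offset


def gap_aware_new_offset(acked: List[int], committed: int) -> Optional[int]:
    """Hypothetical variant: advance only if the first ack is the committed offset."""
    acked.sort()
    if not acked or acked[0] != committed:
        return None  # gap between the committed offset and the first ack
    return naive_new_offset(acked)


acked = [1, 2, 3, 4, 6, 7]
print(naive_new_offset(acked))  # 5
print(naive_new_offset(acked))  # 8, although offset 5 was never acked

acked = [1, 2, 3, 4, 6, 7]
committed = 1  # assumption: offset 1 is the next expected message
committed = gap_aware_new_offset(acked, committed) or committed
print(committed)                               # 5
print(gap_aware_new_offset(acked, committed))  # None: 5 is still unacked
```

In the gap-aware variant the acked list keeps [6, 7] until offset 5 is acked, at which point the run 5, 6, 7 becomes contiguous with the committed offset and the commit can advance to 8.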

Versions

  • Python version: Python 3.8.12
  • Faust version: faust-streaming==0.8.4

Traceback/Logs

2022/05/31 17:15:19.639 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.640 WARNI module=mode.redirect.logging.write Message=2781
2022/05/31 17:15:19.640 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {})
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=2782
2022/05/31 17:15:19.643 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2781]})
2022/05/31 17:15:19.644 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2783   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:15:19.646 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.646 WARNI module=mode.redirect.logging.write Message=2783
2022/05/31 17:15:19.647 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): []})
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=2784
2022/05/31 17:15:19.648 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783]})
2022/05/31 17:15:19.649 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.650 WARNI module=mode.redirect.logging.write Message=2785
2022/05/31 17:15:19.650 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784]})
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=2786
2022/05/31 17:15:19.651 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784]})
2022/05/31 17:15:19.652 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.653 WARNI module=mode.redirect.logging.write Message=2787
2022/05/31 17:15:19.653 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786]})
2022/05/31 17:15:19.824 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.825 WARNI module=mode.redirect.logging.write Message=2788
2022/05/31 17:15:19.825 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787]})
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=2789
2022/05/31 17:15:19.827 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788]})
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=2790
2022/05/31 17:15:19.829 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788, 2789]})
2022/05/31 17:15:19.835 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:15:19.835 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=2791
2022/05/31 17:15:50.917 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2783, 2784, 2786, 2787, 2788, 2789]})
2022/05/31 17:15:50.919 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +31.028453718987294 runtime=1.8850085325539112e-05 sleeptime=32.02845371898729
2022/05/31 17:15:50.921 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +30.052488100947812 runtime=0.0016850839601829648 sleeptime=40.05248810094781
2022/05/31 17:15:50.921 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +29.30016235099174 runtime=7.025082595646381e-06 sleeptime=34.30016235099174
2022/05/31 17:15:50.922 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2785   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=2792
2022/05/31 17:15:50.924 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791]})
2022/05/31 17:15:50.925 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +27.104460972943343 runtime=0.0026684380136430264 sleeptime=34.10446097294334
2022/05/31 17:15:50.925 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.925 WARNI module=mode.redirect.logging.write Message=2793
2022/05/31 17:15:50.926 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792]})
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=2794
2022/05/31 17:15:50.927 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793]})
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=2795
2022/05/31 17:15:50.928 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794]})
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=2796
2022/05/31 17:15:50.929 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794]})
2022/05/31 17:15:50.930 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.931 WARNI module=mode.redirect.logging.write Message=2797
2022/05/31 17:15:50.931 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796]})
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=2798
2022/05/31 17:15:50.932 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797]})
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=2799
2022/05/31 17:15:50.933 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798]})
2022/05/31 17:15:50.934 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=2800
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799]})
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:15:50.935 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:16:10.286 INFO  module=mode.timers.timers.tick Message=Timer commit woke up too late, with a drift of +28.317679641931317 runtime=19.386381829041056 sleeptime=31.117679641931318
2022/05/31 17:16:10.286 WARNI module=mode.timers.timers.tick Message=Timer commit is overlapping (interval=2.8 runtime=19.386381829041056)
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=2801
2022/05/31 17:16:10.288 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799]})
2022/05/31 17:16:10.290 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +18.39256372093223 runtime=1.7469050362706184e-05 sleeptime=19.39256372093223
2022/05/31 17:16:10.290 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +14.390604294952936 runtime=6.500980816781521e-06 sleeptime=19.390604294952936
2022/05/31 17:16:10.293 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +9.391143504995853 runtime=0.003148776013404131 sleeptime=19.391143504995853
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=2802
2022/05/31 17:16:10.294 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801]})
2022/05/31 17:16:10.296 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +12.38731794198975 runtime=0.0053144460543990135 sleeptime=19.38731794198975
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=2803
2022/05/31 17:16:10.296 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802]})
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=2804
2022/05/31 17:16:10.298 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803]})
2022/05/31 17:16:10.299 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.299 WARNI module=mode.redirect.logging.write Message=2805
2022/05/31 17:16:10.300 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804]})
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=2806
2022/05/31 17:16:10.301 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804]})
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=2807
2022/05/31 17:16:10.303 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806]})
2022/05/31 17:16:10.304 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.305 WARNI module=mode.redirect.logging.write Message=2808
2022/05/31 17:16:10.305 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807]})
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=2809
2022/05/31 17:16:10.306 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807, 2808]})
2022/05/31 17:16:10.307 WARNI module=mode.redirect.logging.write Message=1
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=2810
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=defaultdict(<class 'list'>, {TopicPartition(topic='example_topic', partition=0): [2786, 2787, 2788, 2789, 2791, 2792, 2793, 2794, 2796, 2797, 2798, 2799, 2801, 2802, 2803, 2804, 2806, 2807, 2808, 2809]})
2022/05/31 17:16:10.308 WARNI module=mode.redirect.logging.write Message=> test.py(222)test_event_loop2()
-> if self.counter == 30:
2022/05/31 17:16:10.309 WARNI module=mode.redirect.logging.write Message=(Pdb)
c
2022/05/31 17:16:30.163 INFO  module=mode.timers.timers.tick Message=Timer Monitor.sampler woke up too late, with a drift of +18.872945420909673 runtime=1.928501296788454e-05 sleeptime=19.872945420909673
2022/05/31 17:16:30.163 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2790   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:30.164 INFO  module=mode.timers.timers.tick Message=Timer Recovery.stats woke up too late, with a drift of +14.873592204065062 runtime=6.855931133031845e-06 sleeptime=19.873592204065062
2022/05/31 17:16:30.167 INFO  module=mode.timers.timers.tick Message=Timer vap_kafka_event.faust.app.VapFaustApp.monitor_reoccuring_stats woke up too late, with a drift of +9.87149757402949 runtime=0.002534606959670782 sleeptime=19.87149757402949
2022/05/31 17:16:30.168 INFO  module=aiokafka.conn.conn._do_sasl_handshake Message=Authenticated as X3OXJPRL4A7DJDAV via PLAIN
2022/05/31 17:16:30.169 INFO  module=mode.timers.timers.tick Message=Timer livelock woke up too late, with a drift of +12.869328050990589 runtime=0.0044637060491368175 sleeptime=19.86932805099059
2022/05/31 17:16:30.281 INFO  module=mode.timers.timers.tick Message=Timer commit woke up too late, with a drift of +17.07697979202494 runtime=0.11823615396860987 sleeptime=19.87697979202494
2022/05/31 17:16:32.983 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2795   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:35.971 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2800   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:38.740 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2805   │
└────────────────────────────────────────────────────────────┴────────┘
2022/05/31 17:16:41.737 INFO  module=faust.transport.drivers.aiokafka.consumer._commit_offsets Message=[^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────┬────────┐
│ TP                                                         │ Offset │
├────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='example_topic', partition=0) │ 2810   │
└────────────────────────────────────────────────────────────┴────────┘
