
Segmentation fault happens when trying to commit a message after the max.poll.interval.ms passed without polling #1707

Open
abdallahashraf22 opened this issue Feb 6, 2024 · 3 comments · May be fixed by #1754
Labels: bug, usage (Incorrect usage)


abdallahashraf22 commented Feb 6, 2024

Description

When trying to commit a message after not polling for longer than max.poll.interval.ms, I get a non-recoverable segmentation fault that cannot be caught in an exception handler, so the Python interpreter exits.

How to reproduce

import json
import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
})
consumer.subscribe([topic_name])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        logger.info("-------Consumed Message------")
        logger.info(output)
        # this processing takes longer than max.poll.interval.ms (5 minutes by default)
        processing_output_function(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"JAIS General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

In the code above, if processing_output_function(output) takes longer than max.poll.interval.ms, that iteration ends correctly and commits the message, but on the next one I consume a message that says

Application maximum poll interval (300000ms) exceeded by 231ms

Decoding this fails with a json.decoder.JSONDecodeError. Then, when the finally block goes to commit the message, the log shows
"committing message"
Segmentation fault

and I never reach the
"message has been committed"
line.

I'm not sure whether this is working as intended, but it seems wrong that it stops my Python execution with no way to handle the exit in an exception handler.
For now I have increased the max poll interval, but I can fetch logs later if requested.

Checklist

Please provide the following information:

  • confluent-kafka-python: ('2.3.0', 33751040)
  • librdkafka version: ('2.3.0', 33751295)
  • Apache Kafka broker version: 2.13-2.8.1
  • Client configuration: {"enable.auto.commit": False, "auto.offset.reset": "earliest", ...}
  • Operating system: linux
  • Critical issue: yes, I think
abdallahashraf22 commented Feb 8, 2024

Confirmed the problem is not caused by anything in the processing output function, as I reproduced it with time.sleep:

import json
import logging
import time

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

config = {
    "bootstrap.servers": "some host",
    "group.id": "some group id",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "max.poll.interval.ms": 60000,
}
consumer = Consumer(config)
consumer.subscribe(["some_topic"])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        if isinstance(output, dict) and output.get("task") == "sleep":
            # sleep past max.poll.interval.ms (60s) to trigger the error
            time.sleep(65)
        logger.info("-------Consumed Message------")
        logger.info(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

Logs:

INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|consumer has been paused
INFO:     __main__|received message from kafka b'{'id': 'testing 2', 'task': 'sleep', 'text': 'something'}'
INFO:     __main__|committing message
INFO:     __main__|message has been committed
INFO:     __main__|received message from kafka b'Application maximum poll interval (60000ms) exceeded by 500ms'
ERROR:    __main__|json decode error: Expecting value: line 1 column 1 (char 0)
INFO:     __main__|:committing message
Segmentation fault (core dumped)

pranavrth added the investigate further label Feb 26, 2024

pranavrth commented May 27, 2024

I am able to reproduce this issue.

The output of consumer.poll(timeout=10.0) can be a valid message or can carry an error. In your case it carries an error, and you are trying to commit that error message. Error messages don't have a valid topic name or offset, hence the SegFault when trying to commit.

Check the valid usage in the Consumer example:

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
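
Applied to the loop from the report, a minimal sketch of that guard might look like the following (consumer, logger, json, and processing_output_function are taken from the reproduction above; the message.error() check is the only addition):

while True:
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    if message.error():
        # The "maximum poll interval exceeded" notification arrives here as an
        # error event, not as a real record; log it instead of committing it.
        logger.error(f"consumer error: {message.error()}")
        continue
    consumer.pause(consumer.assignment())
    try:
        processing_output_function(json.loads(message.value()))
    finally:
        # message is guaranteed to be a real record with a valid topic/offset
        consumer.commit(message)
        consumer.resume(consumer.assignment())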

pranavrth added the usage (Incorrect usage) label and removed the investigate further label May 27, 2024
pranavrth commented May 27, 2024

I think we shouldn't throw a SegFault in this case, even though this is not the correct usage.
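
Until the binding guards against this, a defensive wrapper on the application side avoids the crash. A hedged sketch (safe_commit is a hypothetical helper, not part of confluent-kafka-python):

from confluent_kafka import Consumer, KafkaException

def safe_commit(consumer: Consumer, message) -> None:
    # Hypothetical helper: refuse to commit error events, surfacing a
    # catchable KafkaException instead of the librdkafka segfault.
    if message.error() is not None:
        raise KafkaException(message.error())
    consumer.commit(message)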

pranavrth added the bug label May 27, 2024