Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set length of header value to 0 if None #2004

Merged
merged 1 commit into from
Mar 2, 2020

Conversation

kvfi
Copy link
Contributor

@kvfi kvfi commented Feb 24, 2020

This pull request adds a check when consuming messages containing a header whose value is None. In that case, it sets the length to 0.


This change is Reviewable

@kvfi kvfi requested a review from dpkp February 25, 2020 08:53
@jeffwidman
Copy link
Collaborator

When this fix isn't in, what happens? Do you get a None or a Exception or something else?

@kvfi
Copy link
Contributor Author

kvfi commented Feb 29, 2020

Indeed I get an Exception because of None being passed to len(). I'm not at the office so I can't paste a proper log.

@jeffwidman
Copy link
Collaborator

I think this is probably good to merge, but first I'd appreciate if you can post a stacktrace when you get into the office on Monday... I just want to see the code flow here to doublecheck.

@tvoinarovskyi
Copy link
Collaborator

@jeffwidman Seems legit, it's not possible with kafka-python, because we explicitly assert types in headers, but if I send the message from aiokafka and try to get it using Kafka-Python it is possible to get this behaviour:

Producer:

from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

async def send_one():
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092')
    # Get cluster layout and topic/partition allocation
    await producer.start()
    try:
        # Produce messages
        res = await producer.send_and_wait(
            "test-topic", b"Super message", headers=[("header", None)])
        print(res)
    finally:
        await producer.stop()

loop.run_until_complete(send_one())

Consumer:

from kafka import KafkaConsumer

cc = KafkaConsumer(bootstrap_servers="localhost:9092")
cc.subscribe(["test-topic"])
for msg in cc:
    print(msg)

From the Kafka protocol perspective, this is a valid case, as BYTES can be encoded using null string and value is defined as BYTES. Keys should not be a problem, as it is not correct to have null strings. See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java - it defines a strict requirement for the key to be not null, but not value

Copy link
Owner

@dpkp dpkp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM -- thanks!

@kvfi
Copy link
Contributor Author

kvfi commented Mar 2, 2020

Here you go:

Traceback (most recent call last):
  File "$PATH/consume.py", line 6, in <module>
    for message in consumer:
  File "$PATH\venv\lib\site-packages\kafka\consumer\group.py", line 1192, in __next__
    return self.next_v2()
  File "$PATH\venv\lib\site-packages\kafka\consumer\group.py", line 1200, in next_v2
    return next(self._iterator)
  File "$PATH\venv\lib\site-packages\kafka\consumer\group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "$PATH\venv\lib\site-packages\kafka\consumer\group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "$PATH\venv\lib\site-packages\kafka\consumer\group.py", line 707, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "$PATH\venv\lib\site-packages\kafka\consumer\fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "$PATH\venv\lib\site-packages\kafka\consumer\fetcher.py", line 816, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "$PATH\venv\lib\site-packages\kafka\consumer\fetcher.py", line 477, in _unpack_message_set
    header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1
  File "$PATH\venv\lib\site-packages\kafka\consumer\fetcher.py", line 477, in <genexpr>
    header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1
TypeError: object of type 'NoneType' has no len()

Process finished with exit code 1

And here's the debugged headers variable:

headers = {list: 9} [('axon-metadata-traceId', b'8a9ea22d-b03c-47da-bead-e3e206c53daa'), ('axon-metadata-correlationId', b'8a9ea22d-b03c-47da-bead-e3e206c53daa'), ('axon-message-id', b'fbb90aed-1df3-446d-852d-71a5db698d7d'), ('axon-message-aggregate-seq', b'\x00\x00\x00\x00\x
 0 = {tuple: 2} ('axon-metadata-traceId', b'8a9ea22d-b03c-47da-bead-e3e206c53daa')
 1 = {tuple: 2} ('axon-metadata-correlationId', b'8a9ea22d-b03c-47da-bead-e3e206c53daa')
 2 = {tuple: 2} ('axon-message-id', b'fbb90aed-1df3-446d-852d-71a5db698d7d')
 3 = {tuple: 2} ('axon-message-aggregate-seq', b'XXXX')
 4 = {tuple: 2} ('axon-message-aggregate-type', b'SwapAggregate')
 5 = {tuple: 2} ('axon-message-revision', None)
 6 = {tuple: 2} ('axon-message-timestamp', b'XXXX')
 7 = {tuple: 2} ('axon-message-type', b'.com.example')
 8 = {tuple: 2} ('axon-message-aggregate-id', b'6b5baaea-898b-40c6-899d-17d8f8356612')
 __len__ = {int} 9

@jeffwidman
Copy link
Collaborator

jeffwidman commented Mar 2, 2020

From the Kafka protocol perspective, this is a valid case, as BYTES can be encoded using null string and value is defined as BYTES. Keys should not be a problem, as it is not correct to have null strings. See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java - it defines a strict requirement for the key to be not null, but not value

Thanks @tvoinarovskyi for this research. That was the crux of what I was wondering... should we be preventing this from ever being None in the first place. (I just didn't quite realize that's what I was hesitant about until I read your post). Given that it's a valid value, looks like we should merge this.

@jeffwidman jeffwidman merged commit d1dfb6d into dpkp:master Mar 2, 2020
@kvfi kvfi deleted the fix-undefined-header-value branch March 3, 2020 09:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants