Skip to content

Commit

Permalink
Fixes the Kafka provider's max message limit error (#32926) (#33321)
Browse files Browse the repository at this point in the history
* Fixes the Kafka provider failing to read messages

Fixes the issue (#32926) where the Kafka provider returns an
error when max messages is not set: it keeps reading, so the
count of messages left goes negative.

Makes sure that the kafka provider works when
max messages isn't passed into the operator
  • Loading branch information
aritra24 committed Aug 12, 2023
1 parent d0c94d6 commit c9d0fcd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
3 changes: 2 additions & 1 deletion airflow/providers/apache/kafka/operators/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ def execute(self, context) -> Any:
batch_size = self.max_batch_size

msgs = consumer.consume(num_messages=batch_size, timeout=self.poll_timeout)
messages_left -= len(msgs)
if not self.read_to_end:
messages_left -= len(msgs)

if not msgs: # No messages + messages_left is being used.
self.log.info("Reached end of log. Exiting.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def setup_method(self):
extra=json.dumps(
{
"socket.timeout.ms": 10,
"bootstrap.servers": "localhost:9092",
"bootstrap.servers": "broker:29092",
"group.id": f"operator.consumer.test.integration.test_{num}",
"enable.auto.commit": False,
"auto.offset.reset": "beginning",
Expand Down Expand Up @@ -135,7 +135,7 @@ def test_consumer_operator_test_3(self):
operator = ConsumeFromTopicOperator(
kafka_config_id=TOPIC,
topics=[TOPIC],
apply_function=_batch_tester,
apply_function_batch=_batch_tester,
apply_function_kwargs={"test_string": TOPIC},
task_id="test",
poll_timeout=0.0001,
Expand Down
29 changes: 29 additions & 0 deletions tests/providers/apache/kafka/operators/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import logging
from typing import Any
from unittest import mock

from airflow.models import Connection

Expand Down Expand Up @@ -79,3 +80,31 @@ def test_operator_callable(self):

# execute the operator (this is essentially a no op as the broker isn't setup)
operator.execute(context={})

@mock.patch("airflow.providers.apache.kafka.hooks.consume.KafkaConsumerHook.get_consumer")
def test_operator_consume_max(self, mock_get_consumer):
    """Execute the operator without ``max_messages`` against a fake consumer.

    The fake consumer holds 1001 queued messages and hands them out in
    batches. It raises if it is ever asked for a negative number of
    messages, so the test fails should the operator request a negative
    batch size while draining the queue.
    """
    # Backlog of messages still waiting to be handed to the operator.
    pending = ["test_messages"] * 1001

    def mock_consume(num_messages=0, timeout=-1):
        # Stand-in for the consumer's ``consume``: pops up to
        # ``num_messages`` items off the front of the backlog.
        nonlocal pending
        if num_messages < 0:
            raise Exception("Number of messages needs to be positive")
        take = min(num_messages, len(pending))
        batch, pending = pending[:take], pending[take:]
        return batch

    # Wire the fake ``consume`` into the consumer returned by the patched hook.
    fake_consumer = mock.MagicMock()
    fake_consumer.consume = mock_consume
    mock_get_consumer.return_value = fake_consumer

    operator = ConsumeFromTopicOperator(
        kafka_config_id="kafka_d",
        topics=["test"],
        task_id="test",
        poll_timeout=0.0001,
    )

    # Run the operator; no broker is contacted because the consumer is mocked.
    operator.execute(context={})

0 comments on commit c9d0fcd

Please sign in to comment.