IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol. #1008

Open
mjunaidca opened this issue May 18, 2024 · 4 comments

Comments

@mjunaidca

Describe the bug
When trying to create a topic using AIOKafkaAdminClient, the following error is encountered:

IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol.

Expected behaviour
The topic should be created successfully without throwing an IncompatibleBrokerVersion error.

Environment (please complete the following information):

  • aiokafka version: 0.10.0 (the Kafka broker runs from the apache/kafka:3.7.0 Docker Hub image)
  • Kafka Broker version (kafka-topics.sh --version):
    3.7.0
  • Other information (Confluent Cloud version, etc.):
    apache/kafka:3.7.0

Here's the compose.yaml file:
https://github.com/mjunaidca/kafka-playground/blob/main/python-kafka/compose.yml

Reproducible example

import logging
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from aiokafka.errors import KafkaError, TopicAlreadyExistsError

# Kafka settings (replace with actual values)
KAFKA_BOOTSTRAP_SERVER = 'broker:19092'

# Logging configuration
logging.basicConfig(level=logging.INFO)

async def startup_topic_event():
    admin_client = AIOKafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER)
    try:
        topic1 = NewTopic(
            name="PET",
            num_partitions=3,
            replication_factor=1,
        )
        # Create topic if it doesn't exist
        await admin_client.create_topics(
            new_topics=[topic1], validate_only=False
        )
        logging.info(f"Topic 'PET' created successfully.")
    except TopicAlreadyExistsError:
        logging.info(f"Topic 'PET' already exists.")
    except KafkaError as e:
        logging.error(f"Failed to create topic 'PET': {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    finally:
        await admin_client.close()

# Note: This function is not used in the current implementation due to the compatibility issue.
# The good thing is that the producer/consumer auto-creates the topic.

Additional context
The auto-creation of topics by producers/consumers works as expected. However, explicit creation using the admin client fails due to the broker version compatibility issue. This was tested with the Kafka broker version specified in the environment section.

@ods
Collaborator

ods commented May 22, 2024

AIOKafkaAdminClient must be bootstrapped before use; the preferred way is:

async with AIOKafkaAdminClient(...) as client:
    ...

@davidhuser

davidhuser commented Jul 16, 2024

I have the same issue as OP. If I use the suggested context manager I get

TypeError: 'AIOKafkaAdminClient' object does not support the asynchronous context manager protocol

so I wrapped it in an asynccontextmanager:

from contextlib import asynccontextmanager
from aiokafka.admin import AIOKafkaAdminClient, NewTopic

@asynccontextmanager
async def get_admin_client():
    admin_client = AIOKafkaAdminClient(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
    try:
        yield admin_client
    finally:
        await admin_client.close()


async def create_topic():
    async with get_admin_client() as admin_client:
        try:
            await admin_client.create_topics(
                [
                    NewTopic(
                        name='mytopic',
                        num_partitions=1,
                        replication_factor=1,
                    )
                ]
            )
        except Exception as e:
            print("oh no")

I still get the same error as OP:

2024-07-16 15:34:23.329 | ERROR    | kafka_utils:create_topic:70 - Failed to create topic 'costs': IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol.

How can we create topics with aiokafka?


Version 0.11.0

@trentbitterman

@davidhuser I had the same problem as you, but I realized I never called the admin_client.start method. Once I added that to my context manager function it started working. Hopefully that helps you too.

@asynccontextmanager
async def get_admin_client():
    admin_client = AIOKafkaAdminClient(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
    try:
        await admin_client.start()
        yield admin_client
    finally:
        await admin_client.close()

@davidhuser

works now, thanks @trentbitterman !
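
For anyone landing here later, the fix from this thread (call start() before using the admin client, and always close() it afterwards) can be sketched as a small reusable wrapper. This is only a sketch: `started` is a hypothetical helper name, not part of aiokafka, and it assumes nothing about the client beyond awaitable start() and close() methods, which is what the snippets above use on AIOKafkaAdminClient.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def started(client):
    """Start `client` on entry and close it on exit.

    Hypothetical helper: works with any object that has awaitable
    start() and close() methods, such as the admin client in this thread.
    """
    try:
        # The step that was missing in the failing snippets above.
        await client.start()
        yield client
    finally:
        # Runs whether the body succeeded, raised, or start() itself failed.
        await client.close()


# Intended use with aiokafka (not executed here, needs a reachable broker):
#
#     from aiokafka.admin import AIOKafkaAdminClient, NewTopic
#
#     async with started(
#         AIOKafkaAdminClient(bootstrap_servers="broker:19092")
#     ) as admin:
#         await admin.create_topics(
#             [NewTopic(name="PET", num_partitions=3, replication_factor=1)]
#         )
```

Keeping start() inside the try block mirrors the snippet above, so close() is still awaited if bootstrapping fails partway.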
