
KafkaTimeoutError #1184

Closed
hugoren opened this issue Aug 25, 2017 · 15 comments

hugoren commented Aug 25, 2017

The error:
KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time

my code:

from kafka import KafkaProducer
from kafka.errors import KafkaError

def kafka_producer():
    producer = KafkaProducer(bootstrap_servers='{0}:{1}'.format(HOST, PORT),
                             api_version=(0, 10, 1))
    try:
        future = producer.send('test', b'hello kafka')
        producer.flush()
        record_metadata = future.get(timeout=10)
        print(record_metadata)
    except KafkaError as e:
        print(e)
@dpkp dpkp closed this as completed Oct 22, 2017
dpkp (Owner) commented Oct 22, 2017

Yes, KafkaTimeout is an exception you may get when using the asynchronous producer.
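To illustrate where this exception surfaces, here is a minimal sketch (hypothetical helper, assuming kafka-python): producer.send() returns immediately, and the KafkaTimeoutError is raised later, when the returned future is resolved.

```python
def resolve_send(future, timeout, on_error=print):
    """Resolve an async producer future. send() itself rarely raises;
    the timeout error surfaces here, from future.get()."""
    try:
        return future.get(timeout=timeout)
    except Exception as exc:  # kafka.errors.KafkaTimeoutError in real usage
        on_error(exc)
        return None
```

In real code you would catch `kafka.errors.KafkaTimeoutError` specifically rather than a bare `Exception`.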

@tylerjharden

@dpkp That's great and all, but how do you resolve it / why does it occur?

dpkp (Owner) commented Jan 23, 2018

I'm not sure what is causing your error. You'll have to check your error logs, both producer and server. Something is preventing your producer from successfully sending the message to your cluster. Perhaps you have a network partition? Maybe a broker went down and new leadership election failed?
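One way to gather those client-side logs is standard Python logging; kafka-python emits its connection and metadata events under the "kafka" logger namespace:

```python
import logging

# Turn on verbose client logging; kafka-python logs connection attempts,
# metadata refreshes, and batch expiry under the "kafka" logger hierarchy.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('kafka').setLevel(logging.DEBUG)
```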


skskcco2o17 commented May 15, 2020

I am facing the same issue. My Kafka Broker (v 2.4.0) is running in OpenShift container. My Python producer code is similar to the above and that is causing the same issue.

KafkaTimeoutError: Batch for TopicPartition(topic='my_test_topic', partition=0) containing 1 record(s) expired: 31 seconds have passed since batch creation plus linger time
   File "C:\Users\XXXXXXX\Anaconda3\lib\site-packages\kafka\producer\future.py", line 65, in get
    raise self.exception # pylint: disable-msg=raising-bad-type

My kafka-python version is 2.0.1

I have been fighting this issue for the last two days and am frustrated. Please assist: how do you resolve it, and why does it occur?

I have tested sending to the topic with the Kafka shell scripts and it works. So where is the problem in Python?


sushen-indifi commented May 18, 2020

Having the same issue

KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since last append

I am able to connect, as the function below returns True:
producer.bootstrap_connected()

I can also fetch topics, but cannot send any messages.

@skskcco2o17

Why is the kafka-python team not responding to this severe issue, which lots of development teams are facing? If the team is not supporting it, then how can we rely on this library for enterprise applications?

tvoinarovskyi (Collaborator) commented May 26, 2020

@skskcco2o17 Kafka-python is supported by volunteers and does not have any commercial structure funding the project, so we all do it in our free time. If you need production enterprise support - there are a lot of companies (like Confluent) providing it. Please be patient and feel free to ask questions outside this issue tracker (for example on Stackoverflow) if you need community help with an issue.


0Jvang commented Dec 3, 2020

I solved it.


0Jvang commented Dec 3, 2020

My Kafka is deployed in a Docker container, and the container's network mode is bridge. The host and container use port mappings, and I changed the default port for the Kafka server to 9102.

The configuration items in server.properties to solve the problem are these two:
listeners
advertised.listeners

I tried several combinations:
success:

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

server can't start:

listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

timeout error:

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102

@AprilJoy

You can add this parameter to KafkaProducer:
request_timeout_ms=60000
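For illustration, a sketch of passing that parameter (the broker address and extra settings here are assumptions, not taken from this issue). In kafka-python, request_timeout_ms raises the per-request timeout, while max_block_ms separately bounds how long send() and flush() may block:

```python
# Hypothetical settings; the KafkaProducer construction is left commented
# out because it requires a reachable broker.
producer_config = dict(
    bootstrap_servers='localhost:9092',  # assumed address
    request_timeout_ms=60000,            # default is 30000
    max_block_ms=60000,
)
# from kafka import KafkaProducer
# producer = KafkaProducer(**producer_config)
```

Note that raising the timeout only helps when the broker is slow; if the client can never reach the advertised listener (as in several reports above), no timeout value will fix it.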

@Anthony0722

I had the same issue and solved it. My problem was that I used an IP address to connect to Kafka, but the server advertised a hostname, so I added a mapping between the IP and the hostname in /etc/hosts on my PC. Then try again, good luck.

@jonaslimads

I had this same issue while using Kafka through Docker.
As others had pointed out, I needed to set advertised.listeners:

For docker-compose.yml service, the env var is:

- KAFKA_ADVERTISED_LISTENERS=INSIDE://192.168.0.9:9092,OUTSIDE://192.168.0.9:9094

This host IP (192.168.0.9) was found through:

HOST_IP=$(ip -o route get to 8.8.8.8 | sed -n 's/.*src \([0-9.]\+\).*/\1/p')

@AlexBlack2202

(Quoting @0Jvang's `listeners` / `advertised.listeners` configuration above.)

This worked for me. Thanks.

@changjiangbailang

You can retry the send 10 or more times to improve the chance of success:

import time

def send_message(producer, topic, msg, retries=10):
    for attempt in range(retries):
        try:
            future = producer.send(topic, msg)
            return future.get(timeout=10)
        except Exception:
            time.sleep(1)
    return None

def kafka_producer():
    producer = KafkaProducer(bootstrap_servers='{0}:{1}'.format(HOST, PORT),
                             api_version=(0, 10, 1))
    send_message(producer, 'test', b'hello kafka')

@ankitjgd

ankitjgd commented Jun 26, 2022

Solved it.
OS: macOS
My docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    ports:
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=EXTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT 
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka

Actually, I was not using separate internal and external listener entries in the compose file, so Kafka worked either with the UI or with the Python producer, never both. To make both work, you need to provide both listeners.
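With that compose file, which bootstrap address a client should use depends on where the client runs. A sketch, with the addresses taken from the compose file above:

```python
# Host-side clients (e.g. a local Python producer) must use the EXTERNAL
# listener; the CLIENT listener's advertised name "kafka" only resolves
# inside the compose network.
EXTERNAL_BOOTSTRAP = 'localhost:9093'  # from the host machine
INTERNAL_BOOTSTRAP = 'kafka:9092'      # from other compose services

# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=EXTERNAL_BOOTSTRAP)
```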
