rd_kafka_destroy doesn't seem to close the socket properly #5

Closed
paultrout opened this issue Mar 7, 2013 · 3 comments

Comments

@paultrout

I'm using this library in the end-of-message callback of a milter application. I've got a JSON object of the email message, and I need to hand it off to Kafka before I delete it from the Postfix queue. Here is the code:

  rk = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_url, NULL);
  if (rk != NULL) {
    rd_kafka_produce(rk, "Trout-test", 0, RD_KAFKA_OP_F_FREE, scratch, strlen(scratch));
    fprintf(logfile, "TID: %ld :: %ld :: scmfi_eom - Sent %zu bytes to Trout-test:0\n",
            (long int)syscall(SYS_gettid), (long int)time(NULL), strlen(scratch));
    while (rd_kafka_outq_len(rk) > 0) {                                    /* Pulled this from the rdkafka_example */
            usleep(50000);
    }
    fprintf(logfile, "TID: %ld :: %ld :: scmfi_eom - kafka outq is now 0\n",
            (long int)syscall(SYS_gettid), (long int)time(NULL));
    /* free(scratch); */                                                   /* rd_kafka_produce frees scratch (RD_KAFKA_OP_F_FREE) */
    usleep(500000);
    rd_kafka_destroy(rk);                                                  /* Destroy the Kafka handle */
  }

When I run this code, everything works fine until I've sent about 1000 messages through the MTA. At that point, rd_kafka_new starts to fail with this message: Failed to create inet socket: Too many open files

So I raised my open-files limit with ulimit to a number greater than 200000 (I was sending in batches of 100000 messages), and then it started failing at around 30000 messages because there were no more ephemeral ports available for connections to the broker.

When I look at the source, I see the close call on the socket, but when I follow the execution with lsof, or just netstat, all the sockets remain in the ESTABLISHED state. Am I using rd_kafka_new, rd_kafka_produce, and rd_kafka_destroy improperly (calling them once for each message), or is this an actual problem?

Thank you,
Paul

@edenhill
Contributor

edenhill commented Mar 8, 2013

The rd_kafka_t created by rd_kafka_new*() is intended to be a persistent object.
So typically you create it on startup and hang on to it for the duration of the program's execution.
I suggest you change your program accordingly.

This also means moving the rd_kafka_outq_len() wait loop to just before rd_kafka_destroy() in your program's termination function. This is not something you should do per message: the produce API is intended to be asynchronous, so the application does not need to wait for messages to be sent during normal program execution.

Having said that, rdkafka shouldn't leak fds and I will fix that.
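
For reference, a minimal sketch of that structure, reusing the same 0.7-era calls from the snippet above (rd_kafka_new, rd_kafka_produce, rd_kafka_outq_len, rd_kafka_destroy). The kafka_init/kafka_send/kafka_shutdown function names are illustrative only, not part of the milter or librdkafka APIs:

    /* One persistent producer handle for the whole process. */
    #include <string.h>
    #include <unistd.h>
    #include "rdkafka.h"

    static rd_kafka_t *rk = NULL;

    int kafka_init(const char *kafka_url) {        /* call once at startup */
            rk = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_url, NULL);
            return rk != NULL ? 0 : -1;
    }

    void kafka_send(char *scratch) {               /* call once per message, e.g. in scmfi_eom */
            /* Just enqueue and return; no per-message wait on rd_kafka_outq_len(). */
            rd_kafka_produce(rk, "Trout-test", 0, RD_KAFKA_OP_F_FREE,
                             scratch, strlen(scratch));  /* librdkafka frees scratch */
    }

    void kafka_shutdown(void) {                    /* call once at program termination */
            while (rd_kafka_outq_len(rk) > 0)      /* drain the outbound queue */
                    usleep(50000);
            rd_kafka_destroy(rk);                  /* destroy the persistent handle */
    }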

@paultrout
Author

Thank you.  I will make that change and retest.

Paul

@paultrout
Author

I've made the changes and re-run my tests, and am no longer leaking fds/sockets.

Thanks for the help,
Paul

@winbatch mentioned this issue Feb 10, 2014
@jcalcote mentioned this issue Jun 28, 2019
azat pushed a commit to azat-archive/librdkafka that referenced this issue Sep 7, 2022
…llback-fix

Trigger op callbacks regardless for unhandled types in consume_batch_queue() et.al.
azat added a commit to azat-archive/librdkafka that referenced this issue Feb 29, 2024
TSan report (found by ClickHouse CI):

    Exception: Sanitizer assert found for instance ==================
    WARNING: ThreadSanitizer: data race (pid=1)
      Read of size 8 at 0x7b7800127158 by thread T987 (mutexes: read M0, write M1, write M2):
        #0 __tsan_memcpy <null> (clickhouse+0x74eebb7) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #1 rd_avg_rollover build_docker/./contrib/librdkafka/src/rdavg.h:153:22 (clickhouse+0x1e39753b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #2 rd_kafka_stats_emit_avg build_docker/./contrib/librdkafka/src/rdkafka.c:1354:9 (clickhouse+0x1e39753b)
        #3 rd_kafka_stats_emit_all build_docker/./contrib/librdkafka/src/rdkafka.c:1717:17 (clickhouse+0x1e395c8b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #4 rd_kafka_stats_emit_tmr_cb build_docker/./contrib/librdkafka/src/rdkafka.c:1898:2 (clickhouse+0x1e395c8b)
        #5 rd_kafka_timers_run build_docker/./contrib/librdkafka/src/rdkafka_timer.c:288:4 (clickhouse+0x1e46498a) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #6 rd_kafka_thread_main build_docker/./contrib/librdkafka/src/rdkafka.c:2021:3 (clickhouse+0x1e3919e9) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #7 _thrd_wrapper_function build_docker/./contrib/librdkafka/src/tinycthread.c:576:9 (clickhouse+0x1e47a57b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)

      Previous write of size 8 at 0x7b7800127158 by thread T986:
        #0 rd_avg_calc build_docker/./contrib/librdkafka/src/rdavg.h:104:38 (clickhouse+0x1e37d71d) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #1 rd_kafka_broker_timeout_scan build_docker/./contrib/librdkafka/src/rdkafka_broker.c:880:25 (clickhouse+0x1e37d71d)
        #2 rd_kafka_broker_ops_io_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:3416:17 (clickhouse+0x1e37d71d)
        #3 rd_kafka_broker_consumer_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:4975:17 (clickhouse+0x1e378e5e) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #4 rd_kafka_broker_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:5080:17 (clickhouse+0x1e378e5e)
        #5 rd_kafka_broker_thread_main build_docker/./contrib/librdkafka/src/rdkafka_broker.c:5237:25 (clickhouse+0x1e372619) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #6 _thrd_wrapper_function build_docker/./contrib/librdkafka/src/tinycthread.c:576:9 (clickhouse+0x1e47a57b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)

Refs: ClickHouse/ClickHouse#60443
kwdubuc pushed a commit to SolaceDev/librdkafka that referenced this issue Apr 2, 2024