rd_kafka_metadata can cause a UNKNOWN_TOPIC_OR_PART message #4589

Closed · 5 of 7 tasks · sshanks-kx opened this issue Jan 23, 2024 · 10 comments

sshanks-kx commented Jan 23, 2024

Description

Using a consumer client that subscribes via rd_kafka_subscribe, with rd_kafka_queue_io_event_enable enabled on the consumer queue.

This works fine until metadata is requested via rd_kafka_metadata.

If the metadata is requested any time after making the subscription, the poll returns a message with the error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every topic subscribed to (although the metadata call returns correct results and the subscription continues receiving messages).

If the metadata is requested prior to making any subscription, I don't get an error message from the poll.

How to reproduce

Example client program (given a topic called test1 exists with partition 0, broker localhost:9092), purely to show the error message.

gcc main.c -lrdkafka -lz -lpthread -lssl

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

int main(int argc,char** argv){
    rd_kafka_t *rk=0;
    rd_kafka_conf_t *conf=0;
    char *brokers = "localhost:9092";
    rd_kafka_queue_t *queue=0;
    rd_kafka_resp_err_t err=0;
    rd_kafka_topic_partition_list_t *t_partition;
    rd_kafka_message_t *msg;
    char errstr[512];
    int spair[2];

    if(pipe(spair)==-1){
        fprintf(stderr, "pipe fail\n");
        exit(1);
    }

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) {
        fprintf(stderr,"Failed to create new consumer: %s\n",errstr);
        exit(1);
    }

    err=rd_kafka_poll_set_consumer(rk);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != err){
        fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    queue = rd_kafka_queue_get_consumer(rk);
    if (!queue){
        fprintf(stderr, "queue fail\n");
        exit(1);
    }
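    // Request fd-based notifications: librdkafka writes the payload "X" to spair[1]
    // when the consumer queue becomes non-empty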
    rd_kafka_queue_io_event_enable(queue,spair[1],"X",1);
    rd_kafka_set_log_queue(rk,NULL);

    ///////// subscribe
    printf("Subscribing...\n");
    t_partition = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){
        fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    rd_kafka_topic_partition_list_destroy(t_partition);

    //////// metadata (comment out this section to prevent error)
    {
        printf("Get metadata...\n");
        const struct rd_kafka_metadata *metadata;
        err = rd_kafka_metadata(rk, 1, 0, &metadata,5000);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err));
            exit(1);
        }
        rd_kafka_metadata_destroy(metadata);
    }

    ///////// poll
    printf("Poll...\n");
    while((msg= rd_kafka_consumer_poll(rk, 10000))) {
        if (msg->err)
            fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),rd_kafka_topic_name(msg->rkt),msg->offset);
        else
            fprintf(stderr,"Got msg [topic %s offset %lld]\n",rd_kafka_topic_name(msg->rkt),msg->offset);
        rd_kafka_message_destroy(msg);
    }

    printf("Finished...\n");

    return 1;
}

outputs

Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...

If you edit the code to put the metadata request before the subscription, the error message no longer appears (see the reordered sketch below).
The debug log gets a TOPICERR entry only when metadata is requested after the subscription.
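
For reference, the edit is just swapping the two sections of the program above so the metadata request runs before the subscribe call (a minimal sketch of the reordered section, nothing else changed):

    //////// metadata first (no error message produced in this ordering)
    {
        printf("Get metadata...\n");
        const struct rd_kafka_metadata *metadata;
        err = rd_kafka_metadata(rk, 1, 0, &metadata, 5000);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            fprintf(stderr, "Failed to acquire metadata: %s\n", rd_kafka_err2str(err));
            exit(1);
        }
        rd_kafka_metadata_destroy(metadata);
    }

    ///////// subscribe afterwards
    printf("Subscribing...\n");
    t_partition = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(t_partition, "test1", RD_KAFKA_PARTITION_UA);
    if (RD_KAFKA_RESP_ERR_NO_ERROR != (err = rd_kafka_subscribe(rk, t_partition))) {
        fprintf(stderr, "rd_kafka_subscribe err %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    rd_kafka_topic_partition_list_destroy(t_partition);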

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 2.3.0
  • Apache Kafka version: 7.0.1
  • librdkafka client configuration: see example above
  • Operating system: OSX (x64)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@sshanks-kx (Author)

Attached: good.log, bad.log

bad.log - running with debug on, with the metadata request after the subscription (gives the error message)
good.log - running with debug on, with the metadata request before the subscription (no error message)

@sshanks-kx (Author)

In our program that wraps librdkafka, you can see this bad message appearing before the successful subscription messages (the first message only appears when metadata is requested after the subscribe call), e.g.

mtype                 topic client partition offset msgtime                       data                                                                        key      headers                 rcvtime                      
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNKNOWN_TOPIC_OR_PART test1 0      0         -1001                                "Subscribed topic not available: test1: Broker: Unknown topic or partition" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:53.817149000
                      test1 0      0         0      2024.01.23D16:02:24.373000000 "2024.01.23D16:02:24.373020000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045098000
                      test1 0      0         1      2024.01.23D16:02:26.614000000 "2024.01.23D16:02:26.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045232000
                      test1 0      0         2      2024.01.23D16:02:28.616000000 "2024.01.23D16:02:28.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045352000
                      test1 0      0         3      2024.01.23D16:02:30.616000000 "2024.01.23D16:02:30.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.046034000

@sshanks-kx (Author)

This seems to happen every time rd_kafka_metadata is used after a valid subscription has been made to an existing topic, where rd_kafka_queue_io_event_enable is used to enable notifications (I need to use the file-descriptor-style notifications of items appearing in the queue).

On a similar app that just does publishing, it doesn't seem to occur when rd_kafka_metadata is called after publishing.
When trying with an example subscription program that doesn't use rd_kafka_queue_io_event_enable, it doesn't seem to happen either.
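
For context, the file-descriptor notification pattern in use is roughly the following (a minimal sketch only; consume_on_fd_events is an illustrative name and not part of the actual wrapper): wait for the pipe's read end to become readable, drain the notification byte(s), then drain the consumer queue with zero-timeout polls.

#include <poll.h>   // in addition to the headers already included in the repro above

// Minimal sketch of the fd-based notification loop (illustrative only).
// notify_fd is the read end of the pipe whose write end was passed to
// rd_kafka_queue_io_event_enable (spair[0] in the repro program above).
static void consume_on_fd_events(rd_kafka_t *rk, int notify_fd) {
    struct pollfd pfd = { .fd = notify_fd, .events = POLLIN };
    while (poll(&pfd, 1, -1) > 0) {
        char buf[8];
        (void)read(notify_fd, buf, sizeof(buf));        // drain the "X" notification byte(s)
        rd_kafka_message_t *msg;
        while ((msg = rd_kafka_consumer_poll(rk, 0))) { // non-blocking drain of the queue
            if (msg->err)
                fprintf(stderr, "Got error msg %d (%s)\n", msg->err, rd_kafka_err2name(msg->err));
            else
                fprintf(stderr, "Got msg [offset %lld]\n", (long long)msg->offset);
            rd_kafka_message_destroy(msg);
        }
    }
}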

@emasab (Collaborator) commented Mar 26, 2024

I think this is due to a bug discovered recently, affecting 2.1.0+ versions.
When you subscribe, a full metadata request is done. This request was emptying the metadata cache and causing those errors.

return rd_kafka_metadata_refresh_all(rk, rkb, reason);

It's fixed in this PR:
#4660

Could you test if it solves your problem?

emasab added the bug label on Mar 26, 2024
@sshanks-kx (Author)

Thanks, will let you know either way once I've tried.

@sshanks-kx (Author)

@emasab The bug is still occurring when testing with #4660.

Running my full program, I still see the issue.

Running with the test program above, it still shows the issue, e.g.

Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]

Still the same effect: if I move the rd_kafka_metadata call before rd_kafka_subscribe, you don't see the problem, but calling rd_kafka_metadata after rd_kafka_subscribe always shows the problem.

@sshanks-kx (Author)

@emasab FYI: I tried librdkafka v2.0.2 and I don't see the bug occurring with that version.

@emasab (Collaborator) commented Mar 28, 2024

Could you check the latest pushed commit in the same PR? I've reproduced and fixed your case, and will add a new test too.

What happens is that when you call subscribe, the cache isn't populated until the consumer joins the group, but if application metadata calls arrive in the meantime, they shouldn't update the group, because that causes this error.

@emasab (Collaborator) commented Mar 28, 2024

@emasab FYI: I tried librdkafka v2.0.2 & I dont see the bug occurring with that version.

Yeah, there were changes with KIP 320 in 2.1.0

@sshanks-kx (Author)

@emasab Looks OK with the latest change on the PR; I have tested again with the original app.
