diff --git a/examples/describe_topics.c b/examples/describe_topics.c index cf38a70e21..5b7425ef8c 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -255,7 +255,7 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc, fprintf(stderr, "No matching topics found\n"); return 1; } else { - fprintf(stderr, "No topics in cluster\n"); + fprintf(stderr, "No topics requested\n"); } } @@ -281,7 +281,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { rd_kafka_error_t *error; int retval = 0; int topics_cnt = 0; - const int min_argc = 2; + const int min_argc = 1; int include_topic_authorized_operations; if (argc < min_argc) diff --git a/examples/list_offsets.c b/examples/list_offsets.c index d01c975030..f84c11c121 100644 --- a/examples/list_offsets.c +++ b/examples/list_offsets.c @@ -125,12 +125,21 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { * @brief Print list offsets result information. */ static int -print_list_offsets_result_info(const rd_kafka_ListOffsets_result_t *result) { +print_list_offsets_result_info(const rd_kafka_ListOffsets_result_t *result, + int req_cnt) { const rd_kafka_ListOffsetsResultInfo_t **result_infos; size_t cnt; size_t i; result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); printf("ListOffsets results:\n"); + if (cnt == 0) { + if (req_cnt > 0) { + fprintf(stderr, "No matching partitions found\n"); + return 1; + } else { + fprintf(stderr, "No partitions requested\n"); + } + } for (i = 0; i < cnt; i++) { const rd_kafka_topic_partition_t *topic_partition = rd_kafka_ListOffsetsResultInfo_topic_partition( @@ -177,7 +186,8 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) { rd_kafka_event_t *event = NULL; rd_kafka_error_t *error = NULL; int i; - int retval = 0; + int retval = 0; + int partitions = 0; rd_kafka_topic_partition_list_t *rktpars; if ((argc - 1) % 3 != 0) { @@ -193,6 +203,7 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) { rktpars, argv[i], parse_int("partition", argv[i + 1])) ->offset = parse_int("offset", argv[i + 2]); } + partitions = rktpars->cnt; /* * Create consumer instance @@ -254,7 +265,7 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) { * partitions may have errors. */ const rd_kafka_ListOffsets_result_t *result; result = rd_kafka_event_ListOffsets_result(event); - retval = print_list_offsets_result_info(result); + retval = print_list_offsets_result_info(result, partitions); } diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 388a7fd24b..4184d1cdc6 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -4450,28 +4450,44 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk, rd_kafka_admin_request_op_result_cb_set( rko_fanout, rd_kafka_ListOffsets_handle_result); - if (topic_partitions->cnt == 0) { - rd_kafka_admin_result_fail( - rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, - "At least one partition is required"); - goto err; - } + if (topic_partitions->cnt) { + for (i = 0; i < topic_partitions->cnt; i++) { + if (!topic_partitions->elems[i].topic[0]) { + rd_kafka_admin_result_fail( + rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Partition topic name at index %d must be " + "non-empty", + i); + goto err; + } + if (topic_partitions->elems[i].partition < 0) { + rd_kafka_admin_result_fail( + rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Partition at index %d cannot be negative", + i); + goto err; + } + } - topic_partitions_sorted = rd_list_new( - topic_partitions->cnt, rd_kafka_topic_partition_destroy_free); - for (i = 0; i < topic_partitions->cnt; i++) - rd_list_add( - topic_partitions_sorted, - rd_kafka_topic_partition_copy(&topic_partitions->elems[i])); - rd_list_sort(topic_partitions_sorted, rd_kafka_topic_partition_cmp); - if (rd_list_find_duplicate(topic_partitions_sorted, - rd_kafka_topic_partition_cmp)) { + topic_partitions_sorted = + rd_list_new(topic_partitions->cnt, + rd_kafka_topic_partition_destroy_free); + for (i = 0; i < topic_partitions->cnt; i++) + rd_list_add(topic_partitions_sorted, + rd_kafka_topic_partition_copy( + &topic_partitions->elems[i])); - rd_kafka_admin_result_fail( - rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, - "Partitions must not contain duplicates"); - goto err; + rd_list_sort(topic_partitions_sorted, + rd_kafka_topic_partition_cmp); + if (rd_list_find_duplicate(topic_partitions_sorted, + rd_kafka_topic_partition_cmp)) { + + rd_kafka_admin_result_fail( + rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Partitions must not contain duplicates"); + goto err; + } } for (i = 0; i < topic_partitions->cnt; i++) { @@ -4493,14 +4509,24 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk, rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_topic_partitions); - /* Async query for partition leaders */ - rd_kafka_topic_partition_list_query_leaders_async( - rk, copied_topic_partitions, - rd_kafka_admin_timeout_remains(rko_fanout), - RD_KAFKA_REPLYQ(rk->rk_ops, 0), - rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout); + if (topic_partitions->cnt) { + /* Async query for partition leaders */ + rd_kafka_topic_partition_list_query_leaders_async( + rk, copied_topic_partitions, + rd_kafka_admin_timeout_remains(rko_fanout), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout); + } else { + /* Empty list */ + rd_kafka_op_t *rko_result = + rd_kafka_admin_result_new(rko_fanout); + /* Enqueue empty result on application queue, we're done. */ + rd_kafka_admin_result_enq(rko_fanout, rko_result); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + } - rd_list_destroy(topic_partitions_sorted); + RD_IF_FREE(topic_partitions_sorted, rd_list_destroy); return; err: RD_IF_FREE(topic_partitions_sorted, rd_list_destroy); @@ -8721,39 +8747,60 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk, rk, RD_KAFKA_OP_DESCRIBETOPICS, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, &cbs, options, rkqu->rkqu_q); - if (topics->topics_cnt == 0) { - rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, - "No topics to describe"); - rd_kafka_admin_common_worker_destroy(rk, rko, - rd_true /*destroy*/); - return; - } - rd_list_init(&rko->rko_u.admin_request.args, (int)topics->topics_cnt, rd_free); for (i = 0; i < topics->topics_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_strdup(topics->topics[i])); - /* Check for duplicates. - * Make a temporary copy of the topic list and sort it to check for - * duplicates, we don't want the original list sorted since we want - * to maintain ordering. */ - rd_list_init(&dup_list, rd_list_cnt(&rko->rko_u.admin_request.args), - NULL); - rd_list_copy_to(&dup_list, &rko->rko_u.admin_request.args, NULL, NULL); - rd_list_sort(&dup_list, rd_kafka_DescribeTopics_cmp); - if (rd_list_find_duplicate(&dup_list, rd_kafka_DescribeTopics_cmp)) { + if (rd_list_cnt(&rko->rko_u.admin_request.args)) { + int j; + char *topic_name; + /* Check for duplicates. + * Make a temporary copy of the topic list and sort it to check + * for duplicates, we don't want the original list sorted since + * we want to maintain ordering. */ + rd_list_init(&dup_list, + rd_list_cnt(&rko->rko_u.admin_request.args), NULL); + rd_list_copy_to(&dup_list, &rko->rko_u.admin_request.args, NULL, + NULL); + rd_list_sort(&dup_list, rd_kafka_DescribeTopics_cmp); + if (rd_list_find_duplicate(&dup_list, + rd_kafka_DescribeTopics_cmp)) { + rd_list_destroy(&dup_list); + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate topics not allowed"); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + return; + } + + /* Check for empty topics. */ + RD_LIST_FOREACH(topic_name, &rko->rko_u.admin_request.args, j) { + if (!topic_name[0]) { + rd_list_destroy(&dup_list); + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Empty topic name at index %d isn't " + "allowed", + j); + rd_kafka_admin_common_worker_destroy( + rk, rko, rd_true /*destroy*/); + return; + } + } + rd_list_destroy(&dup_list); - rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, - "Duplicate topics not allowed"); + rd_kafka_q_enq(rk->rk_ops, rko); + } else { + /* Empty list */ + rd_kafka_op_t *rko_result = rd_kafka_admin_result_new(rko); + /* Enqueue empty result on application queue, we're done. */ + rd_kafka_admin_result_enq(rko, rko_result); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); - return; } - - rd_list_destroy(&dup_list); - rd_kafka_q_enq(rk->rk_ops, rko); } /**@}*/ diff --git a/tests/0081-admin.c b/tests/0081-admin.c index f788983986..0690217a3c 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3160,7 +3160,7 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_queue_t *q; #define TEST_DESCRIBE_TOPICS_CNT 3 char *topic_names[TEST_DESCRIBE_TOPICS_CNT]; - rd_kafka_TopicCollection_t *topics; + rd_kafka_TopicCollection_t *topics, *empty_topics; rd_kafka_AdminOptions_t *options; rd_kafka_event_t *rkev; const rd_kafka_error_t *error; @@ -3197,11 +3197,11 @@ static void do_test_DescribeTopics(const char *what, } topics = rd_kafka_TopicCollection_of_topic_names( (const char **)topic_names, TEST_DESCRIBE_TOPICS_CNT); + empty_topics = rd_kafka_TopicCollection_of_topic_names(NULL, 0); test_CreateTopics_simple(rk, NULL, topic_names, 1, 1, NULL); test_wait_topic_exists(rk, topic_names[0], 10000); - /* Call DescribeTopics. */ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS); TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( @@ -3210,10 +3210,40 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_AdminOptions_set_include_authorized_operations( options, include_authorized_operations)); - TIMING_START(&timing, "DescribeTopics"); + /* Call DescribeTopics with empty topics. */ + TIMING_START(&timing, "DescribeTopics empty"); + rd_kafka_DescribeTopics(rk, empty_topics, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* Check DescribeTopics results. */ + rkev = test_wait_admin_result(q, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, + tmout_multip(20 * 1000)); + TEST_ASSERT(rkev, "Expected DescribeTopicsResult on queue"); + + /* Extract result. */ + res = rd_kafka_event_DescribeTopics_result(rkev); + TEST_ASSERT(res, "Expected DescribeTopics result, not %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, "Expected success, not %s: %s", + rd_kafka_err2name(err), errstr2); + + result_topics = + rd_kafka_DescribeTopics_result_topics(res, &result_topics_cnt); + + /* Check no result is received. */ + TEST_ASSERT((int)result_topics_cnt == 0, + "Expected 0 topics in result, got %d", + (int)result_topics_cnt); + + rd_kafka_event_destroy(rkev); + + /* Call DescribeTopics with all of them. */ + TIMING_START(&timing, "DescribeTopics all"); rd_kafka_DescribeTopics(rk, topics, options, q); TIMING_ASSERT_LATER(&timing, 0, 50); - rd_kafka_AdminOptions_destroy(options); /* Check DescribeTopics results. */ rkev = test_wait_admin_result(q, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, @@ -3302,6 +3332,7 @@ static void do_test_DescribeTopics(const char *what, "Authorized operations should be NULL when not requested"); } + rd_kafka_AdminOptions_destroy(options); rd_kafka_event_destroy(rkev); /* If we don't have authentication/authorization set up in our @@ -3410,6 +3441,7 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_queue_destroy(q); rd_kafka_TopicCollection_destroy(topics); + rd_kafka_TopicCollection_destroy(empty_topics); TEST_LATER_CHECK(); @@ -4946,8 +4978,11 @@ static void do_test_ListOffsets(const char *what, rd_kafka_event_t *event; rd_kafka_queue_t *q; rd_kafka_t *p; - size_t i = 0; - rd_kafka_topic_partition_list_t *topic_partitions; + size_t i = 0, cnt = 0; + rd_kafka_topic_partition_list_t *topic_partitions, + *empty_topic_partitions; + const rd_kafka_ListOffsets_result_t *result; + const rd_kafka_ListOffsetsResultInfo_t **result_infos; int64_t basetimestamp = 10000000; int64_t timestamps[] = { basetimestamp + 100, @@ -5011,9 +5046,29 @@ static void do_test_ListOffsets(const char *what, TEST_CALL_ERROR__(rd_kafka_AdminOptions_set_isolation_level( options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)); - topic_partitions = rd_kafka_topic_partition_list_new(1); + topic_partitions = rd_kafka_topic_partition_list_new(1); + empty_topic_partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(topic_partitions, topic, 0); + /* Call ListOffsets with empty partition list */ + rd_kafka_ListOffsets(rk, empty_topic_partitions, options, q); + rd_kafka_topic_partition_list_destroy(empty_topic_partitions); + /* Wait for results */ + event = rd_kafka_queue_poll(q, -1 /*indefinitely*/); + if (!event) + TEST_FAIL("Event missing"); + + TEST_CALL_ERR__(rd_kafka_event_error(event)); + + result = rd_kafka_event_ListOffsets_result(event); + result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); + rd_kafka_event_destroy(event); + + TEST_ASSERT(!cnt, + "Expected empty result info array, got %" PRIusz + " result infos", + cnt); + for (i = 0; i < RD_ARRAY_SIZE(test_fixtures); i++) { rd_bool_t retry = rd_true; rd_kafka_topic_partition_list_t *topic_partitions_copy; @@ -5036,6 +5091,7 @@ static void do_test_ListOffsets(const char *what, topic_partitions_copy->elems[0].offset = test_fixture.query; while (retry) { + size_t j; rd_kafka_resp_err_t err; /* Call ListOffsets */ rd_kafka_ListOffsets(rk, topic_partitions_copy, options, @@ -5056,10 +5112,6 @@ static void do_test_ListOffsets(const char *what, rd_kafka_err2name(err)); } - const rd_kafka_ListOffsets_result_t *result; - const rd_kafka_ListOffsetsResultInfo_t **result_infos; - size_t cnt; - size_t j; result = rd_kafka_event_ListOffsets_result(event); result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);