Skip to content

Commit

Permalink
Failing tests:
Browse files Browse the repository at this point in the history
- metadata doesn't persist
  after full metadata refresh
- fast metadata refresh doesn't stop
  when no leader change happened
- stale metadata doesn't migrate back the partition
  to corresponding broker while validating an offset
- a metadata call for an existing topic, just after subscription,
  must not cause a UNKNOWN_TOPIC_OR_PART error
  • Loading branch information
emasab committed Apr 3, 2024
1 parent 8c5d68b commit 14dd831
Show file tree
Hide file tree
Showing 12 changed files with 511 additions and 40 deletions.
84 changes: 82 additions & 2 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart,
mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]);
}

/**
* @brief Push a partition leader response to passed \p mpart .
*/
static void
rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart,
int32_t leader_id,
int32_t leader_epoch) {
rd_kafka_mock_partition_leader_t *leader_response;

leader_response = rd_calloc(1, sizeof(*leader_response));
leader_response->leader_id = leader_id;
leader_response->leader_epoch = leader_epoch;
TAILQ_INSERT_TAIL(&mpart->leader_responses, leader_response, link);
}

/**
* @brief Return the first mocked partition leader response in \p mpart ,
* if available.
*/
rd_kafka_mock_partition_leader_t *
rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) {
return TAILQ_FIRST(&mpart->leader_responses);
}

/**
* @brief Unlink and destroy a partition leader response
*/
void rd_kafka_mock_partition_leader_destroy(
rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_partition_leader_t *mpart_leader) {
TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link);
rd_free(mpart_leader);
}

/**
* @brief Unlink and destroy committed offset
Expand Down Expand Up @@ -546,13 +578,18 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) {
rd_kafka_mock_msgset_t *mset, *tmp;
rd_kafka_mock_committed_offset_t *coff, *tmpcoff;
rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader;

TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp)
rd_kafka_mock_msgset_destroy(mpart, mset);

TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff)
rd_kafka_mock_committed_offset_destroy(mpart, coff);

TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link,
tmp_mpart_leader)
rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader);

rd_list_destroy(&mpart->pidstates);

rd_free(mpart->replicas);
Expand All @@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic,
mpart->update_follower_end_offset = rd_true;

TAILQ_INIT(&mpart->committed_offsets);
TAILQ_INIT(&mpart->leader_responses);

rd_list_init(&mpart->pidstates, 0, rd_free);

Expand Down Expand Up @@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
int partition,
int32_t leader_id,
int32_t leader_epoch) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
rko->rko_u.mock.name = rd_strdup(topic);
rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE;
rko->rko_u.mock.partition = partition;
rko->rko_u.mock.leader_id = leader_id;
rko->rko_u.mock.leader_epoch = leader_epoch;

return rd_kafka_op_err_destroy(
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster,
int32_t broker_id) {
Expand Down Expand Up @@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
mpart->update_follower_end_offset = rd_false;
}
break;
case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE:
mpart = rd_kafka_mock_partition_get(
mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
if (!mpart)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Push %s [%" PRId32 "] leader response: (%" PRId32
", %" PRId32 ")",
rko->rko_u.mock.name, rko->rko_u.mock.partition,
rko->rko_u.mock.leader_id,
rko->rko_u.mock.leader_epoch);

rd_kafka_mock_partition_push_leader_response0(
mpart, rko->rko_u.mock.leader_id,
rko->rko_u.mock.leader_epoch);
break;

/* Broker commands */
case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
Expand Down Expand Up @@ -2673,8 +2745,16 @@ rd_kafka_mock_request_copy(rd_kafka_mock_request_t *mrequest) {
return request;
}

void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *element) {
rd_free(element);
void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mrequest) {
rd_free(mrequest);
}

void rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mrequests,
size_t mrequest_cnt) {
size_t i;
for (i = 0; i < mrequest_cnt; i++)
rd_kafka_mock_request_destroy(mrequests[i]);
rd_free(mrequests);
}

static void rd_kafka_mock_request_free(void *element) {
Expand Down
26 changes: 26 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -281,6 +282,24 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster,
int64_t lo,
int64_t hi);

/**
* @brief Push \p cnt Metadata leader response
* onto the cluster's stack for the given \p topic and \p partition.
*
* @param topic Topic to change
* @param partition Partition to change in \p topic
* @param leader_id Broker id of the leader node
* @param leader_epoch Leader epoch corresponding to the given \p leader_id
*
* @return Push operation error code
*/
RD_EXPORT
rd_kafka_resp_err_t
rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
int partition,
int32_t leader_id,
int32_t leader_epoch);

/**
* @brief Disconnects the broker and disallows any new connections.
Expand Down Expand Up @@ -388,6 +407,13 @@ typedef struct rd_kafka_mock_request_s rd_kafka_mock_request_t;
*/
RD_EXPORT void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mreq);

/**
* @brief Destroy a rd_kafka_mock_request_t * array and deallocate it.
*/
RD_EXPORT void
rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mreqs,
size_t mreq_cnt);

/**
* @brief Get the broker id to which \p mreq was sent.
*/
Expand Down
51 changes: 39 additions & 12 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,8 @@ static int rd_kafka_mock_handle_ApiVersion(rd_kafka_mock_connection_t *mconn,
* @param mtopic may be NULL
*/
static void
rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp,
rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_buf_t *resp,
int16_t ApiVersion,
const char *topic,
const rd_kafka_mock_topic_t *mtopic,
Expand All @@ -880,20 +881,46 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp,
rd_kafka_buf_write_arraycnt(resp, partition_cnt);

for (i = 0; mtopic && i < partition_cnt; i++) {
const rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i];
rd_kafka_mock_partition_leader_t *mpart_leader;
rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i];
int r;

/* Response: ..Partitions.ErrorCode */
rd_kafka_buf_write_i16(resp, 0);
/* Response: ..Partitions.PartitionIndex */
rd_kafka_buf_write_i32(resp, mpart->id);
/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(resp,
mpart->leader ? mpart->leader->id : -1);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(resp, mpart->leader_epoch);
mpart_leader =
rd_kafka_mock_partition_next_leader_response(mpart);
if (mpart_leader) {
rd_kafka_dbg(
mcluster->rk, MOCK, "MOCK",
"MetadataRequest: using next leader response "
"(%" PRId32 ", %" PRId32 ")",
mpart_leader->leader_id,
mpart_leader->leader_epoch);

/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(resp, mpart_leader->leader_id);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(
resp, mpart_leader->leader_epoch);
}
rd_kafka_mock_partition_leader_destroy(mpart,
mpart_leader);
mpart_leader = NULL;
} else {
/* Response: ..Partitions.Leader */
rd_kafka_buf_write_i32(
resp, mpart->leader ? mpart->leader->id : -1);

if (ApiVersion >= 7) {
/* Response: ..Partitions.LeaderEpoch */
rd_kafka_buf_write_i32(resp,
mpart->leader_epoch);
}
}

/* Response: ..Partitions.#ReplicaNodes */
Expand Down Expand Up @@ -1010,8 +1037,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,

TAILQ_FOREACH(mtopic, &mcluster->topics, link) {
rd_kafka_mock_buf_write_Metadata_Topic(
resp, rkbuf->rkbuf_reqhdr.ApiVersion, mtopic->name,
mtopic, mtopic->err);
mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion,
mtopic->name, mtopic, mtopic->err);
}

} else if (requested_topics) {
Expand All @@ -1033,8 +1060,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn,
err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

rd_kafka_mock_buf_write_Metadata_Topic(
resp, rkbuf->rkbuf_reqhdr.ApiVersion, rktpar->topic,
mtopic, err ? err : mtopic->err);
mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion,
rktpar->topic, mtopic, err ? err : mtopic->err);
}

} else {
Expand Down
21 changes: 21 additions & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ typedef struct rd_kafka_mock_committed_offset_s {
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
} rd_kafka_mock_committed_offset_t;

/**
* @struct Leader id and epoch to return in a Metadata call.
*/
typedef struct rd_kafka_mock_partition_leader_s {
/**< Link to prev/next entries */
TAILQ_ENTRY(rd_kafka_mock_partition_leader_s) link;
int32_t leader_id; /**< Leader id */
int32_t leader_epoch; /**< Leader epoch */
} rd_kafka_mock_partition_leader_t;


TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s);

Expand Down Expand Up @@ -276,6 +286,10 @@ typedef struct rd_kafka_mock_partition_s {
int32_t follower_id; /**< Preferred replica/follower */

struct rd_kafka_mock_topic_s *topic;

/**< Leader responses */
TAILQ_HEAD(, rd_kafka_mock_partition_leader_s)
leader_responses;
} rd_kafka_mock_partition_t;


Expand Down Expand Up @@ -477,6 +491,13 @@ int64_t rd_kafka_mock_partition_offset_for_leader_epoch(
const rd_kafka_mock_partition_t *mpart,
int32_t leader_epoch);

rd_kafka_mock_partition_leader_t *
rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart);

void rd_kafka_mock_partition_leader_destroy(
rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_partition_leader_t *mpart_leader);


/**
* @returns true if the ApiVersion is supported, else false.
Expand Down
12 changes: 11 additions & 1 deletion src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE,
RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
RD_KAFKA_MOCK_CMD_BROKER_SET_RTT,
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
Expand All @@ -579,14 +580,17 @@ struct rd_kafka_op_s {
* PART_SET_FOLLOWER
* PART_SET_FOLLOWER_WMARKS
* BROKER_SET_RACK
* COORD_SET (key_type) */
* COORD_SET (key_type)
* PART_PUSH_LEADER_RESPONSE
*/
char *str; /**< For:
* COORD_SET (key) */
int32_t partition; /**< For:
* PART_SET_FOLLOWER
* PART_SET_FOLLOWER_WMARKS
* PART_SET_LEADER
* APIVERSION_SET (ApiKey)
* PART_PUSH_LEADER_RESPONSE
*/
int32_t broker_id; /**< For:
* PART_SET_FOLLOWER
Expand All @@ -606,6 +610,12 @@ struct rd_kafka_op_s {
* PART_SET_FOLLOWER_WMARKS
* APIVERSION_SET (maxver)
*/
int32_t leader_id; /**< Leader id, for:
* PART_PUSH_LEADER_RESPONSE
*/
int32_t leader_epoch; /**< Leader epoch, for:
* PART_PUSH_LEADER_RESPONSE
*/
} mock;

struct {
Expand Down
Loading

0 comments on commit 14dd831

Please sign in to comment.