Skip to content

Commit

Permalink
[KIP-848] HB Error Code, Partial ack flow, OffsetCommit Request, Resp…
Browse files Browse the repository at this point in the history
…onse and various fixes (#4634)

- Added error handling to ConsumerGroupHeartbeat API
- Added type new errors - UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR
- Added partial acknowledgement flow
- Upgraded OffsetCommit Request and response to v9
- Fixed metadata being called with duplicate topic id
- Fixed next_target_assignment not getting reset to NULL
- Fixed member stuck if fenced during rebalancing
- Fixed segfault with current and target assignment while resetting consumer group
- Fixed segfault due to deleted topic in metadata
- Fixed leave not being called if the consumer without any assignment leaves
  • Loading branch information
pranavrth authored and anchitj committed Jun 10, 2024
1 parent ceb1b42 commit e84b47a
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 222 deletions.
80 changes: 40 additions & 40 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1967,50 +1967,50 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf

### Supported protocol versions

"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.5.0, while
"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.7.0, while
"librdkafka max" is the maximum ApiVersion supported in the latest
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------ |-----------|----------------|
| 0 | Produce | 10 | 8 |
| 1 | Fetch | 16 | 15 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |

| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ----------------------------- | ---------- |----------------|
| 0 | Produce | 10 | 8 |
| 1 | Fetch | 16 | 15 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 9 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| 68 | ConsumerGroupHeartbeat | 0 | 0 |

# Recommendations for language binding developers

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ int main(int argc, char **argv) {
rd_kafka_destroy(rk);

return 0;
}
}
6 changes: 6 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ typedef enum {
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100,
/** The member epoch is fenced by the group coordinator */
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
/** The instance ID is still used by another member in the
* consumer group */
RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111,
/** The assignor or its version range is not supported by the consumer
* group */
RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112,
/** The member epoch is stale */
RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
RD_KAFKA_RESP_ERR_END_ALL,
Expand Down
Loading

0 comments on commit e84b47a

Please sign in to comment.