From 4edb445a2ff23dcb694de4d76ffec95bad3c2e1a Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:05:57 -0400 Subject: [PATCH 1/9] Bump submodule to librdkafka 2.2.0 --- rdkafka-sys/librdkafka | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdkafka-sys/librdkafka b/rdkafka-sys/librdkafka index 9b72ca3aa..e75de5be1 160000 --- a/rdkafka-sys/librdkafka +++ b/rdkafka-sys/librdkafka @@ -1 +1 @@ -Subproject commit 9b72ca3aa6c49f8f57eea02f70aadb1453d3ba1f +Subproject commit e75de5be191b6b8e9602efc969f4af64071550de From e8d4ef32fd0f16cdfddad4d66e3b9b00e535ba27 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:08:48 -0400 Subject: [PATCH 2/9] Update bindgen output for v2.2.0. --- rdkafka-sys/src/bindings.rs | 571 +++++++++++++++++++++++++++++++++++- 1 file changed, 567 insertions(+), 4 deletions(-) diff --git a/rdkafka-sys/src/bindings.rs b/rdkafka-sys/src/bindings.rs index 2388d36d4..a303c4d5a 100644 --- a/rdkafka-sys/src/bindings.rs +++ b/rdkafka-sys/src/bindings.rs @@ -1,10 +1,10 @@ -/* automatically generated by rust-bindgen 0.65.1 */ +/* automatically generated by rust-bindgen 0.66.1 */ use libc::{c_char, c_int, c_void, sockaddr, FILE}; use num_enum::TryFromPrimitive; -pub const RD_KAFKA_VERSION: i32 = 17367807; -pub const RD_KAFKA_DEBUG_CONTEXTS : & [u8 ; 138usize] = b"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf\0" ; +pub const RD_KAFKA_VERSION: i32 = 33685759; +pub const RD_KAFKA_DEBUG_CONTEXTS : & [u8 ; 138] = b"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf\0" ; pub const RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE: i32 = 8; pub const RD_KAFKA_OFFSET_BEGINNING: i32 = -2; pub const RD_KAFKA_OFFSET_END: i32 = -1; @@ -39,6 +39,13 @@ pub const RD_KAFKA_EVENT_BACKGROUND: i32 = 512; pub const RD_KAFKA_EVENT_CREATEACLS_RESULT: i32 = 1024; pub const RD_KAFKA_EVENT_DESCRIBEACLS_RESULT: i32 = 2048; pub const RD_KAFKA_EVENT_DELETEACLS_RESULT: i32 = 4096; +pub const RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT: i32 = 8192; +pub const RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT: i32 = 16384; +pub const RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT: i32 = 32768; +pub const RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT: i32 = 65536; +pub const RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT: i32 = 131072; +pub const RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: i32 = 262144; +pub const RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: i32 = 524288; extern "C" { pub fn rd_kafka_version() -> c_int; } @@ -197,6 +204,7 @@ pub enum rd_kafka_resp_err_t { RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST = -142, RD_KAFKA_RESP_ERR__NOOP = -141, RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET = -140, + RD_KAFKA_RESP_ERR__LOG_TRUNCATION = -139, RD_KAFKA_RESP_ERR__END = -100, RD_KAFKA_RESP_ERR_UNKNOWN = -1, RD_KAFKA_RESP_ERR_NO_ERROR = 0, @@ -382,6 +390,17 @@ pub type rd_kafka_topic_partition_t = rd_kafka_topic_partition_s; extern "C" { pub fn rd_kafka_topic_partition_destroy(rktpar: *mut rd_kafka_topic_partition_t); } +extern "C" { + pub fn rd_kafka_topic_partition_set_leader_epoch( + rktpar: *mut rd_kafka_topic_partition_t, + leader_epoch: i32, + ); +} +extern "C" { + pub fn rd_kafka_topic_partition_get_leader_epoch( + rktpar: *const rd_kafka_topic_partition_t, + ) -> i32; +} #[repr(C)] #[derive(Debug, Copy, Clone)] pub struct rd_kafka_topic_partition_list_s { @@ -619,6 +638,9 @@ pub enum rd_kafka_msg_status_t { extern "C" { pub fn rd_kafka_message_status(rkmessage: *const rd_kafka_message_t) -> rd_kafka_msg_status_t; } +extern "C" { + pub fn rd_kafka_message_leader_epoch(rkmessage: *const rd_kafka_message_t) -> i32; +} #[repr(i32)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum rd_kafka_conf_res_t { @@ -831,6 +853,20 @@ extern "C" { closesocket_cb: Option c_int>, ); } +extern "C" { + pub fn rd_kafka_conf_set_resolve_cb( + conf: *mut rd_kafka_conf_t, + resolve_cb: Option< + unsafe extern "C" fn( + node: *const c_char, + service: *const c_char, + hints: *const addrinfo, + res: *mut *mut addrinfo, + opaque: *mut c_void, + ) -> c_int, + >, + ); +} extern "C" { pub fn rd_kafka_conf_set_ssl_cert_verify_cb( conf: *mut rd_kafka_conf_t, @@ -1175,6 +1211,13 @@ extern "C" { extern "C" { pub fn rd_kafka_sasl_background_callbacks_enable(rk: *mut rd_kafka_t) -> *mut rd_kafka_error_t; } +extern "C" { + pub fn rd_kafka_sasl_set_credentials( + rk: *mut rd_kafka_t, + username: *const c_char, + password: *const c_char, + ) -> *mut rd_kafka_error_t; +} extern "C" { pub fn rd_kafka_queue_get_consumer(rk: *mut rd_kafka_t) -> *mut rd_kafka_queue_t; } @@ -1312,6 +1355,11 @@ extern "C" { offsets: *mut rd_kafka_topic_partition_list_t, ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_offset_store_message( + rkmessage: *mut rd_kafka_message_t, + ) -> *mut rd_kafka_error_t; +} extern "C" { pub fn rd_kafka_subscribe( rk: *mut rd_kafka_t, @@ -1544,6 +1592,21 @@ extern "C" { } #[repr(C)] #[derive(Debug, Copy, Clone)] +pub struct rd_kafka_Node_s { + _unused: [u8; 0], +} +pub type rd_kafka_Node_t = rd_kafka_Node_s; +extern "C" { + pub fn rd_kafka_Node_id(node: *const rd_kafka_Node_t) -> c_int; +} +extern "C" { + pub fn rd_kafka_Node_host(node: *const rd_kafka_Node_t) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_Node_port(node: *const rd_kafka_Node_t) -> u16; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_group_member_info { pub member_id: *mut c_char, pub client_id: *mut c_char, @@ -1553,6 +1616,17 @@ pub struct rd_kafka_group_member_info { pub member_assignment: *mut c_void, pub member_assignment_size: c_int, } +#[repr(u32)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum rd_kafka_consumer_group_state_t { + RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN = 0, + RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE = 1, + RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE = 2, + RD_KAFKA_CONSUMER_GROUP_STATE_STABLE = 3, + RD_KAFKA_CONSUMER_GROUP_STATE_DEAD = 4, + RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY = 5, + RD_KAFKA_CONSUMER_GROUP_STATE__CNT = 6, +} #[repr(C)] #[derive(Debug, Copy, Clone)] pub struct rd_kafka_group_info { @@ -1579,6 +1653,16 @@ extern "C" { timeout_ms: c_int, ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_consumer_group_state_name( + state: rd_kafka_consumer_group_state_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_consumer_group_state_code( + name: *const c_char, + ) -> rd_kafka_consumer_group_state_t; +} extern "C" { pub fn rd_kafka_group_list_destroy(grplist: *const rd_kafka_group_list); } @@ -1715,10 +1799,17 @@ pub type rd_kafka_DescribeAcls_result_t = rd_kafka_event_t; pub type rd_kafka_DeleteAcls_result_t = rd_kafka_event_t; pub type rd_kafka_CreatePartitions_result_t = rd_kafka_event_t; pub type rd_kafka_AlterConfigs_result_t = rd_kafka_event_t; +pub type rd_kafka_IncrementalAlterConfigs_result_t = rd_kafka_event_t; pub type rd_kafka_DescribeConfigs_result_t = rd_kafka_event_t; pub type rd_kafka_DeleteRecords_result_t = rd_kafka_event_t; +pub type rd_kafka_ListConsumerGroups_result_t = rd_kafka_event_t; +pub type rd_kafka_DescribeConsumerGroups_result_t = rd_kafka_event_t; pub type rd_kafka_DeleteGroups_result_t = rd_kafka_event_t; pub type rd_kafka_DeleteConsumerGroupOffsets_result_t = rd_kafka_event_t; +pub type rd_kafka_AlterConsumerGroupOffsets_result_t = rd_kafka_event_t; +pub type rd_kafka_ListConsumerGroupOffsets_result_t = rd_kafka_event_t; +pub type rd_kafka_DescribeUserScramCredentials_result_t = rd_kafka_event_t; +pub type rd_kafka_AlterUserScramCredentials_result_t = rd_kafka_event_t; extern "C" { pub fn rd_kafka_event_CreateTopics_result( rkev: *mut rd_kafka_event_t, @@ -1739,6 +1830,11 @@ extern "C" { rkev: *mut rd_kafka_event_t, ) -> *const rd_kafka_AlterConfigs_result_t; } +extern "C" { + pub fn rd_kafka_event_IncrementalAlterConfigs_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_IncrementalAlterConfigs_result_t; +} extern "C" { pub fn rd_kafka_event_DescribeConfigs_result( rkev: *mut rd_kafka_event_t, @@ -1749,6 +1845,16 @@ extern "C" { rkev: *mut rd_kafka_event_t, ) -> *const rd_kafka_DeleteRecords_result_t; } +extern "C" { + pub fn rd_kafka_event_ListConsumerGroups_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_ListConsumerGroups_result_t; +} +extern "C" { + pub fn rd_kafka_event_DescribeConsumerGroups_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_DescribeConsumerGroups_result_t; +} extern "C" { pub fn rd_kafka_event_DeleteGroups_result( rkev: *mut rd_kafka_event_t, @@ -1774,6 +1880,26 @@ extern "C" { rkev: *mut rd_kafka_event_t, ) -> *const rd_kafka_DeleteAcls_result_t; } +extern "C" { + pub fn rd_kafka_event_ListConsumerGroupOffsets_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_ListConsumerGroupOffsets_result_t; +} +extern "C" { + pub fn rd_kafka_event_AlterConsumerGroupOffsets_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_AlterConsumerGroupOffsets_result_t; +} +extern "C" { + pub fn rd_kafka_event_DescribeUserScramCredentials_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_DescribeUserScramCredentials_result_t; +} +extern "C" { + pub fn rd_kafka_event_AlterUserScramCredentials_result( + rkev: *mut rd_kafka_event_t, + ) -> *const rd_kafka_AlterUserScramCredentials_result_t; +} extern "C" { pub fn rd_kafka_queue_poll( rkqu: *mut rd_kafka_queue_t, @@ -1897,6 +2023,17 @@ pub type rd_kafka_interceptor_f_on_thread_exit_t = Option< ic_opaque: *mut c_void, ) -> rd_kafka_resp_err_t, >; +pub type rd_kafka_interceptor_f_on_broker_state_change_t = Option< + unsafe extern "C" fn( + rk: *mut rd_kafka_t, + broker_id: i32, + secproto: *const c_char, + name: *const c_char, + port: c_int, + state: *const c_char, + ic_opaque: *mut c_void, + ) -> rd_kafka_resp_err_t, +>; extern "C" { pub fn rd_kafka_conf_interceptor_add_on_conf_set( conf: *mut rd_kafka_conf_t, @@ -2001,6 +2138,14 @@ extern "C" { ic_opaque: *mut c_void, ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_interceptor_add_on_broker_state_change( + rk: *mut rd_kafka_t, + ic_name: *const c_char, + on_broker_state_change: rd_kafka_interceptor_f_on_broker_state_change_t, + ic_opaque: *mut c_void, + ) -> rd_kafka_resp_err_t; +} extern "C" { pub fn rd_kafka_topic_result_error( topicres: *const rd_kafka_topic_result_t, @@ -2042,7 +2187,14 @@ pub enum rd_kafka_admin_op_t { RD_KAFKA_ADMIN_OP_CREATEACLS = 9, RD_KAFKA_ADMIN_OP_DESCRIBEACLS = 10, RD_KAFKA_ADMIN_OP_DELETEACLS = 11, - RD_KAFKA_ADMIN_OP__CNT = 12, + RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS = 12, + RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS = 13, + RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS = 14, + RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS = 15, + RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS = 16, + RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS = 17, + RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS = 18, + RD_KAFKA_ADMIN_OP__CNT = 19, } #[repr(C)] #[derive(Debug, Copy, Clone)] @@ -2091,6 +2243,19 @@ extern "C" { errstr_size: usize, ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_AdminOptions_set_require_stable_offsets( + options: *mut rd_kafka_AdminOptions_t, + true_or_false: c_int, + ) -> *mut rd_kafka_error_t; +} +extern "C" { + pub fn rd_kafka_AdminOptions_set_match_consumer_group_states( + options: *mut rd_kafka_AdminOptions_t, + consumer_group_states: *const rd_kafka_consumer_group_state_t, + consumer_group_states_cnt: usize, + ) -> *mut rd_kafka_error_t; +} extern "C" { pub fn rd_kafka_AdminOptions_set_opaque( options: *mut rd_kafka_AdminOptions_t, @@ -2303,6 +2468,15 @@ pub enum rd_kafka_ResourcePatternType_t { RD_KAFKA_RESOURCE_PATTERN_PREFIXED = 4, RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT = 5, } +#[repr(u32)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum rd_kafka_AlterConfigOpType_t { + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET = 0, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE = 1, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND = 2, + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT = 3, + RD_KAFKA_ALTER_CONFIG_OP_TYPE__CNT = 4, +} extern "C" { pub fn rd_kafka_ResourcePatternType_name( resource_pattern_type: rd_kafka_ResourcePatternType_t, @@ -2339,6 +2513,14 @@ extern "C" { value: *const c_char, ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_ConfigResource_add_incremental_config( + config: *mut rd_kafka_ConfigResource_t, + name: *const c_char, + op_type: rd_kafka_AlterConfigOpType_t, + value: *const c_char, + ) -> *mut rd_kafka_error_t; +} extern "C" { pub fn rd_kafka_ConfigResource_configs( config: *const rd_kafka_ConfigResource_t, @@ -2378,6 +2560,21 @@ extern "C" { cntp: *mut usize, ) -> *mut *const rd_kafka_ConfigResource_t; } +extern "C" { + pub fn rd_kafka_IncrementalAlterConfigs( + rk: *mut rd_kafka_t, + configs: *mut *mut rd_kafka_ConfigResource_t, + config_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +extern "C" { + pub fn rd_kafka_IncrementalAlterConfigs_result_resources( + result: *const rd_kafka_IncrementalAlterConfigs_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_ConfigResource_t; +} extern "C" { pub fn rd_kafka_DescribeConfigs( rk: *mut rd_kafka_t, @@ -2429,6 +2626,156 @@ extern "C" { } #[repr(C)] #[derive(Debug, Copy, Clone)] +pub struct rd_kafka_ConsumerGroupListing_s { + _unused: [u8; 0], +} +pub type rd_kafka_ConsumerGroupListing_t = rd_kafka_ConsumerGroupListing_s; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_ListConsumerGroupsResult_s { + _unused: [u8; 0], +} +pub type rd_kafka_ListConsumerGroupsResult_t = rd_kafka_ListConsumerGroupsResult_s; +extern "C" { + pub fn rd_kafka_ListConsumerGroups( + rk: *mut rd_kafka_t, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +extern "C" { + pub fn rd_kafka_ConsumerGroupListing_group_id( + grplist: *const rd_kafka_ConsumerGroupListing_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupListing_is_simple_consumer_group( + grplist: *const rd_kafka_ConsumerGroupListing_t, + ) -> c_int; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupListing_state( + grplist: *const rd_kafka_ConsumerGroupListing_t, + ) -> rd_kafka_consumer_group_state_t; +} +extern "C" { + pub fn rd_kafka_ListConsumerGroups_result_valid( + result: *const rd_kafka_ListConsumerGroups_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_ConsumerGroupListing_t; +} +extern "C" { + pub fn rd_kafka_ListConsumerGroups_result_errors( + result: *const rd_kafka_ListConsumerGroups_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_error_t; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_ConsumerGroupDescription_s { + _unused: [u8; 0], +} +pub type rd_kafka_ConsumerGroupDescription_t = rd_kafka_ConsumerGroupDescription_s; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_MemberDescription_s { + _unused: [u8; 0], +} +pub type rd_kafka_MemberDescription_t = rd_kafka_MemberDescription_s; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_MemberAssignment_s { + _unused: [u8; 0], +} +pub type rd_kafka_MemberAssignment_t = rd_kafka_MemberAssignment_s; +extern "C" { + pub fn rd_kafka_DescribeConsumerGroups( + rk: *mut rd_kafka_t, + groups: *mut *const c_char, + groups_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +extern "C" { + pub fn rd_kafka_DescribeConsumerGroups_result_groups( + result: *const rd_kafka_DescribeConsumerGroups_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_ConsumerGroupDescription_t; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_group_id( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_error( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> *const rd_kafka_error_t; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_is_simple_consumer_group( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> c_int; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_partition_assignor( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_state( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> rd_kafka_consumer_group_state_t; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_coordinator( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> *const rd_kafka_Node_t; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_member_count( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + ) -> usize; +} +extern "C" { + pub fn rd_kafka_ConsumerGroupDescription_member( + grpdesc: *const rd_kafka_ConsumerGroupDescription_t, + idx: usize, + ) -> *const rd_kafka_MemberDescription_t; +} +extern "C" { + pub fn rd_kafka_MemberDescription_client_id( + member: *const rd_kafka_MemberDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_MemberDescription_group_instance_id( + member: *const rd_kafka_MemberDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_MemberDescription_consumer_id( + member: *const rd_kafka_MemberDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_MemberDescription_host( + member: *const rd_kafka_MemberDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_MemberDescription_assignment( + member: *const rd_kafka_MemberDescription_t, + ) -> *const rd_kafka_MemberAssignment_t; +} +extern "C" { + pub fn rd_kafka_MemberAssignment_partitions( + assignment: *const rd_kafka_MemberAssignment_t, + ) -> *const rd_kafka_topic_partition_list_t; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_DeleteGroup_s { _unused: [u8; 0], } @@ -2462,6 +2809,82 @@ extern "C" { } #[repr(C)] #[derive(Debug, Copy, Clone)] +pub struct rd_kafka_ListConsumerGroupOffsets_s { + _unused: [u8; 0], +} +pub type rd_kafka_ListConsumerGroupOffsets_t = rd_kafka_ListConsumerGroupOffsets_s; +extern "C" { + pub fn rd_kafka_ListConsumerGroupOffsets_new( + group_id: *const c_char, + partitions: *const rd_kafka_topic_partition_list_t, + ) -> *mut rd_kafka_ListConsumerGroupOffsets_t; +} +extern "C" { + pub fn rd_kafka_ListConsumerGroupOffsets_destroy( + list_grpoffsets: *mut rd_kafka_ListConsumerGroupOffsets_t, + ); +} +extern "C" { + pub fn rd_kafka_ListConsumerGroupOffsets_destroy_array( + list_grpoffsets: *mut *mut rd_kafka_ListConsumerGroupOffsets_t, + list_grpoffset_cnt: usize, + ); +} +extern "C" { + pub fn rd_kafka_ListConsumerGroupOffsets( + rk: *mut rd_kafka_t, + list_grpoffsets: *mut *mut rd_kafka_ListConsumerGroupOffsets_t, + list_grpoffsets_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +extern "C" { + pub fn rd_kafka_ListConsumerGroupOffsets_result_groups( + result: *const rd_kafka_ListConsumerGroupOffsets_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_group_result_t; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_AlterConsumerGroupOffsets_s { + _unused: [u8; 0], +} +pub type rd_kafka_AlterConsumerGroupOffsets_t = rd_kafka_AlterConsumerGroupOffsets_s; +extern "C" { + pub fn rd_kafka_AlterConsumerGroupOffsets_new( + group_id: *const c_char, + partitions: *const rd_kafka_topic_partition_list_t, + ) -> *mut rd_kafka_AlterConsumerGroupOffsets_t; +} +extern "C" { + pub fn rd_kafka_AlterConsumerGroupOffsets_destroy( + alter_grpoffsets: *mut rd_kafka_AlterConsumerGroupOffsets_t, + ); +} +extern "C" { + pub fn rd_kafka_AlterConsumerGroupOffsets_destroy_array( + alter_grpoffsets: *mut *mut rd_kafka_AlterConsumerGroupOffsets_t, + alter_grpoffset_cnt: usize, + ); +} +extern "C" { + pub fn rd_kafka_AlterConsumerGroupOffsets( + rk: *mut rd_kafka_t, + alter_grpoffsets: *mut *mut rd_kafka_AlterConsumerGroupOffsets_t, + alter_grpoffsets_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +extern "C" { + pub fn rd_kafka_AlterConsumerGroupOffsets_result_groups( + result: *const rd_kafka_AlterConsumerGroupOffsets_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_group_result_t; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_DeleteConsumerGroupOffsets_s { _unused: [u8; 0], } @@ -2498,6 +2921,138 @@ extern "C" { cntp: *mut usize, ) -> *mut *const rd_kafka_group_result_t; } +#[repr(u32)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum rd_kafka_ScramMechanism_t { + RD_KAFKA_SCRAM_MECHANISM_UNKNOWN = 0, + RD_KAFKA_SCRAM_MECHANISM_SHA_256 = 1, + RD_KAFKA_SCRAM_MECHANISM_SHA_512 = 2, + RD_KAFKA_SCRAM_MECHANISM__CNT = 3, +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_ScramCredentialInfo_s { + _unused: [u8; 0], +} +pub type rd_kafka_ScramCredentialInfo_t = rd_kafka_ScramCredentialInfo_s; +extern "C" { + pub fn rd_kafka_ScramCredentialInfo_mechanism( + scram_credential_info: *const rd_kafka_ScramCredentialInfo_t, + ) -> rd_kafka_ScramMechanism_t; +} +extern "C" { + pub fn rd_kafka_ScramCredentialInfo_iterations( + scram_credential_info: *const rd_kafka_ScramCredentialInfo_t, + ) -> i32; +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_UserScramCredentialsDescription_s { + _unused: [u8; 0], +} +pub type rd_kafka_UserScramCredentialsDescription_t = rd_kafka_UserScramCredentialsDescription_s; +extern "C" { + pub fn rd_kafka_UserScramCredentialsDescription_user( + description: *const rd_kafka_UserScramCredentialsDescription_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_UserScramCredentialsDescription_error( + description: *const rd_kafka_UserScramCredentialsDescription_t, + ) -> *const rd_kafka_error_t; +} +extern "C" { + pub fn rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count( + description: *const rd_kafka_UserScramCredentialsDescription_t, + ) -> usize; +} +extern "C" { + pub fn rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + description: *const rd_kafka_UserScramCredentialsDescription_t, + idx: usize, + ) -> *const rd_kafka_ScramCredentialInfo_t; +} +extern "C" { + pub fn rd_kafka_DescribeUserScramCredentials_result_descriptions( + result: *const rd_kafka_DescribeUserScramCredentials_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_UserScramCredentialsDescription_t; +} +extern "C" { + pub fn rd_kafka_DescribeUserScramCredentials( + rk: *mut rd_kafka_t, + users: *mut *const c_char, + user_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_UserScramCredentialAlteration_s { + _unused: [u8; 0], +} +pub type rd_kafka_UserScramCredentialAlteration_t = rd_kafka_UserScramCredentialAlteration_s; +extern "C" { + pub fn rd_kafka_UserScramCredentialUpsertion_new( + username: *const c_char, + mechanism: rd_kafka_ScramMechanism_t, + iterations: i32, + password: *const ::std::os::raw::c_uchar, + password_size: usize, + salt: *const ::std::os::raw::c_uchar, + salt_size: usize, + ) -> *mut rd_kafka_UserScramCredentialAlteration_t; +} +extern "C" { + pub fn rd_kafka_UserScramCredentialDeletion_new( + username: *const c_char, + mechanism: rd_kafka_ScramMechanism_t, + ) -> *mut rd_kafka_UserScramCredentialAlteration_t; +} +extern "C" { + pub fn rd_kafka_UserScramCredentialAlteration_destroy( + alteration: *mut rd_kafka_UserScramCredentialAlteration_t, + ); +} +extern "C" { + pub fn rd_kafka_UserScramCredentialAlteration_destroy_array( + alterations: *mut *mut rd_kafka_UserScramCredentialAlteration_t, + alteration_cnt: usize, + ); +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct rd_kafka_AlterUserScramCredentials_result_response_s { + _unused: [u8; 0], +} +pub type rd_kafka_AlterUserScramCredentials_result_response_t = + rd_kafka_AlterUserScramCredentials_result_response_s; +extern "C" { + pub fn rd_kafka_AlterUserScramCredentials_result_response_user( + response: *const rd_kafka_AlterUserScramCredentials_result_response_t, + ) -> *const c_char; +} +extern "C" { + pub fn rd_kafka_AlterUserScramCredentials_result_response_error( + response: *const rd_kafka_AlterUserScramCredentials_result_response_t, + ) -> *const rd_kafka_error_t; +} +extern "C" { + pub fn rd_kafka_AlterUserScramCredentials_result_responses( + result: *const rd_kafka_AlterUserScramCredentials_result_t, + cntp: *mut usize, + ) -> *mut *const rd_kafka_AlterUserScramCredentials_result_response_t; +} +extern "C" { + pub fn rd_kafka_AlterUserScramCredentials( + rk: *mut rd_kafka_t, + alterations: *mut *mut rd_kafka_UserScramCredentialAlteration_t, + alteration_cnt: usize, + options: *const rd_kafka_AdminOptions_t, + rkqu: *mut rd_kafka_queue_t, + ); +} #[repr(C)] #[derive(Debug, Copy, Clone)] pub struct rd_kafka_AclBinding_s { @@ -2776,6 +3331,14 @@ extern "C" { ... ) -> rd_kafka_resp_err_t; } +extern "C" { + pub fn rd_kafka_mock_broker_error_stack_cnt( + mcluster: *mut rd_kafka_mock_cluster_t, + broker_id: i32, + ApiKey: i16, + cntp: *mut usize, + ) -> rd_kafka_resp_err_t; +} extern "C" { pub fn rd_kafka_mock_topic_set_error( mcluster: *mut rd_kafka_mock_cluster_t, From 78b80b0910251ce5fac0f14c19d29c3c84fa9234 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:31:59 -0400 Subject: [PATCH 3/9] Fix missing addrinfo import for bindings. --- rdkafka-sys/src/bindings.rs | 2 +- rdkafka-sys/update-bindings.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rdkafka-sys/src/bindings.rs b/rdkafka-sys/src/bindings.rs index a303c4d5a..fe5319376 100644 --- a/rdkafka-sys/src/bindings.rs +++ b/rdkafka-sys/src/bindings.rs @@ -1,6 +1,6 @@ /* automatically generated by rust-bindgen 0.66.1 */ -use libc::{c_char, c_int, c_void, sockaddr, FILE}; +use libc::{addrinfo, c_char, c_int, c_void, sockaddr, FILE}; use num_enum::TryFromPrimitive; pub const RD_KAFKA_VERSION: i32 = 33685759; diff --git a/rdkafka-sys/update-bindings.sh b/rdkafka-sys/update-bindings.sh index e327fad3e..a56f6771f 100755 --- a/rdkafka-sys/update-bindings.sh +++ b/rdkafka-sys/update-bindings.sh @@ -14,7 +14,7 @@ bindgen \ --allowlist-var "rd_kafka.*|RD_KAFKA_.*" \ --no-recursive-allowlist \ --blocklist-function "rd_kafka_conf_set_open_cb" \ - --raw-line "use libc::{FILE, sockaddr, c_int, c_void, c_char};" \ + --raw-line "use libc::{FILE, addrinfo, sockaddr, c_int, c_void, c_char};" \ --raw-line "use num_enum::TryFromPrimitive;" \ --default-macro-constant-type "signed" \ "bindings.h" -o "src/bindings.rs" From 63c8989c79ed7d001fce72ad65fcd3a66f8f2f7a Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:32:32 -0400 Subject: [PATCH 4/9] Add new RDKafkaErrorCode::LogTruncation and link to RD_KAFKA_RESP_ERR__LOG_TRUNCATION. --- rdkafka-sys/src/helpers.rs | 1 + rdkafka-sys/src/types.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/rdkafka-sys/src/helpers.rs b/rdkafka-sys/src/helpers.rs index d8f739291..8e5cc60b6 100644 --- a/rdkafka-sys/src/helpers.rs +++ b/rdkafka-sys/src/helpers.rs @@ -173,5 +173,6 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED => FeatureUpdateFailed, RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE => PrincipalDeserializationFailure, RD_KAFKA_RESP_ERR_END_ALL => EndAll, + RD_KAFKA_RESP_ERR__LOG_TRUNCATION => LogTruncation, } } diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 6d2fc9abc..97b77b312 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -244,6 +244,8 @@ pub enum RDKafkaErrorCode { Noop = -141, /// No offset to automatically reset to. AutoOffsetReset = -140, + /// Partition log truncation detected + LogTruncation = -139, #[doc(hidden)] End = -100, /// Unknown broker error. From 9652479b307d8f3fc6fb4d03624ddbb42c2f0864 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:33:17 -0400 Subject: [PATCH 5/9] Bump local pin and update changelog for rdkafka-sys. --- rdkafka-sys/Cargo.toml | 2 +- rdkafka-sys/changelog.md | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index fc1c86474..892ee675b 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rdkafka-sys" -version = "4.5.0+1.9.2" +version = "4.6.0+2.2.0" authors = ["Federico Giraud "] build = "build.rs" links = "rdkafka" diff --git a/rdkafka-sys/changelog.md b/rdkafka-sys/changelog.md index db0fa9d30..31ec4b1cc 100644 --- a/rdkafka-sys/changelog.md +++ b/rdkafka-sys/changelog.md @@ -2,6 +2,8 @@ ## Unreleased +* Upgrade to librdkafka v2.2.0. + ## v4.5.0+1.9.2 (2023-06-09) * Add support for the cluster mock API. From 40ebb66c41ced8e5f5771e5988e0e2fe53489cde Mon Sep 17 00:00:00 2001 From: David Blewett Date: Mon, 21 Aug 2023 16:50:07 -0400 Subject: [PATCH 6/9] Release v0.34.0, with librdkafka v2.2.0 --- Cargo.lock | 4 ++-- Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c7553538..b18df341d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.33.2" +version = "0.34.0" dependencies = [ "async-std", "backoff", @@ -1099,7 +1099,7 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.5.0+1.9.2" +version = "4.6.0+2.2.0" dependencies = [ "cmake", "curl-sys", diff --git a/Cargo.toml b/Cargo.toml index f8c848a4d..7544d41fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rdkafka" -version = "0.33.2" +version = "0.34.0" authors = ["Federico Giraud "] repository = "https://github.com/fede1024/rust-rdkafka" readme = "README.md" @@ -15,7 +15,7 @@ exclude = ["Cargo.lock"] members = ["rdkafka-sys"] [dependencies] -rdkafka-sys = { path = "rdkafka-sys", version = "4.5.0", default-features = false } +rdkafka-sys = { path = "rdkafka-sys", version = "4.6.0", default-features = false } futures-channel = "0.3.0" futures-executor = { version = "0.3.0", optional = true } futures-util = { version = "0.3.0", default-features = false } From 2a790e0db4a841d37b9da67fc6ab1911a6baa6ce Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 25 Aug 2023 09:03:06 -0400 Subject: [PATCH 7/9] Don't depend on `curl`, which will prevent it from being compiled statically. --- rdkafka-sys/Cargo.toml | 2 +- rdkafka-sys/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index 892ee675b..4939f2373 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -75,7 +75,7 @@ curl = ["curl-sys"] # Link against the version of curl bundled with the curl-sys crate, rather than # the system's version. -curl-static = ["curl", "curl-sys/static-curl"] +curl-static = ["curl-sys/static-curl"] # Enable support for zstd compression. zstd = ["zstd-sys"] diff --git a/rdkafka-sys/src/lib.rs b/rdkafka-sys/src/lib.rs index 6dadc3fe6..8679ad407 100644 --- a/rdkafka-sys/src/lib.rs +++ b/rdkafka-sys/src/lib.rs @@ -100,7 +100,7 @@ extern crate sasl2_sys; #[cfg(feature = "libz-sys")] extern crate libz_sys; -#[cfg(feature = "curl-sys")] +#[cfg(any(feature = "curl-sys", feature = "curl-sys/static-curl"))] extern crate curl_sys; #[cfg(feature = "zstd-sys")] From d16601442479115664a2eaa37a592d50bcc7619d Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 25 Aug 2023 09:05:03 -0400 Subject: [PATCH 8/9] Copy the curl static library into `/lib`, so that cmake finds it. --- rdkafka-sys/build.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs index 6ad8e9304..73933279d 100644 --- a/rdkafka-sys/build.rs +++ b/rdkafka-sys/build.rs @@ -1,6 +1,8 @@ use std::borrow::Borrow; use std::env; use std::ffi::OsStr; +#[cfg(feature = "cmake-build")] +use std::fs; use std::path::{Path, PathBuf}; use std::process::{self, Command}; @@ -234,6 +236,14 @@ fn build_librdkafka() { config.cxxflag("-DCURL_STATICLIB"); config.cflag(format!("-I{}/include", curl_root)); config.cxxflag(format!("-I{}/include", curl_root)); + config.cflag(format!("-L{}/lib", curl_root)); + config.cxxflag(format!("-L{}/lib", curl_root)); + //FIXME: Upstream should be copying this in their build.rs + fs::copy( + format!("{}/build/libcurl.a", curl_root), + format!("{}/lib/libcurl.a", curl_root), + ) + .unwrap(); } } else { config.define("WITH_CURL", "0"); From 405806fbf27c5113b0146c2cab844c549c58e63a Mon Sep 17 00:00:00 2001 From: David Blewett Date: Fri, 25 Aug 2023 09:49:47 -0400 Subject: [PATCH 9/9] Also block rd_kafka_conf_set_resolve_cb, as it also depends on addrinfo. --- rdkafka-sys/src/bindings.rs | 16 +--------------- rdkafka-sys/update-bindings.sh | 11 ++++++----- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/rdkafka-sys/src/bindings.rs b/rdkafka-sys/src/bindings.rs index fe5319376..322f56493 100644 --- a/rdkafka-sys/src/bindings.rs +++ b/rdkafka-sys/src/bindings.rs @@ -1,6 +1,6 @@ /* automatically generated by rust-bindgen 0.66.1 */ -use libc::{addrinfo, c_char, c_int, c_void, sockaddr, FILE}; +use libc::{c_char, c_int, c_void, sockaddr, FILE}; use num_enum::TryFromPrimitive; pub const RD_KAFKA_VERSION: i32 = 33685759; @@ -853,20 +853,6 @@ extern "C" { closesocket_cb: Option c_int>, ); } -extern "C" { - pub fn rd_kafka_conf_set_resolve_cb( - conf: *mut rd_kafka_conf_t, - resolve_cb: Option< - unsafe extern "C" fn( - node: *const c_char, - service: *const c_char, - hints: *const addrinfo, - res: *mut *mut addrinfo, - opaque: *mut c_void, - ) -> c_int, - >, - ); -} extern "C" { pub fn rd_kafka_conf_set_ssl_cert_verify_cb( conf: *mut rd_kafka_conf_t, diff --git a/rdkafka-sys/update-bindings.sh b/rdkafka-sys/update-bindings.sh index a56f6771f..58daa7928 100755 --- a/rdkafka-sys/update-bindings.sh +++ b/rdkafka-sys/update-bindings.sh @@ -1,9 +1,9 @@ #!/usr/bin/env bash -# rd_kafka_conf_set_open_cb is blocklisted because it is not compiled on -# Windows due to its usage of the Unix-only `mode_t` type. With a bit of -# elbow grease we could include it if not targeting Windows, but it doesn't -# seem worthwhile at the moment. +# rd_kafka_conf_set_open_cb/rd_kafka_conf_set_resolve_cb are blocklisted +# because it is not compiled on Windows due to its usage of the Unix-only +# `mode_t` type. With a bit of elbow grease we could include it if not +# targeting Windows, but it doesn't seem worthwhile at the moment. bindgen \ --no-doc-comments \ @@ -14,7 +14,8 @@ bindgen \ --allowlist-var "rd_kafka.*|RD_KAFKA_.*" \ --no-recursive-allowlist \ --blocklist-function "rd_kafka_conf_set_open_cb" \ - --raw-line "use libc::{FILE, addrinfo, sockaddr, c_int, c_void, c_char};" \ + --blocklist-function "rd_kafka_conf_set_resolve_cb" \ + --raw-line "use libc::{FILE, sockaddr, c_int, c_void, c_char};" \ --raw-line "use num_enum::TryFromPrimitive;" \ --default-macro-constant-type "signed" \ "bindings.h" -o "src/bindings.rs"