Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental alter configs implementation [KIP-339] #4110

Merged
merged 34 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5451d69
incremental alter configs implementation
PrasanthV454 Dec 16, 2022
1fb808a
added simple test to test.c
PrasanthV454 Dec 20, 2022
2c6b6b5
incremental alter enum fix
PrasanthV454 Dec 20, 2022
af9b012
Api Version revert max to zero
PrasanthV454 Jan 2, 2023
c164e25
latest changes update
PrasanthV454 Feb 5, 2023
a9f24f9
removed temp files
PrasanthV454 Feb 5, 2023
de30be6
removed temp files
PrasanthV454 Feb 5, 2023
3db3b75
exported append,subtract and delete
PrasanthV454 Feb 5, 2023
da3d4f1
style errors fixed
PrasanthV454 Feb 5, 2023
74f053d
Merge branch 'master' into feature/incremental-alter-KIP339
emasab May 30, 2023
0abdca2
Upgrade IncrementalAlterConfigs version to 1
emasab May 30, 2023
98816ed
Address comments
emasab May 30, 2023
96f13bb
Documentation fixes
emasab May 30, 2023
e295a28
Add example
emasab May 30, 2023
3c746e5
CMake fix
emasab May 30, 2023
fddd40e
Fail early on duplicate ConfigResource or
emasab Jun 1, 2023
c33e72b
Return rd_kafka_error_t * instead of
emasab Jun 6, 2023
fd23815
Doxygen fixes
emasab Jun 6, 2023
9991b3c
Changelog entry
emasab Jun 8, 2023
e80b257
Use single function for all the
emasab Jun 21, 2023
a63ca7f
Rename rd_kafka_ConfigResource_incremental_alter_config
emasab Jun 21, 2023
d81c161
Documentation for rd_kafka_AlterConfigOpType_t
emasab Jun 29, 2023
fd209ae
Address comments
emasab Jun 30, 2023
c462e01
rd_kafka_AlterOperation_t doc
emasab Jun 30, 2023
b233fea
Remove previous never really used
emasab Jun 30, 2023
82c8278
Merge branch 'master' into feature/incremental-alter-KIP339
emasab Jun 30, 2023
17c0a7c
Revert broker option to default if
emasab Jul 3, 2023
eb0d852
Documentation about broker option.
emasab Jul 3, 2023
6976690
Test healper with different resource type
emasab Jul 3, 2023
0370ad0
Fix parameter type and documentation.
emasab Jul 3, 2023
ee26cd5
Incremental alter configs integration
emasab Jul 4, 2023
9cbc19d
Rename set_incremental_config
emasab Jul 5, 2023
f88940c
Merge branch 'master' into feature/incremental-alter-KIP339
emasab Jul 10, 2023
df55c44
Changelog contribution
emasab Jul 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 33 additions & 32 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1889,7 +1889,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported |
| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Not supported |
| KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported |
| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported |
| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Supported |
| KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superceeded by KIP-429) |
| KIP-342 - Custom SASL OAUTHBEARER extensions | 2.1.0 | Supported |
| KIP-345 - Consumer: Static membership | 2.4.0 | Supported |
Expand Down Expand Up @@ -1948,37 +1948,38 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------- | ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
| 3 | Metadata | 12 | 4 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 24 | AddPartitionsToTxn | 3 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 0 |
| 36 | SaslAuthenticate | 2 | 0 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ----------------------- | ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
| 3 | Metadata | 12 | 4 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 24 | AddPartitionsToTxn | 3 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 0 |
| 36 | SaslAuthenticate | 2 | 0 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | TBD | TBD |
emasab marked this conversation as resolved.
Show resolved Hide resolved
| 47 | OffsetDelete | 0 | 0 |



Expand Down
26 changes: 14 additions & 12 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1590,18 +1590,19 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
},
[3 /*hide-unless-non-zero*/] = {
/* Hide Admin requests unless they've been used */
[RD_KAFKAP_CreateTopics] = rd_true,
[RD_KAFKAP_DeleteTopics] = rd_true,
[RD_KAFKAP_DeleteRecords] = rd_true,
[RD_KAFKAP_CreatePartitions] = rd_true,
[RD_KAFKAP_DescribeAcls] = rd_true,
[RD_KAFKAP_CreateAcls] = rd_true,
[RD_KAFKAP_DeleteAcls] = rd_true,
[RD_KAFKAP_DescribeConfigs] = rd_true,
[RD_KAFKAP_AlterConfigs] = rd_true,
[RD_KAFKAP_DeleteGroups] = rd_true,
[RD_KAFKAP_ListGroups] = rd_true,
[RD_KAFKAP_DescribeGroups] = rd_true}};
[RD_KAFKAP_CreateTopics] = rd_true,
[RD_KAFKAP_DeleteTopics] = rd_true,
[RD_KAFKAP_DeleteRecords] = rd_true,
[RD_KAFKAP_CreatePartitions] = rd_true,
[RD_KAFKAP_DescribeAcls] = rd_true,
[RD_KAFKAP_CreateAcls] = rd_true,
[RD_KAFKAP_DeleteAcls] = rd_true,
[RD_KAFKAP_DescribeConfigs] = rd_true,
[RD_KAFKAP_AlterConfigs] = rd_true,
[RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
[RD_KAFKAP_DeleteGroups] = rd_true,
[RD_KAFKAP_ListGroups] = rd_true,
[RD_KAFKAP_DescribeGroups] = rd_true}};
int i;
int cnt = 0;

Expand Down Expand Up @@ -3919,6 +3920,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
case RD_KAFKA_OP_DELETETOPICS:
case RD_KAFKA_OP_CREATEPARTITIONS:
case RD_KAFKA_OP_ALTERCONFIGS:
case RD_KAFKA_OP_INCREMENTALALTERCONFIGS:
case RD_KAFKA_OP_DESCRIBECONFIGS:
case RD_KAFKA_OP_DELETERECORDS:
case RD_KAFKA_OP_DELETEGROUPS:
Expand Down
162 changes: 159 additions & 3 deletions src/rdkafka.h
emasab marked this conversation as resolved.
Show resolved Hide resolved
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5283,6 +5283,8 @@ typedef int rd_kafka_event_type_t;
/** AlterConsumerGroupOffsets_result_t */
#define RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT 0x10000

emasab marked this conversation as resolved.
Show resolved Hide resolved
#define RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT \
0x20000 /**< IncrementalAlterConfigs_result_t */

/**
* @returns the event type for the given event.
Expand Down Expand Up @@ -5429,6 +5431,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev);
* - RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
* - RD_KAFKA_EVENT_DELETEACLS_RESULT
* - RD_KAFKA_EVENT_ALTERCONFIGS_RESULT
* - RD_KAFKA_EVENT_INCREMENTAL_ALTERCONFIGS_RESULT
* - RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
* - RD_KAFKA_EVENT_DELETEGROUPS_RESULT
* - RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT
Expand Down Expand Up @@ -5532,6 +5535,8 @@ typedef rd_kafka_event_t rd_kafka_DeleteAcls_result_t;
typedef rd_kafka_event_t rd_kafka_CreatePartitions_result_t;
/*! AlterConfigs result type */
typedef rd_kafka_event_t rd_kafka_AlterConfigs_result_t;
/*! IncrementalAlterConfigs result type */
typedef rd_kafka_event_t rd_kafka_IncrementalAlterConfigs_result_t;
/*! CreateTopics result type */
typedef rd_kafka_event_t rd_kafka_DescribeConfigs_result_t;
/*! DeleteRecords result type */
Expand Down Expand Up @@ -5597,6 +5602,18 @@ rd_kafka_event_CreatePartitions_result(rd_kafka_event_t *rkev);
RD_EXPORT const rd_kafka_AlterConfigs_result_t *
rd_kafka_event_AlterConfigs_result(rd_kafka_event_t *rkev);

/**
* @brief Get IncrementalAlterConfigs result.
*
* @returns the result of a IncrementalAlterConfigs request, or NULL if event is
* of different type.
*
* Event types:
* RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT
*/
RD_EXPORT const rd_kafka_IncrementalAlterConfigs_result_t *
rd_kafka_event_IncrementalAlterConfigs_result(rd_kafka_event_t *rkev);

/**
* @brief Get DescribeConfigs result.
*
Expand Down Expand Up @@ -6621,9 +6638,11 @@ typedef enum rd_kafka_admin_op_t {
RD_KAFKA_ADMIN_OP_DELETETOPICS, /**< DeleteTopics */
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, /**< CreatePartitions */
RD_KAFKA_ADMIN_OP_ALTERCONFIGS, /**< AlterConfigs */
RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */
RD_KAFKA_ADMIN_OP_DELETERECORDS, /**< DeleteRecords */
RD_KAFKA_ADMIN_OP_DELETEGROUPS, /**< DeleteGroups */
RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS,
emasab marked this conversation as resolved.
Show resolved Hide resolved
/**< IncrementalAlterConfigs */
RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */
RD_KAFKA_ADMIN_OP_DELETERECORDS, /**< DeleteRecords */
RD_KAFKA_ADMIN_OP_DELETEGROUPS, /**< DeleteGroups */
/** DeleteConsumerGroupOffsets */
RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS,
RD_KAFKA_ADMIN_OP_CREATEACLS, /**< CreateAcls */
Expand Down Expand Up @@ -6771,6 +6790,8 @@ rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t *options,
* the following exceptions:
* - AlterConfigs with a BROKER resource are sent to the broker id set
* as the resource name.
* - IncrementalAlterConfigs with a BROKER resource are sent to the broker id
* set as the resource name.
emasab marked this conversation as resolved.
Show resolved Hide resolved
* - DescribeConfigs with a BROKER resource are sent to the broker id set
* as the resource name.
*
Expand Down Expand Up @@ -7396,6 +7417,82 @@ rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config,
const char *value);


/**
* @brief Set the value of the configuration entry.
*
* @param config ConfigResource to set config property on.
* @param name Configuration name, depends on resource type.
* @param value Configuration value, depends on resource type and \p name.
* Set to \c NULL or use
* rd_kafka_ConfigResource_incremental_delete_config
* to revert configuration value to default.
*
* This will overwrite the current value.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if config was added to resource,
* or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_ConfigResource_incremental_set_config(
rd_kafka_ConfigResource_t *config,
const char *name,
const char *value);


/**
* @brief (For list-type configuration entries only) Add the specified
* values to the current value of the configuration entry.
*
* @param config ConfigResource to append config properties on.
* @param name Configuration name, depends on resource type.
* @param value Configuration values, depends on resource type and \p name.
*
* This will append to the current list.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if configs were added to resource,
* or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_ConfigResource_incremental_append_config(
rd_kafka_ConfigResource_t *config,
const char *name,
const char *value);


/**
* @brief (For list-type configuration entries only) Removes the specified
* values from the current value of the configuration entry.
*
* @param config ConfigResource to set config property on.
* @param name Configuration name, depends on resource type.
* @param value Configuration values, depends on resource type and \p name.
*
* This will subtract from the current list.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if configs were subtracted from resource,
* or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_ConfigResource_incremental_subtract_config(
rd_kafka_ConfigResource_t *config,
const char *name,
const char *value);

PrasanthV454 marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Revert the configuration entry to the default value (possibly NULL).
*
* @param config ConfigResource to set config property on.
* @param name Configuration name, depends on resource type.
*
* This will set the value to default or NULL.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR if config was deleted from resource,
* or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_ConfigResource_incremental_delete_config(
rd_kafka_ConfigResource_t *config,
const char *name);


/**
* @brief Get an array of config entries from a ConfigResource object.
*
Expand Down Expand Up @@ -7495,6 +7592,65 @@ rd_kafka_AlterConfigs_result_resources(



/*
* IncrementalAlterConfigs - alter cluster configuration.
*
*/


/**
* @brief Incrementally update the configuration for the specified resources.
* Updates are not transactional so they may succeed for some resources
* while fail for others. The configs for a particular resource are
* updated atomically, replacing values using the provided ConfigEntrys and
* reverting unspecified ConfigEntrys to their default values.
*
* @remark Requires broker version >=2.3.0
*
* @warning IncrementalAlterConfigs will replace all existing configuration for
emasab marked this conversation as resolved.
Show resolved Hide resolved
* the provided resources with the new configuration given,
* reverting all other configuration to their default values.
*
* @remark Multiple resources and resource types may be set, but at most one
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
* resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call
* since these resource requests must be sent to the broker specified
* in the resource.
*
*/
RD_EXPORT
void rd_kafka_IncrementalAlterConfigs(rd_kafka_t *rk,
rd_kafka_ConfigResource_t **configs,
size_t config_cnt,
const rd_kafka_AdminOptions_t *options,
rd_kafka_queue_t *rkqu);


/*
* IncrementalAlterConfigs result type and methods
*/

/**
* @brief Get an array of resource results from a IncrementalAlterConfigs
* result.
*
* Use \c rd_kafka_ConfigResource_error() and
* \c rd_kafka_ConfigResource_error_string() to extract per-resource error
* results on the returned array elements.
*
* The returned object life-times are the same as the \p result object.
*
* @param result Result object to get resource results from.
* @param cntp is updated to the number of elements in the array.
*
* @returns an array of ConfigResource elements, or NULL if not available.
*/
RD_EXPORT const rd_kafka_ConfigResource_t **
rd_kafka_IncrementalAlterConfigs_result_resources(
const rd_kafka_AlterConfigs_result_t *result,
emasab marked this conversation as resolved.
Show resolved Hide resolved
size_t *cntp);



/*
* DescribeConfigs - retrieve cluster configuration.
*
Expand Down