From c64de6e7cb5cf32a5eb42d06ca3e8344560351f5 Mon Sep 17 00:00:00 2001 From: Milind L Date: Wed, 24 Aug 2022 11:04:34 +0530 Subject: [PATCH] Remove warning for allow.auto.create.topics with producer Also improves the documentation around this property. This solves the following problem: The property "allow.auto.create.topics" is supposed to be a consumer property, but we are setting it (and it is affecting the behaviour of) both the consumer and producer. It gives a warning if we change it in the producer, but works nevertheless. (the default value for the producer is true) One of the effects is that if a user is using their producer as an adminclient, a call to get metadata for a topic might create that topic, and if the user specifies allow.auto.create.topics, then they get a warning. Unfortunately, we even recommend using a producer with the above setting (see INTRODUCTION.md). A knock on effect is that both the go and python clients use a producer internally for their adminclients so the user has to either live with a call to GetMetadata creating topics, or with the warning. The java client only allows this property to be set on the consumer, which makes it more confusing. --- CHANGELOG.md | 8 ++++ CONFIGURATION.md | 2 +- src/rdkafka_conf.c | 103 +++++++++++++++++++++++---------------------- 3 files changed, 62 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73eb4df8e1..db768eacd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# librdkafka v1.9.3 + +librdkafka v1.9.3 is a (TODO(milind): ask and fill up the type of release) release: + + * Setting `allow.auto.create.topics` will no longer give a warning if used by + a producer, since that is an expected use case. Improvement in + documentation for this property. + # librdkafka v1.9.2 librdkafka v1.9.2 is a maintenance release: diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 62f7f0cfa1..768ff79b34 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -57,6 +57,7 @@ api.version.request | * | true, false | true api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | low | Timeout for broker API version requests.
*Type: integer* api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade).
*Type: integer* broker.version.fallback | * | | 0.10.0 | medium | Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value >= 0.10, such as 0.10.2.1, enables ApiVersionRequests.
*Type: string* +allow.auto.create.topics | * | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: the default value (true) for the producer is different from the default value (false) for the consumer. Further, the consumer default value is different from the Java consumer (true), and this property is not supported by the Java producer. Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies.
*Type: boolean* security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | high | Protocol used to communicate with brokers.
*Type: enum value* ssl.cipher.suites | * | | | low | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3).
*Type: string* ssl.curves.list | * | | | low | The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.
*Type: string* @@ -127,7 +128,6 @@ rebalance_cb | C | | offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb())
*Type: see dedicated API* enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.
*Type: boolean* check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
*Type: boolean* -allow.auto.create.topics | C | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuraiton to take effect. Note: The default value (false) is different from the Java consumer (true). Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies.
*Type: boolean* client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`.
*Type: string* transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0.
*Type: string* transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods.
*Type: integer* diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index a8a1204bf3..04c28113b0 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -58,21 +58,22 @@ struct rd_kafka_property { rd_kafka_conf_scope_t scope; const char *name; - enum { _RK_C_STR, - _RK_C_INT, - _RK_C_DBL, /* Double */ - _RK_C_S2I, /* String to Integer mapping. - * Supports limited canonical str->int mappings - * using s2i[] */ - _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */ - _RK_C_BOOL, - _RK_C_PTR, /* Only settable through special set functions */ - _RK_C_PATLIST, /* Pattern list */ - _RK_C_KSTR, /* Kafka string */ - _RK_C_ALIAS, /* Alias: points to other property through .sdef */ - _RK_C_INTERNAL, /* Internal, don't expose to application */ - _RK_C_INVALID, /* Invalid property, used to catch known - * but unsupported Java properties. */ + enum { + _RK_C_STR, + _RK_C_INT, + _RK_C_DBL, /* Double */ + _RK_C_S2I, /* String to Integer mapping. + * Supports limited canonical str->int mappings + * using s2i[] */ + _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */ + _RK_C_BOOL, + _RK_C_PTR, /* Only settable through special set functions */ + _RK_C_PATLIST, /* Pattern list */ + _RK_C_KSTR, /* Kafka string */ + _RK_C_ALIAS, /* Alias: points to other property through .sdef */ + _RK_C_INTERNAL, /* Internal, don't expose to application */ + _RK_C_INVALID, /* Invalid property, used to catch known + * but unsupported Java properties. */ } type; int offset; const char *desc; @@ -718,16 +719,30 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Any other value >= 0.10, such as 0.10.2.1, " "enables ApiVersionRequests.", .sdef = "0.10.0", .validate = rd_kafka_conf_validate_broker_version}, + {_RK_GLOBAL, "allow.auto.create.topics", _RK_C_BOOL, + _RK(allow_auto_create_topics), + "Allow automatic topic creation on the broker when subscribing to " + "or assigning non-existent topics. " + "The broker must also be configured with " + "`auto.create.topics.enable=true` for this configuration to " + "take effect. " + "Note: the default value (true) for the producer is " + "different from the default value (false) for the consumer. " + "Further, the consumer default value is different from the Java " + "consumer (true), and this property is not supported by the Java " + "producer. Requires broker version >= 0.11.0.0, for older broker " + "versions only the broker configuration applies.", + 0, 1, 0}, /* Security related global properties */ {_RK_GLOBAL | _RK_HIGH, "security.protocol", _RK_C_S2I, _RK(security_protocol), "Protocol used to communicate with brokers.", .vdef = RD_KAFKA_PROTO_PLAINTEXT, .s2i = {{RD_KAFKA_PROTO_PLAINTEXT, "plaintext"}, - {RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL}, - {RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"}, - {RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL}, - {0, NULL}}}, + {RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL}, + {RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"}, + {RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL}, + {0, NULL}}}, {_RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR, _RK(ssl.cipher_suites), "A cipher suite is a named combination of authentication, " @@ -848,7 +863,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "OpenSSL >= 1.0.2 required.", .vdef = RD_KAFKA_SSL_ENDPOINT_ID_NONE, .s2i = {{RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none"}, - {RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}}, + {RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}}, _UNSUPPORTED_OPENSSL_1_0_2}, {_RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR, _RK(ssl.cert_verify_cb), @@ -966,7 +981,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "and `sasl.oauthbearer.token.endpoint.url`.", .vdef = RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT, .s2i = {{RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT, "default"}, - {RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}}, + {RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}}, _UNSUPPORTED_OIDC}, {_RK_GLOBAL, "sasl.oauthbearer.client.id", _RK_C_STR, _RK(sasl.oauthbearer.client_id), @@ -1202,8 +1217,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "(requires Apache Kafka 0.8.2 or later on the broker).", .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, .s2i = {{RD_KAFKA_OFFSET_METHOD_NONE, "none"}, - {RD_KAFKA_OFFSET_METHOD_FILE, "file"}, - {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, + {RD_KAFKA_OFFSET_METHOD_FILE, "file"}, + {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "isolation.level", _RK_C_S2I, _RK(isolation_level), "Controls how to read messages written transactionally: " @@ -1212,7 +1227,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "transactional messages which have been aborted.", .vdef = RD_KAFKA_READ_COMMITTED, .s2i = {{RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted"}, - {RD_KAFKA_READ_COMMITTED, "read_committed"}}}, + {RD_KAFKA_READ_COMMITTED, "read_committed"}}}, {_RK_GLOBAL | _RK_CONSUMER, "consume_cb", _RK_C_PTR, _RK(consume_cb), "Message consume callback (set with rd_kafka_conf_set_consume_cb())"}, {_RK_GLOBAL | _RK_CONSUMER, "rebalance_cb", _RK_C_PTR, _RK(rebalance_cb), @@ -1233,18 +1248,6 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "on-disk corruption to the messages occurred. This check comes " "at slightly increased CPU usage.", 0, 1, 0}, - {_RK_GLOBAL | _RK_CONSUMER, "allow.auto.create.topics", _RK_C_BOOL, - _RK(allow_auto_create_topics), - "Allow automatic topic creation on the broker when subscribing to " - "or assigning non-existent topics. " - "The broker must also be configured with " - "`auto.create.topics.enable=true` for this configuraiton to " - "take effect. " - "Note: The default value (false) is different from the " - "Java consumer (true). " - "Requires broker version >= 0.11.0.0, for older broker versions " - "only the broker configuration applies.", - 0, 1, 0}, {_RK_GLOBAL, "client.rack", _RK_C_KSTR, _RK(client_rack), "A rack identifier for this client. This can be any string value " "which indicates where this client is physically located. It " @@ -1358,11 +1361,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "the topic configuration property `compression.codec`. ", .vdef = RD_KAFKA_COMPRESSION_NONE, .s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"}, - {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB}, - {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY}, - {RD_KAFKA_COMPRESSION_LZ4, "lz4"}, - {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD}, - {0}}}, + {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB}, + {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY}, + {RD_KAFKA_COMPRESSION_LZ4, "lz4"}, + {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD}, + {0}}}, {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.type", _RK_C_ALIAS, .sdef = "compression.codec"}, {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.num.messages", _RK_C_INT, @@ -1488,12 +1491,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "inherit = inherit global compression.codec configuration.", .vdef = RD_KAFKA_COMPRESSION_INHERIT, .s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"}, - {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB}, - {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY}, - {RD_KAFKA_COMPRESSION_LZ4, "lz4"}, - {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD}, - {RD_KAFKA_COMPRESSION_INHERIT, "inherit"}, - {0}}}, + {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB}, + {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY}, + {RD_KAFKA_COMPRESSION_LZ4, "lz4"}, + {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD}, + {RD_KAFKA_COMPRESSION_INHERIT, "inherit"}, + {0}}}, {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.type", _RK_C_ALIAS, .sdef = "compression.codec"}, {_RK_TOPIC | _RK_PRODUCER | _RK_MED, "compression.level", _RK_C_INT, @@ -1577,7 +1580,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "Apache Kafka 0.8.2 or later on the broker.).", .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, .s2i = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"}, - {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, + {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}}, {_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT, _RKT(consume_callback_max_msgs), @@ -4293,7 +4296,7 @@ int unittest_conf(void) { /* Verify that software.client.* string-safing works */ conf = rd_kafka_conf_new(); res = rd_kafka_conf_set(conf, "client.software.name", - " .~aba. va! !.~~", NULL, 0); + " .~aba. va! !.~~", NULL, 0); RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res); res = rd_kafka_conf_set(conf, "client.software.version", "!1.2.3.4.5!!! a", NULL, 0); @@ -4312,7 +4315,7 @@ int unittest_conf(void) { readlen = sizeof(readval); res2 = rd_kafka_conf_get(conf, "client.software.version", readval, - &readlen); + &readlen); RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2); RD_UT_ASSERT(!strcmp(readval, "1.2.3.4.5----a"), "client.software.* safification failed: \"%s\"", readval);