Remove warning for allow.auto.create.topics with producer
Also improves the documentation around this property.

This solves the following problem:

The property "allow.auto.create.topics" is supposed to be a consumer
property, but we are setting it on (and it affects the behaviour of)
both the consumer and the producer. It gives a warning if it is set on
the producer, but works nevertheless.

(the default value for the producer is true)

One of the effects is that if a user is using their producer as an
admin client, a call to get metadata for a topic might create that
topic, and if the user specifies allow.auto.create.topics, they get a
warning.

Unfortunately, we even recommend using a producer with the above
setting (see INTRODUCTION.md).

A knock-on effect is that both the Go and Python clients use a producer
internally for their admin clients, so the user has to either live with
a call to GetMetadata creating topics, or with the warning.

The Java client only allows this property to be set on the consumer,
which makes it more confusing.
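
For illustration only (not part of this change), here is a minimal C sketch of the metadata-only producer scenario described above, assuming a placeholder broker at `localhost:9092` and a hypothetical topic name `my_topic`:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder broker address (assumption, not from the commit). */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));

        /* Keep metadata requests from auto-creating topics. Before this
         * commit, setting this property on a producer logged a warning even
         * though it took effect; after it, the setting is accepted silently. */
        rd_kafka_conf_set(conf, "allow.auto.create.topics", "false",
                          errstr, sizeof(errstr));

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "Failed to create producer: %s\n", errstr);
                return 1;
        }

        /* Request metadata for one (possibly non-existent) topic. */
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "my_topic", NULL);
        const struct rd_kafka_metadata *md;
        rd_kafka_resp_err_t err = rd_kafka_metadata(rk, 0, rkt, &md, 5000);
        if (!err) {
                printf("Received metadata for %d topic(s)\n", md->topic_cnt);
                rd_kafka_metadata_destroy(md);
        } else {
                fprintf(stderr, "Metadata request failed: %s\n",
                        rd_kafka_err2str(err));
        }

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}
```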
milindl committed Sep 6, 2022
1 parent 9b72ca3 commit c64de6e
Showing 3 changed files with 62 additions and 51 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
# librdkafka v1.9.3

librdkafka v1.9.3 is a (TODO(milind): ask and fill up the type of release) release:

* Setting `allow.auto.create.topics` will no longer give a warning if used by
a producer, since that is an expected use case. Improvement in
documentation for this property.

# librdkafka v1.9.2

librdkafka v1.9.2 is a maintenance release:
2 changes: 1 addition & 1 deletion CONFIGURATION.md
@@ -57,6 +57,7 @@ api.version.request | * | true, false | true
api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | low | Timeout for broker API version requests. <br>*Type: integer*
api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). <br>*Type: integer*
broker.version.fallback | * | | 0.10.0 | medium | Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value >= 0.10, such as 0.10.2.1, enables ApiVersionRequests. <br>*Type: string*
allow.auto.create.topics | * | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: the default value (true) for the producer is different from the default value (false) for the consumer. Further, the consumer default value is different from the Java consumer (true), and this property is not supported by the Java producer. Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies. <br>*Type: boolean*
security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | high | Protocol used to communicate with brokers. <br>*Type: enum value*
ssl.cipher.suites | * | | | low | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3). <br>*Type: string*
ssl.curves.list | * | | | low | The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required. <br>*Type: string*
@@ -127,7 +128,6 @@ rebalance_cb | C | |
offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: see dedicated API*
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
allow.auto.create.topics | C | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuraiton to take effect. Note: The default value (false) is different from the Java consumer (true). Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
103 changes: 53 additions & 50 deletions src/rdkafka_conf.c
@@ -58,21 +58,22 @@
struct rd_kafka_property {
rd_kafka_conf_scope_t scope;
const char *name;
enum { _RK_C_STR,
_RK_C_INT,
_RK_C_DBL, /* Double */
_RK_C_S2I, /* String to Integer mapping.
* Supports limited canonical str->int mappings
* using s2i[] */
_RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */
_RK_C_BOOL,
_RK_C_PTR, /* Only settable through special set functions */
_RK_C_PATLIST, /* Pattern list */
_RK_C_KSTR, /* Kafka string */
_RK_C_ALIAS, /* Alias: points to other property through .sdef */
_RK_C_INTERNAL, /* Internal, don't expose to application */
_RK_C_INVALID, /* Invalid property, used to catch known
* but unsupported Java properties. */
enum {
_RK_C_STR,
_RK_C_INT,
_RK_C_DBL, /* Double */
_RK_C_S2I, /* String to Integer mapping.
* Supports limited canonical str->int mappings
* using s2i[] */
_RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */
_RK_C_BOOL,
_RK_C_PTR, /* Only settable through special set functions */
_RK_C_PATLIST, /* Pattern list */
_RK_C_KSTR, /* Kafka string */
_RK_C_ALIAS, /* Alias: points to other property through .sdef */
_RK_C_INTERNAL, /* Internal, don't expose to application */
_RK_C_INVALID, /* Invalid property, used to catch known
* but unsupported Java properties. */
} type;
int offset;
const char *desc;
@@ -718,16 +719,30 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Any other value >= 0.10, such as 0.10.2.1, "
"enables ApiVersionRequests.",
.sdef = "0.10.0", .validate = rd_kafka_conf_validate_broker_version},
{_RK_GLOBAL, "allow.auto.create.topics", _RK_C_BOOL,
_RK(allow_auto_create_topics),
"Allow automatic topic creation on the broker when subscribing to "
"or assigning non-existent topics. "
"The broker must also be configured with "
"`auto.create.topics.enable=true` for this configuration to "
"take effect. "
"Note: the default value (true) for the producer is "
"different from the default value (false) for the consumer. "
"Further, the consumer default value is different from the Java "
"consumer (true), and this property is not supported by the Java "
"producer. Requires broker version >= 0.11.0.0, for older broker "
"versions only the broker configuration applies.",
0, 1, 0},

/* Security related global properties */
{_RK_GLOBAL | _RK_HIGH, "security.protocol", _RK_C_S2I,
_RK(security_protocol), "Protocol used to communicate with brokers.",
.vdef = RD_KAFKA_PROTO_PLAINTEXT,
.s2i = {{RD_KAFKA_PROTO_PLAINTEXT, "plaintext"},
{RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL},
{RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"},
{RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL},
{0, NULL}}},
{RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL},
{RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"},
{RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL},
{0, NULL}}},

{_RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR, _RK(ssl.cipher_suites),
"A cipher suite is a named combination of authentication, "
@@ -848,7 +863,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"OpenSSL >= 1.0.2 required.",
.vdef = RD_KAFKA_SSL_ENDPOINT_ID_NONE,
.s2i = {{RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none"},
{RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}},
{RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}},
_UNSUPPORTED_OPENSSL_1_0_2},
{_RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR,
_RK(ssl.cert_verify_cb),
@@ -966,7 +981,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"and `sasl.oauthbearer.token.endpoint.url`.",
.vdef = RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
.s2i = {{RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT, "default"},
{RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}},
{RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}},
_UNSUPPORTED_OIDC},
{_RK_GLOBAL, "sasl.oauthbearer.client.id", _RK_C_STR,
_RK(sasl.oauthbearer.client_id),
@@ -1202,8 +1217,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"(requires Apache Kafka 0.8.2 or later on the broker).",
.vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
.s2i = {{RD_KAFKA_OFFSET_METHOD_NONE, "none"},
{RD_KAFKA_OFFSET_METHOD_FILE, "file"},
{RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
{RD_KAFKA_OFFSET_METHOD_FILE, "file"},
{RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
{_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "isolation.level", _RK_C_S2I,
_RK(isolation_level),
"Controls how to read messages written transactionally: "
@@ -1212,7 +1227,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"transactional messages which have been aborted.",
.vdef = RD_KAFKA_READ_COMMITTED,
.s2i = {{RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted"},
{RD_KAFKA_READ_COMMITTED, "read_committed"}}},
{RD_KAFKA_READ_COMMITTED, "read_committed"}}},
{_RK_GLOBAL | _RK_CONSUMER, "consume_cb", _RK_C_PTR, _RK(consume_cb),
"Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
{_RK_GLOBAL | _RK_CONSUMER, "rebalance_cb", _RK_C_PTR, _RK(rebalance_cb),
@@ -1233,18 +1248,6 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"on-disk corruption to the messages occurred. This check comes "
"at slightly increased CPU usage.",
0, 1, 0},
{_RK_GLOBAL | _RK_CONSUMER, "allow.auto.create.topics", _RK_C_BOOL,
_RK(allow_auto_create_topics),
"Allow automatic topic creation on the broker when subscribing to "
"or assigning non-existent topics. "
"The broker must also be configured with "
"`auto.create.topics.enable=true` for this configuraiton to "
"take effect. "
"Note: The default value (false) is different from the "
"Java consumer (true). "
"Requires broker version >= 0.11.0.0, for older broker versions "
"only the broker configuration applies.",
0, 1, 0},
{_RK_GLOBAL, "client.rack", _RK_C_KSTR, _RK(client_rack),
"A rack identifier for this client. This can be any string value "
"which indicates where this client is physically located. It "
@@ -1358,11 +1361,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"the topic configuration property `compression.codec`. ",
.vdef = RD_KAFKA_COMPRESSION_NONE,
.s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"},
{RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
{RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
{RD_KAFKA_COMPRESSION_LZ4, "lz4"},
{RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
{0}}},
{RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
{RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
{RD_KAFKA_COMPRESSION_LZ4, "lz4"},
{RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
{0}}},
{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.type", _RK_C_ALIAS,
.sdef = "compression.codec"},
{_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.num.messages", _RK_C_INT,
@@ -1488,12 +1491,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"inherit = inherit global compression.codec configuration.",
.vdef = RD_KAFKA_COMPRESSION_INHERIT,
.s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"},
{RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
{RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
{RD_KAFKA_COMPRESSION_LZ4, "lz4"},
{RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
{RD_KAFKA_COMPRESSION_INHERIT, "inherit"},
{0}}},
{RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
{RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
{RD_KAFKA_COMPRESSION_LZ4, "lz4"},
{RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
{RD_KAFKA_COMPRESSION_INHERIT, "inherit"},
{0}}},
{_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.type", _RK_C_ALIAS,
.sdef = "compression.codec"},
{_RK_TOPIC | _RK_PRODUCER | _RK_MED, "compression.level", _RK_C_INT,
@@ -1577,7 +1580,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Apache Kafka 0.8.2 or later on the broker.).",
.vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
.s2i = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"},
{RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
{RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},

{_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
_RKT(consume_callback_max_msgs),
@@ -4293,7 +4296,7 @@ int unittest_conf(void) {
/* Verify that software.client.* string-safing works */
conf = rd_kafka_conf_new();
res = rd_kafka_conf_set(conf, "client.software.name",
" .~aba. va! !.~~", NULL, 0);
" .~aba. va! !.~~", NULL, 0);
RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
res = rd_kafka_conf_set(conf, "client.software.version",
"!1.2.3.4.5!!! a", NULL, 0);
@@ -4312,7 +4315,7 @@ int unittest_conf(void) {

readlen = sizeof(readval);
res2 = rd_kafka_conf_get(conf, "client.software.version", readval,
&readlen);
&readlen);
RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
RD_UT_ASSERT(!strcmp(readval, "1.2.3.4.5----a"),
"client.software.* safification failed: \"%s\"", readval);