Skip to content

Commit

Permalink
PR Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Jun 8, 2023
1 parent 80decf4 commit 92068d1
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ delivery.report.only.error | P | true, false | false
dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb()) <br>*Type: see dedicated API*
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) <br>*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. <br>*Type: integer*
enable.bootstrap.servers.canonical.resolve | * | true, false | false | low | Resolve each bootstrap address into a list of canonical names. By default client will not attempt to reverse lookup to find the FQDN.Default: false. <br>*Type: boolean*
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default when the lookupreturns multiple IP addresses for a hostname they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only` each entry will be resolved and expanded into a list of canonical names <br>*Type: enum value*


## Topic configuration properties
Expand Down
30 changes: 18 additions & 12 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -5247,14 +5247,14 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
int cnt = 0;
int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
rd_sockaddr_inx_t *sinx;
rd_sockaddr_list_t *sockaddrList;
rd_sockaddr_list_t *sockaddr_list;

/* Parse comma-separated list of brokers. */
while (*s) {
uint16_t port;
const char *host;
const char *errstr;
const char *resolvedFQDN;
const char *err_str;
const char *resolved_FQDN;
rd_kafka_secproto_t proto;

if (*s == ',' || *s == ' ') {
Expand All @@ -5267,32 +5267,38 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
break;

rd_kafka_wrlock(rk);
if (rk->rk_conf.enable_bootstrap_servers_canonical_resolve) {
if (rk->rk_conf.client_dns_lookup ==
RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
rd_kafka_dbg(rk, ALL, "INIT",
"Canonicalizing bootstrap broker %s:%d",
host, port);
sockaddrList = rd_getaddrinfo(
sockaddr_list = rd_getaddrinfo(
host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG,
rk->rk_conf.broker_addr_family, SOCK_STREAM,
IPPROTO_TCP, rk->rk_conf.resolve_cb,
rk->rk_conf.opaque, &errstr);
rk->rk_conf.opaque, &err_str);

if (!sockaddrList) {
if (!sockaddr_list) {
rd_kafka_log(rk, LOG_WARNING, "BROKER",
"Failed to resolve '%s': %s", host,
errstr);
err_str);
rd_kafka_wrunlock(rk);
continue;
}

RD_SOCKADDR_LIST_FOREACH(sinx, sockaddrList) {
resolvedFQDN = rd_sockaddr2str(
RD_SOCKADDR_LIST_FOREACH(sinx, sockaddr_list) {
resolved_FQDN = rd_sockaddr2str(
sinx, RD_SOCKADDR2STR_F_RESOLVE);
rd_kafka_dbg(
rk, ALL, "INIT",
"Adding broker with resolved hostname %s",
resolved_FQDN);

rd_kafka_find_or_add_broker(
rk, proto, resolvedFQDN, port, &cnt);
rk, proto, resolved_FQDN, port, &cnt);
};

rd_sockaddr_list_destroy(sockaddrList);
rd_sockaddr_list_destroy(sockaddr_list);
} else {
rd_kafka_find_or_add_broker(rk, proto, host, port,
&cnt);
Expand Down
18 changes: 12 additions & 6 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1427,12 +1427,18 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"A higher value allows for more effective batching of these "
"messages.",
0, 900000, 10},
{_RK_GLOBAL, "enable.bootstrap.servers.canonical.resolve", _RK_C_BOOL,
_RK(enable_bootstrap_servers_canonical_resolve),
"Resolve each bootstrap address into a list of canonical names. By "
"default client will not attempt to reverse lookup to find the FQDN."
"Default: false.",
0, 1, 0},
{_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
"Controls how the client uses DNS lookups. By default when the lookup "
"returns multiple IP addresses for a hostname they will all be attempted "
"to connect to before failing the connection. Applies to both bootstrap "
"and "
" advertised servers. If the value is set to "
"`resolve_canonical_bootstrap_servers_only` each entry will be "
"resolved and expanded into a list of canonical names",
.vdef = RD_KAFKA_USE_ALL_DNS_IPS,
.s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"},
{RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
"resolve_canonical_bootstrap_servers_only"}}},


/*
Expand Down
7 changes: 6 additions & 1 deletion src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ typedef enum {
RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;

typedef enum {
RD_KAFKA_USE_ALL_DNS_IPS,
RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
} rd_kafka_client_dns_lookup_t;

/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
Expand Down Expand Up @@ -224,7 +229,7 @@ struct rd_kafka_conf_s {
int api_version_fallback_ms;
char *broker_version_fallback;
rd_kafka_secproto_t security_protocol;
int enable_bootstrap_servers_canonical_resolve;
rd_kafka_client_dns_lookup_t client_dns_lookup;

struct {
#if WITH_SSL
Expand Down
4 changes: 2 additions & 2 deletions tests/0004-conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ int main_0004_conf(int argc, char **argv) {
"ssl.ca.certificate.stores",
"Intermediate ,, Root ,",
#endif
"enable.bootstrap.servers.canonical.resolve",
"true",
"client.dns.lookup",
"resolve_canonical_bootstrap_servers_only",
NULL
};
static const char *tconfs[] = {"request.required.acks",
Expand Down

0 comments on commit 92068d1

Please sign in to comment.