Skip to content

Commit

Permalink
Add KIP-235 implementation (#4292)
Browse files Browse the repository at this point in the history
Add DNS alias support for secured connection, needed
for Kerberos SASL authentication.
  • Loading branch information
anchitj committed Jul 7, 2023
1 parent d174c0d commit 961946e
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,8 @@ librdkafka v2.2.0 is a feature release:
closes as normal ones (#4294).
* Added `fetch.queue.backoff.ms` to the consumer to control how long
the consumer backs off next fetch attempt. (@bitemyapp, @edenhill, #2879)
* [KIP-235](https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection):
Add DNS alias support for secured connection (#4292).


## Enhancements
Expand Down
1 change: 1 addition & 0 deletions CONFIGURATION.md
Expand Up @@ -152,6 +152,7 @@ delivery.report.only.error | P | true, false | false
dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb()) <br>*Type: see dedicated API*
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) <br>*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. <br>*Type: integer*
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname. <br>*Type: enum value*


## Topic configuration properties
Expand Down
2 changes: 1 addition & 1 deletion INTRODUCTION.md
Expand Up @@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-249 - AdminAPI: Delegation Tokens | 2.0.0 | Not supported |
| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
Expand Down
2 changes: 1 addition & 1 deletion src/rdaddr.h
Expand Up @@ -139,7 +139,7 @@ rd_sockaddr_list_next(rd_sockaddr_list_t *rsal) {

/**
 * @brief Iterate over all resolved addresses in \p rsal, assigning each
 *        address in turn to \p sinx.
 *
 * The loop bound is \c rsal_cnt, the number of addresses in the list.
 */
#define RD_SOCKADDR_LIST_FOREACH(sinx, rsal)                                   \
        for ((sinx) = &(rsal)->rsal_addr[0];                                   \
             (sinx) < &(rsal)->rsal_addr[(rsal)->rsal_cnt]; (sinx)++)

/**
* Wrapper for getaddrinfo(3) that performs these additional tasks:
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka.c
Expand Up @@ -2524,7 +2524,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,

/* Add initial list of brokers from configuration */
if (rk->rk_conf.brokerlist) {
if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist,
rd_true) == 0)
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
"No brokers configured");
}
Expand Down
88 changes: 71 additions & 17 deletions src/rdkafka_broker.c
Expand Up @@ -50,6 +50,7 @@
#include <ctype.h>

#include "rd.h"
#include "rdaddr.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_msgset.h"
Expand Down Expand Up @@ -5257,24 +5258,54 @@ static int rd_kafka_broker_name_parse(rd_kafka_t *rk,
return 0;
}

/**
 * @brief Find an existing CONFIGURED broker with matching \p proto,
 *        \p host and \p port, or add a new CONFIGURED broker if none
 *        is found.
 *
 * \p *cnt is increased by one if a matching broker was found or a new
 * broker was successfully added, else left unchanged.
 *
 * @locality any thread
 */
static void rd_kafka_find_or_add_broker(rd_kafka_t *rk,
                                        rd_kafka_secproto_t proto,
                                        const char *host,
                                        uint16_t port,
                                        int *cnt) {
        rd_kafka_broker_t *rkb = NULL;

        if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
            rkb->rkb_source == RD_KAFKA_CONFIGURED) {
                (*cnt)++;
        } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto, host,
                                       port, RD_KAFKA_NODEID_UA) != NULL) {
                (*cnt)++;
        }

        /* If rd_kafka_broker_find returned a broker its
         * reference needs to be released.
         * See issue #193 */
        if (rkb)
                rd_kafka_broker_destroy(rkb);
}

/**
* @brief Adds a (csv list of) broker(s).
 *        Returns the number of brokers successfully added.
*
* @locality any thread
* @locks none
*/
int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
int rd_kafka_brokers_add0(rd_kafka_t *rk,
const char *brokerlist,
rd_bool_t is_bootstrap_server_list) {
char *s_copy = rd_strdup(brokerlist);
char *s = s_copy;
int cnt = 0;
rd_kafka_broker_t *rkb;
int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
rd_sockaddr_inx_t *sinx;
rd_sockaddr_list_t *sockaddr_list;

/* Parse comma-separated list of brokers. */
while (*s) {
uint16_t port;
const char *host;
const char *err_str;
const char *resolved_FQDN;
rd_kafka_secproto_t proto;

if (*s == ',' || *s == ' ') {
Expand All @@ -5287,20 +5318,43 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {
break;

rd_kafka_wrlock(rk);
if (is_bootstrap_server_list &&
rk->rk_conf.client_dns_lookup ==
RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
rd_kafka_dbg(rk, ALL, "INIT",
"Canonicalizing bootstrap broker %s:%d",
host, port);
sockaddr_list = rd_getaddrinfo(
host, RD_KAFKA_PORT_STR, AI_ADDRCONFIG,
rk->rk_conf.broker_addr_family, SOCK_STREAM,
IPPROTO_TCP, rk->rk_conf.resolve_cb,
rk->rk_conf.opaque, &err_str);

if (!sockaddr_list) {
rd_kafka_log(rk, LOG_WARNING, "BROKER",
"Failed to resolve '%s': %s", host,
err_str);
rd_kafka_wrunlock(rk);
continue;
}

if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
rkb->rkb_source == RD_KAFKA_CONFIGURED) {
cnt++;
} else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, proto,
host, port,
RD_KAFKA_NODEID_UA) != NULL)
cnt++;

/* If rd_kafka_broker_find returned a broker its
* reference needs to be released
* See issue #193 */
if (rkb)
rd_kafka_broker_destroy(rkb);
RD_SOCKADDR_LIST_FOREACH(sinx, sockaddr_list) {
resolved_FQDN = rd_sockaddr2str(
sinx, RD_SOCKADDR2STR_F_RESOLVE);
rd_kafka_dbg(
rk, ALL, "INIT",
"Adding broker with resolved hostname %s",
resolved_FQDN);

rd_kafka_find_or_add_broker(
rk, proto, resolved_FQDN, port, &cnt);
};

rd_sockaddr_list_destroy(sockaddr_list);
} else {
rd_kafka_find_or_add_broker(rk, proto, host, port,
&cnt);
}

rd_kafka_wrunlock(rk);
}
Expand All @@ -5322,7 +5376,7 @@ int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist) {


/**
 * @brief Public API: add a (csv list of) broker(s) to \p rk.
 *
 * Brokers added through this call are not the configured bootstrap
 * server list, so \c is_bootstrap_server_list is rd_false and no
 * KIP-235 canonicalization is applied.
 *
 * @returns the number of brokers successfully added.
 */
int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) {
        return rd_kafka_brokers_add0(rk, brokerlist, rd_false);
}


Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_broker.h
Expand Up @@ -469,7 +469,9 @@ rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk,
int state,
rd_kafka_enq_once_t *eonce);

int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist);
int rd_kafka_brokers_add0(rd_kafka_t *rk,
const char *brokerlist,
rd_bool_t is_bootstrap_server_list);
void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state);

void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
Expand Down
13 changes: 13 additions & 0 deletions src/rdkafka_conf.c
Expand Up @@ -1439,6 +1439,19 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"A higher value allows for more effective batching of these "
"messages.",
0, 900000, 10},
{_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
"Controls how the client uses DNS lookups. By default, when the lookup "
"returns multiple IP addresses for a hostname, they will all be attempted "
"for connection before the connection is considered failed. This applies "
"to both bootstrap and advertised servers. If the value is set to "
"`resolve_canonical_bootstrap_servers_only`, each entry will be resolved "
"and expanded into a list of canonical names. NOTE: Default here is "
"different from the Java client's default behavior, which connects only "
"to the first IP address returned for a hostname. ",
.vdef = RD_KAFKA_USE_ALL_DNS_IPS,
.s2i = {{RD_KAFKA_USE_ALL_DNS_IPS, "use_all_dns_ips"},
{RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
"resolve_canonical_bootstrap_servers_only"}}},


/*
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_conf.h
Expand Up @@ -158,6 +158,11 @@ typedef enum {
RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;

/**
 * @brief Values for the `client.dns.lookup` configuration property
 *        (KIP-235).
 */
typedef enum {
        /** Attempt all IP addresses returned by DNS for a hostname
         *  before considering the connection failed (default). */
        RD_KAFKA_USE_ALL_DNS_IPS,
        /** Resolve and expand each bootstrap entry into a list of
         *  canonical names before connecting; needed e.g. for
         *  Kerberos SASL authentication through DNS aliases. */
        RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
} rd_kafka_client_dns_lookup_t;

/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
Expand Down Expand Up @@ -224,6 +229,7 @@ struct rd_kafka_conf_s {
int api_version_fallback_ms;
char *broker_version_fallback;
rd_kafka_secproto_t security_protocol;
rd_kafka_client_dns_lookup_t client_dns_lookup;

struct {
#if WITH_SSL
Expand Down
2 changes: 2 additions & 0 deletions tests/0004-conf.c
Expand Up @@ -529,6 +529,8 @@ int main_0004_conf(int argc, char **argv) {
"ssl.ca.certificate.stores",
"Intermediate ,, Root ,",
#endif
"client.dns.lookup",
"resolve_canonical_bootstrap_servers_only",
NULL
};
static const char *tconfs[] = {"request.required.acks",
Expand Down

0 comments on commit 961946e

Please sign in to comment.