Skip to content

Commit

Permalink
Fix examples
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Aug 9, 2023
1 parent 0ed7250 commit ef3f480
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 222 deletions.
48 changes: 29 additions & 19 deletions examples/describe_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@


const char *argv0;
static rd_kafka_queue_t *queue; /** Admin result queue.
static rd_kafka_queue_t *queue = NULL; /** Admin result queue.
* This is a global so we can
* yield in stop() */
static volatile sig_atomic_t run = 1;
Expand All @@ -63,7 +63,9 @@ static void stop(int sig) {
exit(2);
}
run = 0;
rd_kafka_queue_yield(queue);

if (queue)
rd_kafka_queue_yield(queue);
}


Expand All @@ -73,7 +75,7 @@ static void usage(const char *reason, ...) {
stderr,
"Describe cluster usage examples\n"
"\n"
"Usage: %s <options> <include_cluster_authorized_operations> ...\n"
"Usage: %s <options> <include_cluster_authorized_operations>"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
Expand Down Expand Up @@ -123,7 +125,7 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
*/
int64_t parse_int(const char *what, const char *str) {
char *end;
unsigned long n = strtoull(str, &end, 0);
long n = strtol(str, &end, 0);

if (end != str + strlen(str)) {
fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
Expand All @@ -134,6 +136,7 @@ int64_t parse_int(const char *what, const char *str) {
return (int64_t)n;
}


/**
* @brief Print cluster information.
*/
Expand Down Expand Up @@ -176,16 +179,21 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
return 0;
}


/**
* @brief Call rd_kafka_DescribeCluster()
*/
static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk;
rd_kafka_t *rk = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_AdminOptions_t *options = NULL;
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error;
int retval = 0;
const int min_argc = 1;

if (argc < min_argc)
usage("Wrong number of arguments.");

int include_cluster_authorized_operations =
parse_int("include_cluster_authorized_operations", argv[0]);
Expand All @@ -194,18 +202,15 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
usage("include_cluster_authorized_operations not a 0-1 int");

/*
* Create consumer instance
* Create producer instance
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
fatal("Failed to create new consumer: %s", errstr);
fatal("Failed to create new producer: %s", errstr);

/*
* Describe cluster
*/
queue = rd_kafka_queue_new(rk);

/* Signal handler for clean shutdown */
Expand All @@ -217,6 +222,7 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
if (rd_kafka_AdminOptions_set_request_timeout(
options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
retval = 1;
goto exit;
}
if ((error = rd_kafka_AdminOptions_set_include_authorized_operations(
Expand All @@ -226,9 +232,11 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
"operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
exit(1);
retval = 1;
goto exit;
}

/* Call DescribeCluster. */
rd_kafka_DescribeCluster(rk, options, queue);

/* Wait for results */
Expand All @@ -246,8 +254,7 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
/* DescribeCluster request failed */
fprintf(stderr, "%% DescribeCluster failed[%" PRId32 "]: %s\n",
err, rd_kafka_event_error_string(event));
goto exit;

retval = 1;
} else {
/* DescribeCluster request succeeded */
const rd_kafka_DescribeCluster_result_t *result;
Expand All @@ -259,12 +266,15 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {


exit:
/* Cleanup. */
if (event)
rd_kafka_event_destroy(event);
rd_kafka_AdminOptions_destroy(options);
rd_kafka_queue_destroy(queue);
/* Destroy the client instance */
rd_kafka_destroy(rk);
if (options)
rd_kafka_AdminOptions_destroy(options);
if (queue)
rd_kafka_queue_destroy(queue);
if (rk)
rd_kafka_destroy(rk);

exit(retval);
}
Expand Down
Loading

0 comments on commit ef3f480

Please sign in to comment.