From 9cafbba8808963a373374524b0030f63d1b4c471 Mon Sep 17 00:00:00 2001
From: Arnaud Le Blanc
Date: Sat, 1 Jun 2024 13:35:04 +0200
Subject: [PATCH 01/14] Update README.md

---
 README.md | 25 -------------------------
 1 file changed, 25 deletions(-)

diff --git a/README.md b/README.md
index e8732207..8a659bd7 100644
--- a/README.md
+++ b/README.md
@@ -14,31 +14,6 @@
 The high level and low level *consumers*, *producer*, and *metadata* APIs are supported.
 
 Documentation is available [here](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html).
 
-## Sponsors
-
-[Upstash: Serverless Kafka](https://upstash.com/?utm_source=php-rdkafka)
-
-  • True Serverless Kafka with per-request-pricing
-  • Managed Apache Kafka, works with all Kafka clients
-  • Built-in REST API designed for serverless and edge functions
-
-[Start for free in 30 seconds!](https://upstash.com/?utm_source=php-rdkafka)
- -php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the [Support Ukraine collective](https://opencollective.com/support-ukraine). - ## Table of Contents 1. [Installation](#installation) From 4b085d2a1a6e7fe3d746d3e937b6d4786da9328a Mon Sep 17 00:00:00 2001 From: Ivan Keberlein Date: Fri, 30 Aug 2024 15:07:14 +0300 Subject: [PATCH 02/14] Add missing methods to support incremental rebalance (#541) --- config.m4 | 8 ++++- kafka_consumer.c | 53 +++++++++++++++++++++++++++++++++ kafka_consumer.stub.php | 8 +++++ kafka_consumer_arginfo.h | 18 +++++++++++ kafka_consumer_legacy_arginfo.h | 18 +++++++++++ 5 files changed, 104 insertions(+), 1 deletion(-) diff --git a/config.m4 b/config.m4 index 9a10e91f..b1f75236 100644 --- a/config.m4 +++ b/config.m4 @@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then fi done fi - + if test -z "$RDKAFKA_DIR"; then AC_MSG_RESULT([not found]) AC_MSG_ERROR([Please reinstall the rdkafka distribution]) @@ -90,6 +90,12 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([oauthbearer token refresh cb is not available]) ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[ + AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ]) + ],[ + AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available]) + ]) + LDFLAGS="$ORIG_LDFLAGS" CPPFLAGS="$ORIG_CPPFLAGS" diff --git a/kafka_consumer.c b/kafka_consumer.c index f00a20d5..8678343e 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign) } /* }}} */ +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ +{ + HashTable *htopars = NULL; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) { + return; + } + + object_intern *intern = get_object(getThis()); + if (!intern) { + return; + } + + rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topics) { + return; + } + + rd_kafka_error_t *err; + + if (assign) { + err = rd_kafka_incremental_assign(intern->rk, topics); + } else { + err = rd_kafka_incremental_unassign(intern->rk, topics); + } + + rd_kafka_topic_partition_list_destroy(topics); + + if (err) { + zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0); + rd_kafka_error_destroy(err); + } +} +/* }}} */ + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics) + Incremental assignment of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign) +{ + consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics) + Incremental unassign of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign) +{ + consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ +#endif // !HAS_RD_KAFKA_INCREMENTAL_ASSIGN + /* {{{ proto array RdKafka\KafkaConsumer::getAssignment() Returns the current partition getAssignment */ PHP_METHOD(RdKafka_KafkaConsumer, getAssignment) diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 023b0a46..d12ef2d4 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -21,6 +21,14 @@ public function __construct(Conf $conf) {} /** @tentative-return-type */ public function assign(?array $topic_partitions = null): void {} +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + /** 
@tentative-return-type */ + public function incrementalAssign(array $topic_partitions): void {} + + /** @tentative-return-type */ + public function incrementalUnassign(array $topic_partitions): void {} +#endif + /** @tentative-return-type */ public function getAssignment(): array {} diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index 4ba236ba..8ff01db2 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -70,6 +80,10 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -91,6 +105,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC) diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index 6069e1ba..f2e83eee 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0) ZEND_ARG_INFO(0, topic_partitions) ZEND_END_ARG_INFO() +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0) ZEND_END_ARG_INFO() @@ -69,6 +79,10 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); 
+#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -90,6 +104,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC) From 554305b58bd4de29ea8d51a2a9d3604b400966f1 Mon Sep 17 00:00:00 2001 From: Quentin Dreyer Date: Mon, 2 Sep 2024 13:05:37 +0200 Subject: [PATCH 03/14] Add getControllerId (#554) --- .github/workflows/test.yml | 1 + .github/workflows/test/start-kafka.sh | 8 +-- .gitignore | 1 + conf_arginfo.h | 55 ++++++++++++---- conf_legacy_arginfo.h | 29 +++++---- config.m4 | 10 +++ kafka_consumer.c | 24 ++++++- kafka_consumer.stub.php | 5 ++ kafka_consumer_arginfo.h | 92 +++++++++++++++++++++++---- kafka_consumer_legacy_arginfo.h | 40 ++++++++---- package.xml | 1 + rdkafka.c | 26 +++++++- rdkafka.stub.php | 5 ++ rdkafka_arginfo.h | 91 +++++++++++++++++++++++--- rdkafka_legacy_arginfo.h | 31 ++++++--- tests/controller_id.phpt | 25 ++++++++ 16 files changed, 375 insertions(+), 69 deletions(-) create mode 100644 tests/controller_id.phpt diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 97365c07..7ae9ecb2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,6 +12,7 @@ jobs: tests: name: 'Tests' strategy: + fail-fast: false matrix: include: # Latest librdkafka 2.x with memcheck diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index 4413c48a..8892f6f5 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -1,10 +1,10 @@ #!/bin/sh docker network create kafka_network -docker pull wurstmeister/zookeeper:3.4.6 -docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6 -docker pull wurstmeister/kafka:2.13-2.6.0 -docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0 +docker pull wurstmeister/zookeeper:latest +docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest +docker pull wurstmeister/kafka:latest +docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e 
"KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null echo "Waiting for Kafka to be ready" diff --git a/.gitignore b/.gitignore index 1f1923f4..2c2ece86 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ modules package.xml rdkafka-*.tgz run-tests.php +gen_stub.php tests/*/*.diff tests/*/*.exp tests/*/*.log diff --git a/conf_arginfo.h b/conf_arginfo.h index 983e0e64..75d7a2da 100644 --- a/conf_arginfo.h +++ b/conf_arginfo.h @@ -1,22 +1,38 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */ + * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0) +#endif ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2) +#endif ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1) +#endif ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() @@ -32,8 +48,14 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb -#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB -#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) +ZEND_END_ARG_INFO() #endif #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct @@ -42,11 +64,14 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0) ZEND_END_ARG_INFO() - ZEND_METHOD(RdKafka_Conf, __construct); ZEND_METHOD(RdKafka_Conf, dump); ZEND_METHOD(RdKafka_Conf, set); @@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); - -#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) 
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); #endif - ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); - static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC) @@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) - #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) - #endif +#endif ZEND_FE_END }; - static const zend_function_entry class_RdKafka_TopicConf_methods[] = { ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL) +#else + ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC) +#endif +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL) +#else + ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h index 6120e1cf..5601e9d6 100644 --- a/conf_legacy_arginfo.h +++ b/conf_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. 
- * Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */ + * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -32,8 +32,10 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb -#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB -#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1) + ZEND_ARG_INFO(0, callback) +ZEND_END_ARG_INFO() #endif #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct @@ -46,7 +48,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1) ZEND_ARG_INFO(0, partitioner) ZEND_END_ARG_INFO() - ZEND_METHOD(RdKafka_Conf, __construct); ZEND_METHOD(RdKafka_Conf, dump); ZEND_METHOD(RdKafka_Conf, set); @@ -58,13 +59,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); -#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); #endif ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); - static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC) @@ -77,17 +77,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) - #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) - #endif +#endif ZEND_FE_END }; - static const zend_function_entry class_RdKafka_TopicConf_methods[] = { ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL) +#else + ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC) +#endif +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL) +#else + ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/config.m4 b/config.m4 index b1f75236..17c9c32e 100644 --- a/config.m4 +++ b/config.m4 @@ -71,6 +71,16 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([purge is not 
available]) ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[ +#if RD_KAFKA_VERSION >= 0x010000ff + AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ]) +#else + AC_MSG_WARN([controllerid is broken on 0.11.x]) +#endif + ],[ + AC_MSG_WARN([controllerid is not available]) + ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_init_transactions],[ AC_DEFINE(HAS_RD_KAFKA_TRANSACTIONS,1,[ ]) SOURCES="$SOURCES kafka_error_exception.c" diff --git a/kafka_consumer.c b/kafka_consumer.c index 8678343e..d48abbd2 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -542,7 +542,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close) } /* }}} */ -/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms) +/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) Request Metadata from broker */ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata) { @@ -581,6 +581,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata) } /* }}} */ +#ifdef HAS_RD_KAFKA_CONTROLLERID +/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms) + Returns the current ControllerId (controller broker id) as reported in broker metadata */ +PHP_METHOD(RdKafka_KafkaConsumer, getControllerId) +{ + object_intern *intern; + zend_long timeout; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout)); +} +/* }}} */ +#endif + /* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic) Returns a RdKafka\KafkaConsumerTopic object */ PHP_METHOD(RdKafka_KafkaConsumer, newTopic) diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index d12ef2d4..8d21bbd6 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -56,6 +56,11 @@ public function unsubscribe(): void {} /** @tentative-return-type */ public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {} +#ifdef HAS_RD_KAFKA_CONTROLLERID + /** @tentative-return-type */ + public function getControllerId(int $timeout_ms): int {} +#endif + /** @tentative-return-type */ public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {} diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index 8ff01db2..bff6420c 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -1,41 +1,69 @@ /* This is a generated file, edit the .stub.php file instead. 
- * Stub hash: 47e9238c79f5508833423d31a2e09041754dbffb */ + * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0) +#endif ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN -ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0) - ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0) ZEND_END_ARG_INFO() +#endif -ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0) - ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) -ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) +#define arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign arginfo_class_RdKafka_KafkaConsumer_incrementalAssign #endif +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0) +#endif ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_commit, 0, 0, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_commit, 0, 0, 0) +#endif ZEND_ARG_OBJ_TYPE_MASK(0, message_or_offsets, RdKafka\\Message, MAY_BE_ARRAY|MAY_BE_NULL, "null") ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_close, 0, 0, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_close, 0, 0, 0) +#endif ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_commitAsync arginfo_class_RdKafka_KafkaConsumer_commit +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_consume, 0, 1, RdKafka\\Message, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_consume, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_subscribe, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_subscribe, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -43,27 +71,57 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_unsubscribe arginfo_class_RdKafka_KafkaConsumer_close +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 3, RdKafka\\Metadata, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 0, 3) +#endif 
ZEND_ARG_TYPE_INFO(0, all_topics, _IS_BOOL, 0) ZEND_ARG_OBJ_INFO(0, only_topic, RdKafka\\Topic, 1) ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_CONTROLLERID) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +ZEND_END_ARG_INFO() +#endif + +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 1, RdKafka\\KafkaConsumerTopic, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, topic_name, IS_STRING, 0) ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, topic_conf, RdKafka\\TopicConf, 1, "null") ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, 0, 2, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, 0, 0, 2) +#endif ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, 0, 1, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets, 0, 5, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets, 0, 0, 5) +#endif ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) @@ -77,11 +135,12 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions - ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +#endif +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); #endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); @@ -93,6 +152,9 @@ ZEND_METHOD(RdKafka_KafkaConsumer, subscribe); ZEND_METHOD(RdKafka_KafkaConsumer, getSubscription); ZEND_METHOD(RdKafka_KafkaConsumer, unsubscribe); ZEND_METHOD(RdKafka_KafkaConsumer, getMetadata); +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_METHOD(RdKafka_KafkaConsumer, getControllerId); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, newTopic); ZEND_METHOD(RdKafka_KafkaConsumer, getCommittedOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, getOffsetPositions); @@ -101,12 +163,13 @@ ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); - static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, 
ZEND_ACC_PUBLIC) -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) #endif ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) @@ -118,6 +181,9 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, getSubscription, arginfo_class_RdKafka_KafkaConsumer_getSubscription, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, unsubscribe, arginfo_class_RdKafka_KafkaConsumer_unsubscribe, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getMetadata, arginfo_class_RdKafka_KafkaConsumer_getMetadata, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_CONTROLLERID) + ZEND_ME(RdKafka_KafkaConsumer, getControllerId, arginfo_class_RdKafka_KafkaConsumer_getControllerId, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, newTopic, arginfo_class_RdKafka_KafkaConsumer_newTopic, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getCommittedOffsets, arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getOffsetPositions, arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, ZEND_ACC_PUBLIC) @@ -133,7 +199,11 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif zval property_error_cb_default_value; ZVAL_UNDEF(&property_error_cb_default_value); diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index f2e83eee..559fbfef 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. 
- * Stub hash: 47e9238c79f5508833423d31a2e09041754dbffb */ + * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_INFO(0, conf) @@ -9,14 +9,14 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0) ZEND_ARG_INFO(0, topic_partitions) ZEND_END_ARG_INFO() -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN -ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0) - ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 1) + ZEND_ARG_INFO(0, topic_partitions) ZEND_END_ARG_INFO() +#endif -ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0) - ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) -ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) +#define arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign arginfo_class_RdKafka_KafkaConsumer_incrementalAssign #endif ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0) @@ -48,6 +48,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 0, 3) ZEND_ARG_INFO(0, timeout_ms) ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 0, 1) + ZEND_ARG_INFO(0, timeout_ms) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 0, 1) ZEND_ARG_INFO(0, topic_name) ZEND_ARG_INFO(0, topic_conf) @@ -76,11 +82,12 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions - ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +#endif +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); #endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); @@ -92,6 +99,9 @@ ZEND_METHOD(RdKafka_KafkaConsumer, subscribe); ZEND_METHOD(RdKafka_KafkaConsumer, getSubscription); ZEND_METHOD(RdKafka_KafkaConsumer, unsubscribe); ZEND_METHOD(RdKafka_KafkaConsumer, getMetadata); +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_METHOD(RdKafka_KafkaConsumer, getControllerId); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, newTopic); ZEND_METHOD(RdKafka_KafkaConsumer, getCommittedOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, getOffsetPositions); @@ -100,12 +110,13 @@ ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); - static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) -#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) #endif 
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) @@ -117,6 +128,9 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, getSubscription, arginfo_class_RdKafka_KafkaConsumer_getSubscription, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, unsubscribe, arginfo_class_RdKafka_KafkaConsumer_unsubscribe, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getMetadata, arginfo_class_RdKafka_KafkaConsumer_getMetadata, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_CONTROLLERID) + ZEND_ME(RdKafka_KafkaConsumer, getControllerId, arginfo_class_RdKafka_KafkaConsumer_getControllerId, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, newTopic, arginfo_class_RdKafka_KafkaConsumer_newTopic, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getCommittedOffsets, arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getOffsetPositions, arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, ZEND_ACC_PUBLIC) @@ -132,7 +146,11 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif zval property_error_cb_default_value; ZVAL_NULL(&property_error_cb_default_value); diff --git a/package.xml b/package.xml index 926b8167..16957120 100644 --- a/package.xml +++ b/package.xml @@ -120,6 +120,7 @@ + diff --git a/rdkafka.c b/rdkafka.c index dca76ca6..5d9636e1 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -350,7 +350,7 @@ PHP_METHOD(RdKafka, addBrokers) } /* }}} */ -/* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) +/* {{{ proto RdKafka\Metadata RdKafka::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) Request Metadata from broker */ PHP_METHOD(RdKafka, getMetadata) { @@ -388,7 +388,29 @@ PHP_METHOD(RdKafka, getMetadata) kafka_metadata_init(return_value, metadata); } /* }}} */ - + +#ifdef HAS_RD_KAFKA_CONTROLLERID +/* {{{ proto int RdKafka::getControllerId(int $timeout_ms) + Returns the current ControllerId (controller broker id) as reported in broker metadata */ +PHP_METHOD(RdKafka, getControllerId) +{ + kafka_object *intern; + zend_long timeout; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout)); +} +/* }}} */ +#endif + /* {{{ proto void RdKafka::setLogLevel(int $level) Specifies the maximum logging level produced by internal kafka logging and debugging */ PHP_METHOD(RdKafka, setLogLevel) diff --git a/rdkafka.stub.php b/rdkafka.stub.php index be04e20c..26601ac5 100644 --- a/rdkafka.stub.php +++ b/rdkafka.stub.php @@ -20,6 +20,11 @@ public function addBrokers(string $broker_list): int {} /** @tentative-return-type */ public function getMetadata(bool $all_topics, ?RdKafka\Topic $only_topic, int $timeout_ms): RdKafka\Metadata {} +#ifdef HAS_RD_KAFKA_CONTROLLERID + /** @tentative-return-type */ + public function getControllerId(int $timeout_ms): int {} +#endif + /** @tentative-return-type */ public function getOutQLen(): int {} diff --git a/rdkafka_arginfo.h b/rdkafka_arginfo.h index 1f8e168c..220d6da3 
100644 --- a/rdkafka_arginfo.h +++ b/rdkafka_arginfo.h @@ -1,52 +1,98 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: fbfdb28740208d5f909e9db261bea0aa26bfd471 */ + * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_addBrokers, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_addBrokers, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, broker_list, IS_STRING, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 3, RdKafka\\Metadata, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 0, 3) +#endif ZEND_ARG_TYPE_INFO(0, all_topics, _IS_BOOL, 0) ZEND_ARG_OBJ_INFO(0, only_topic, RdKafka\\Topic, 1) ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_CONTROLLERID) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +ZEND_END_ARG_INFO() +#endif + +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, 0) +#endif ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_metadata arginfo_class_RdKafka_getMetadata +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_setLogLevel, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_setLogLevel, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, level, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_newTopic, 0, 1, RdKafka\\Topic, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_newTopic, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, topic_name, IS_STRING, 0) ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, topic_conf, RdKafka\\TopicConf, 1, "null") ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_outqLen arginfo_class_RdKafka_getOutQLen +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_poll, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_poll, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_flush arginfo_class_RdKafka_poll #if defined(HAS_RD_KAFKA_PURGE) +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_purge, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_purge, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, purge_flags, IS_LONG, 0) ZEND_END_ARG_INFO() #endif +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_setLogger, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_setLogger, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, logger, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_queryWatermarkOffsets, 0, 0, 5) +#endif 
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) @@ -54,12 +100,20 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_queryWat ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_offsetsForTimes, 0, 2, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_offsetsForTimes, 0, 0, 2) +#endif ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_pausePartitions, 0, 1, IS_ARRAY, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_pausePartitions, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -69,19 +123,31 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0) ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, conf, RdKafka\\Conf, 1, "null") ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_Consumer_newQueue, 0, 0, RdKafka\\Queue, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer_newQueue, 0, 0, 0) +#endif ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Producer___construct arginfo_class_RdKafka_Consumer___construct #if defined(HAS_RD_KAFKA_TRANSACTIONS) +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1) +#endif ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() #endif #if defined(HAS_RD_KAFKA_TRANSACTIONS) +#if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0) +#endif ZEND_END_ARG_INFO() #endif @@ -93,10 +159,12 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions #endif - ZEND_METHOD(RdKafka, __construct); ZEND_METHOD(RdKafka, addBrokers); ZEND_METHOD(RdKafka, getMetadata); +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_METHOD(RdKafka, getControllerId); +#endif ZEND_METHOD(RdKafka, getOutQLen); ZEND_METHOD(RdKafka, setLogLevel); ZEND_METHOD(RdKafka, newTopic); @@ -126,16 +194,26 @@ ZEND_METHOD(RdKafka_Producer, commitTransaction); ZEND_METHOD(RdKafka_Producer, abortTransaction); #endif - static const zend_function_entry class_RdKafka_methods[] = { ZEND_ME(RdKafka, __construct, arginfo_class_RdKafka___construct, ZEND_ACC_PRIVATE) ZEND_ME(RdKafka, addBrokers, arginfo_class_RdKafka_addBrokers, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, getMetadata, arginfo_class_RdKafka_getMetadata, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_CONTROLLERID) + ZEND_ME(RdKafka, getControllerId, arginfo_class_RdKafka_getControllerId, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka, getOutQLen, arginfo_class_RdKafka_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka, metadata, getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL) +#else + 
ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#endif ZEND_ME(RdKafka, setLogLevel, arginfo_class_RdKafka_setLogLevel, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) ZEND_ME(RdKafka, newTopic, arginfo_class_RdKafka_newTopic, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka, outqLen, getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL) +#else + ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#endif ZEND_ME(RdKafka, poll, arginfo_class_RdKafka_poll, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, flush, arginfo_class_RdKafka_flush, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_PURGE) @@ -149,19 +227,16 @@ static const zend_function_entry class_RdKafka_methods[] = { ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Exception_methods[] = { ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Consumer_methods[] = { ZEND_ME(RdKafka_Consumer, __construct, arginfo_class_RdKafka_Consumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Consumer, newQueue, arginfo_class_RdKafka_Consumer_newQueue, ZEND_ACC_PUBLIC) ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Producer_methods[] = { ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_TRANSACTIONS) diff --git a/rdkafka_legacy_arginfo.h b/rdkafka_legacy_arginfo.h index eb7e64b6..5fcb7263 100644 --- a/rdkafka_legacy_arginfo.h +++ b/rdkafka_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. 
- * Stub hash: fbfdb28740208d5f909e9db261bea0aa26bfd471 */ + * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -14,6 +14,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 0, 3) ZEND_ARG_INFO(0, timeout_ms) ZEND_END_ARG_INFO() +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 0, 1) + ZEND_ARG_INFO(0, timeout_ms) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, 0) ZEND_END_ARG_INFO() @@ -92,10 +98,12 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions #endif - ZEND_METHOD(RdKafka, __construct); ZEND_METHOD(RdKafka, addBrokers); ZEND_METHOD(RdKafka, getMetadata); +#if defined(HAS_RD_KAFKA_CONTROLLERID) +ZEND_METHOD(RdKafka, getControllerId); +#endif ZEND_METHOD(RdKafka, getOutQLen); ZEND_METHOD(RdKafka, setLogLevel); ZEND_METHOD(RdKafka, newTopic); @@ -125,16 +133,26 @@ ZEND_METHOD(RdKafka_Producer, commitTransaction); ZEND_METHOD(RdKafka_Producer, abortTransaction); #endif - static const zend_function_entry class_RdKafka_methods[] = { ZEND_ME(RdKafka, __construct, arginfo_class_RdKafka___construct, ZEND_ACC_PRIVATE) ZEND_ME(RdKafka, addBrokers, arginfo_class_RdKafka_addBrokers, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, getMetadata, arginfo_class_RdKafka_getMetadata, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_CONTROLLERID) + ZEND_ME(RdKafka, getControllerId, arginfo_class_RdKafka_getControllerId, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka, getOutQLen, arginfo_class_RdKafka_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka, metadata, getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL) +#else + ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#endif ZEND_ME(RdKafka, setLogLevel, arginfo_class_RdKafka_setLogLevel, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) ZEND_ME(RdKafka, newTopic, arginfo_class_RdKafka_newTopic, ZEND_ACC_PUBLIC) - ZEND_MALIAS(RdKafka, outqLen, getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#if (PHP_VERSION_ID >= 80400) + ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL) +#else + ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) +#endif ZEND_ME(RdKafka, poll, arginfo_class_RdKafka_poll, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, flush, arginfo_class_RdKafka_flush, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_PURGE) @@ -148,19 +166,16 @@ static const zend_function_entry class_RdKafka_methods[] = { ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Exception_methods[] = { ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Consumer_methods[] = { ZEND_ME(RdKafka_Consumer, __construct, arginfo_class_RdKafka_Consumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Consumer, newQueue, arginfo_class_RdKafka_Consumer_newQueue, ZEND_ACC_PUBLIC) ZEND_FE_END }; - static const zend_function_entry class_RdKafka_Producer_methods[] = { ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC) #if 
defined(HAS_RD_KAFKA_TRANSACTIONS)

diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt
new file mode 100644
index 00000000..af7e95f0
--- /dev/null
+++ b/tests/controller_id.phpt
@@ -0,0 +1,25 @@
+--TEST--
+Display controller id
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+echo (new RdKafka\Producer($conf))->getControllerId(10*1000) . \PHP_EOL;
+echo (new RdKafka\Consumer($conf))->getControllerId(10*1000) . \PHP_EOL;
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->set('group.id', 'test');
+
+echo (new RdKafka\KafkaConsumer($conf))->getControllerId(10*1000) . \PHP_EOL;
+--EXPECT--
+1001
+1001
+1001
\ No newline at end of file

From 0ee8bbada7f73cee334c7d00b137c8760b558438 Mon Sep 17 00:00:00 2001
From: Arnaud Le Blanc
Date: Wed, 18 Sep 2024 17:35:23 +0200
Subject: [PATCH 04/14] Improve KafkaErrorException message (#555)

---
 kafka_error_exception.c                    | 5 ++++-
 tests/init_transaction_not_configured.phpt | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/kafka_error_exception.c b/kafka_error_exception.c
index 03260c4a..8c19ca0c 100644
--- a/kafka_error_exception.c
+++ b/kafka_error_exception.c
@@ -39,7 +39,10 @@ void create_kafka_error(zval *return_value, const rd_kafka_error_t *error) /* {{{ */
 {
     object_init_ex(return_value, ce_kafka_error);
 
-    zend_update_property_string(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("message"), rd_kafka_error_name(error));
+    zend_string *message = zend_strpprintf(0, "%s (RD_KAFKA_RESP_ERR_%s)", rd_kafka_error_string(error), rd_kafka_error_name(error));
+    zend_update_property_str(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("message"), message);
+    zend_string_release(message);
+
     zend_update_property_long(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("code"), rd_kafka_error_code(error));
     zend_update_property_string(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("error_string"), rd_kafka_error_string(error));
     zend_update_property_bool(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("isFatal"), rd_kafka_error_is_fatal(error));

diff --git a/tests/init_transaction_not_configured.phpt b/tests/init_transaction_not_configured.phpt
index 49e552f2..e0a28f3d 100644
--- a/tests/init_transaction_not_configured.phpt
+++ b/tests/init_transaction_not_configured.phpt
@@ -29,7 +29,7 @@ try {
 }
 
 --EXPECTF--
-_NOT_CONFIGURED
+The Transactional API requires transactional.id to be configured (RD_KAFKA_RESP_ERR__NOT_CONFIGURED)
 -145
 %s/tests/init_transaction_not_configured.php
 14
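A minimal userland sketch of how the improved message surfaces (not part of the patch; the broker address is a placeholder, and the producer is assumed to have no transactional.id configured):

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // placeholder broker address

$producer = new RdKafka\Producer($conf);

try {
    $producer->initTransactions(10 * 1000);
} catch (RdKafka\KafkaErrorException $e) {
    // Before this patch, the message was only the error name ("_NOT_CONFIGURED").
    // After it, rd_kafka_error_string() and rd_kafka_error_name() are combined:
    // "The Transactional API requires transactional.id to be configured (RD_KAFKA_RESP_ERR__NOT_CONFIGURED)"
    echo $e->getMessage() . \PHP_EOL;
    echo $e->getCode() . \PHP_EOL; // -145
}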
From 4d3f4933dede6c49260ebe0e54f01bab6a88c729 Mon Sep 17 00:00:00 2001
From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Sun, 13 Oct 2024 11:33:03 -0400
Subject: [PATCH 05/14] Implement integration tests for oauth token methods

* Fix seg fault in oauthbearerSetToken definition in rdkafka.c when not passing value for extension
* Fix issue with oauthbearer_token_refresh not persisting (was not included in kafka_conf_callbacks_copy)
* Modify start-kafka.sh to start kafka container with oauth2 configuration
* Implement oauth2 integration tests
---
 .github/workflows/test/start-kafka.sh | 64 +++++++++--
 conf.c                                |  1 +
 rdkafka.c                             |  4 +-
 rdkafka.stub.php                      |  4 +-
 tests/oauthbearer_integration.phpt    | 151 ++++++++++++++++++++++++++
 tests/test_env.php.sample             |  4 +
 6 files changed, 214 insertions(+), 14 deletions(-)
 create mode 100644 tests/oauthbearer_integration.phpt

diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh
index 4413c48a..bf2eefcc 100755
--- a/.github/workflows/test/start-kafka.sh
+++ b/.github/workflows/test/start-kafka.sh
@@ -1,20 +1,64 @@
 #!/bin/sh
 
 docker network create kafka_network
-docker pull wurstmeister/zookeeper:3.4.6
-docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
-docker pull wurstmeister/kafka:2.13-2.6.0
-docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
-printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
+docker pull wurstmeister/zookeeper:latest
+docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
+docker pull wurstmeister/kafka:latest
+docker run -d -p 9092:9092 --network kafka_network \
+    -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+    -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+    -e "KAFKA_BROKER_ID=1" \
+    -e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
+    -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
+    -e "KAFKA_ADVERTISED_PORT=9092" \
+    --name kafka wurstmeister/kafka:latest
+
+docker run -d -p 29092:29092 --network kafka_network \
+    -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+    -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+    -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
+    -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
+    -e "KAFKA_ADVERTISED_PORT=29092" \
+    -e "KAFKA_BROKER_ID=2" \
+    -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+    -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+    -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
+    -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
+    -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
+    -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
+    -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
+    --name kafka_oauth2 wurstmeister/kafka:latest
+
+printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null
 
-echo "Waiting for Kafka to be ready"
+echo "Waiting for Kafka services to be ready"
+
+kafka_ready=0
+kafka_oauth2_ready=0
 
 for i in $(seq 1 20); do
-    if kafkacat -b 127.0.0.1 -L; then
-        echo "Kafka is ready"
-        exit 0
+    if [ $kafka_ready -eq 0 ]; then
+        if kafkacat -b 127.0.0.1 -L; then
+            kafka_ready=1
+            echo "Kafka is ready"
+        fi
+    fi
+    if [ $kafka_oauth2_ready -eq 0 ]; then
+        if kafkacat -b kafka_oauth2:29092 \
+            -X security.protocol=SASL_PLAINTEXT \
+            -X sasl.mechanisms=OAUTHBEARER \
+            -X enable.sasl.oauthbearer.unsecure.jwt="true" \
+            -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
+        then
+            kafka_oauth2_ready=1
+            echo "Kafka OAuth2 is ready"
+        fi
+    fi
+
+    if [ $kafka_ready -eq 1 ] && [ $kafka_oauth2_ready -eq 1 ]; then
+        exit 0
     fi
 done
 
-echo "Timedout waiting for Kafka to be ready"
+echo "Timed out waiting for Kafka services to be ready"
 exit 1

diff --git a/conf.c b/conf.c
index e438e5fa..9aa8132f 100644
--- a/conf.c
+++ b/conf.c
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from)
 
 void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
 {
+    kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
     kafka_conf_callback_copy(&to->error, from->error);
     kafka_conf_callback_copy(&to->rebalance, from->rebalance);
     kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
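A minimal sketch of the flow the one-line kafka_conf_callbacks_copy() addition above repairs (not part of the patch; broker address and token value are placeholders). Without the fix, a refresh callback registered on the Conf was dropped when the configuration was copied into the client object, so it never fired:

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'kafka_oauth2:29092'); // placeholder OAUTHBEARER listener
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
$conf->setOauthbearerTokenRefreshCb(function ($client) {
    // A real implementation builds or fetches a token here; '<jws>' is a placeholder.
    $client->oauthbearerSetToken('<jws>', (time() + 60) * 1000, 'admin');
});

$producer = new RdKafka\Producer($conf); // conf callbacks are copied into the client here
$producer->poll(0);                      // serves the token-refresh event when a token is needed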
diff --git a/conf.c b/conf.c
index e438e5fa..9aa8132f 100644
--- a/conf.c
+++ b/conf.c
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba
 void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
 {
+    kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
     kafka_conf_callback_copy(&to->error, from->error);
     kafka_conf_callback_copy(&to->rebalance, from->rebalance);
     kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
diff --git a/rdkafka.c b/rdkafka.c
index 61c28eeb..bb2e7aa4 100644
--- a/rdkafka.c
+++ b/rdkafka.c
@@ -445,8 +445,8 @@ PHP_METHOD(RdKafka, oauthbearerSetToken)
     }
 
     errstr[0] = '\0';
-
-    int extensions_size;
+
+    int extensions_size = 0;
     char **extensions = NULL;
 
     if (extensions_hash != NULL) {
diff --git a/rdkafka.stub.php b/rdkafka.stub.php
index d877b5b2..8d029b72 100644
--- a/rdkafka.stub.php
+++ b/rdkafka.stub.php
@@ -77,10 +77,10 @@ public function resumePartitions(array $topic_partitions): array {}
 
 #ifdef HAS_RD_KAFKA_OAUTHBEARER
     /** @tentative-return-type */
-    public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void;
+    public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
 
     /** @tentative-return-type */
-    public function oauthbearerSetTokenFailure(string $error): void;
+    public function oauthbearerSetTokenFailure(string $error): void {}
 #endif
 }
 }
diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
new file mode 100644
index 00000000..e10bcf40
--- /dev/null
+++ b/tests/oauthbearer_integration.phpt
@@ -0,0 +1,151 @@
+--TEST--
+Produce, consume, oauth
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+?>
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+function generateJws($scope = 'required-scope', $expiresInSeconds = 60)
+{
+    $nowSeconds = time();
+    $expirySeconds = $nowSeconds + $expiresInSeconds;
+    $expiryMs = $expirySeconds * 1000;
+    $principal = 'admin';
+
+    $claims = [
+        'sub' => $principal,
+        'exp' => $expirySeconds,
+        'iat' => $nowSeconds - 10,
+        'scope' => $scope,
+    ];
+
+    $headerJwsSegment = 'eyJhbGciOiJub25lIn0';
+    $claimsJwsSegment = base64_encode(json_encode($claims));
+    $claimsJwsSegment = rtrim(strtr($claimsJwsSegment, '+/', '-_'), '=');
+
+    $jws = sprintf('%s.%s.', $headerJwsSegment, $claimsJwsSegment);
+
+    return [
+        'value' => $jws,
+        'principal' => $principal,
+        'expiryMs' => $expiryMs,
+    ];
+}
+
+// Set up tests
+$conf = new RdKafka\Conf();
+if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
+    $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
+}
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS'));
+$conf->set('security.protocol', 'SASL_PLAINTEXT');
+$conf->set('sasl.mechanisms', 'OAUTHBEARER');
+$conf->set('sasl.oauthbearer.config', 'principal=admin');
+$conf->setLogCb(function ($kafka, $level, $facility, $message) {});
+$conf->setErrorCb(function ($producer, $err, $errstr) {
+    printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
+});
+
+// Test that refresh token with setting token accurately will succeed when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+    echo "Refreshing token and succeeding\n";
+    $token = generateJws();
+    $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+
+try {
+    $producer->getMetadata(false, $topic, 10*1000);
+    echo "Metadata retrieved successfully when refresh callback set token\n";
+} catch (\RdKafka\Exception $e) {
+    echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n";
+}
+
+// Test that refresh token with setting token failure will fail when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+    echo "Setting token failure in
refresh cb\n"; + $producer->oauthbearerSetTokenFailure('Token failure before getting metadata'); + $producer->poll(0); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Did not catch exception after not setting or refreshing any token\n"; +} catch (\RdKafka\Exception $e) { + echo "Caught exception when getting metadata after not setting or refreshing any token\n"; +} + +// Test that setting token without refreshing will get metadata successfully +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "Got metadata successfully\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Set token but still got exception \n"; + exit; +} + +// Test that token refresh is called after token expires +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token\n"; +}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(expiresInSeconds: 5); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$producer->poll(0); +echo "Polled with refresh\n"; +sleep(1); +$producer->poll(0); +echo "Polled without refresh\n"; +sleep(4); +$producer->poll(0); +echo "Polled with refresh\n"; + +// Test that tokens without required scope fail +$producer = new \RdKafka\Producer($conf); +$token = generateJws('not-required-scope'); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Exception not thrown as expected when using insufficient scope\n"; + exit; +} catch (\RdKafka\Exception $e) { + echo "Caught expected exception with insufficient_scope\n"; +} + +// Test that setting token with extensions succeeds +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']); +$producer->poll(0); + +--EXPECT-- +Refreshing token and succeeding +Metadata retrieved successfully when refresh callback set token +Setting token failure in refresh cb +Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata +Caught exception when getting metadata after not setting or refreshing any token +Got metadata successfully +Refreshing token +Polled with refresh +Polled without refresh +Refreshing token +Polled with refresh +Caught expected exception with insufficient_scope \ No newline at end of file diff --git a/tests/test_env.php.sample b/tests/test_env.php.sample index e43476aa..0d6be87a 100644 --- a/tests/test_env.php.sample +++ b/tests/test_env.php.sample @@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) { putenv('TEST_KAFKA_BROKERS=localhost:9092'); } +if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) { + putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092'); +} + if (false === 
getenv('TEST_KAFKA_BROKER_VERSION')) { putenv('TEST_KAFKA_BROKER_VERSION=2.3'); } From edead1c29de7760081c7c29dd610711d57d8dd48 Mon Sep 17 00:00:00 2001 From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com> Date: Sun, 13 Oct 2024 11:41:15 -0400 Subject: [PATCH 06/14] Fix typo in kafka_ready --- .github/workflows/test/start-kafka.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index bf2eefcc..b41035ff 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -33,7 +33,7 @@ printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev echo "Waiting for Kafka services to be ready" -kakfa_ready=0 +kafka_ready=0 kafka_oauth2_ready=0 for i in $(seq 1 20); do From b92336be6d7791834009194faa622290409e3977 Mon Sep 17 00:00:00 2001 From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com> Date: Sun, 13 Oct 2024 19:49:03 -0400 Subject: [PATCH 07/14] Skip oauthbearer tests and setup steps if librdkafka version does not support it * Add new test env var SKIP_OAUTH based on matrix.skipoauth * Set matrix.skipoauth on all librdkafka versions below v1.1.0 * Don't set up kafka_oauth2 container if SKIP_OAUTH is 1 * Skip tests in oauthbearer_integration.phpt if RD_KAFKA_VERSION is below 0x010100 --- .github/workflows/test.yml | 12 +++++++++ .github/workflows/test/start-kafka.sh | 36 ++++++++++++++------------- tests/oauthbearer_integration.phpt | 2 +- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 97365c07..c38a9969 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -108,28 +108,39 @@ jobs: # librdkafka 1.0.1 - php: '8.1.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v1.0.1' + skipoauth: '1' # librdkafka 0.11.6 - php: '8.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.2.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' # librdkafka master (experimental, does not block PRs) - php: '8.3.0' @@ -157,6 +168,7 @@ jobs: PHP_VERSION: ${{ matrix.php }} LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }} MEMORY_CHECK: ${{ matrix.memcheck }} + SKIP_OAUTH: ${{ matrix.skipoauth }} TEST_KAFKA_BROKERS: kafka:9092 TEST_KAFKA_BROKER_VERSION: 2.6 steps: diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index b41035ff..b8949a90 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -13,21 +13,23 @@ docker run -d -p 9092:9092 --network kafka_network \ -e "KAFKA_ADVERTISED_PORT=9092" \ --name kafka wurstmeister/kafka:latest -docker run -d -p 29092:29092 --network kafka_network \ - -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ - -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ - -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \ - -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \ - -e "KAFKA_ADVERTISED_PORT=29092" \ - -e "KAFKA_BROKER_ID=2" \ - -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ - -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ - -e 
"KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \ - -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \ - -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \ - -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \ - -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \ - --name kafka_oauth2 wurstmeister/kafka:latest +if [ ${SKIP_OAUTH:-0} -ne 1 ]; then + docker run -d -p 29092:29092 --network kafka_network \ + -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ + -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ + -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \ + -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \ + -e "KAFKA_ADVERTISED_PORT=29092" \ + -e "KAFKA_BROKER_ID=2" \ + -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \ + -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \ + -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \ + -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \ + -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \ + --name kafka_oauth2 wurstmeister/kafka:latest +fi printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null @@ -43,7 +45,7 @@ for i in $(seq 1 20); do echo "Kafka is ready" fi fi - if [ $kafka_oauth2_ready -eq 0 ]; then + if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then if kafkacat -b kafka_oauth2:29092 \ -X security.protocol=SASL_PLAINTEXT \ -X sasl.mechanisms=OAUTHBEARER \ @@ -55,7 +57,7 @@ for i in $(seq 1 20); do fi fi - if [ $kafka_ready -eq 1 ] && [ $kafka_oauth2_ready -eq 1 ]; then + if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then exit 0 fi done diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt index e10bcf40..40b74944 100644 --- a/tests/oauthbearer_integration.phpt +++ b/tests/oauthbearer_integration.phpt @@ -3,7 +3,7 @@ Produce, consume, oauth --SKIPIF-- +RD_KAFKA_VERSION >= 0x010100 || die("skip librdkafka too old does not support oauthbearer"); --FILE-- Date: Sun, 13 Oct 2024 19:57:48 -0400 Subject: [PATCH 08/14] Ensure tests compatible with all php versions --- tests/oauthbearer_integration.phpt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt index 40b74944..221b8a07 100644 --- a/tests/oauthbearer_integration.phpt +++ b/tests/oauthbearer_integration.phpt @@ -8,7 +8,7 @@ RD_KAFKA_VERSION >= 0x010100 || die("skip librdkafka too old does not support oa setOauthbearerTokenRefreshCb(function ($producer) { echo "Refreshing token\n"; }); $producer = new \RdKafka\Producer($conf); -$token = generateJws(expiresInSeconds: 5); +$token = generateJws('required-scope', 5); $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); $producer->poll(0); echo "Polled with refresh\n"; From 42d3110b99f8d5caa9e74c633984d87e63853a9b Mon Sep 17 00:00:00 2001 
From 42d3110b99f8d5caa9e74c633984d87e63853a9b Mon Sep 17 00:00:00 2001
From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Sun, 13 Oct 2024 20:10:27 -0400
Subject: [PATCH 09/14] Fix RD_KAFKA_VERSION comparison

---
 tests/oauthbearer_integration.phpt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
index 221b8a07..b43349f3 100644
--- a/tests/oauthbearer_integration.phpt
+++ b/tests/oauthbearer_integration.phpt
@@ -3,7 +3,7 @@ Produce, consume, oauth
 --SKIPIF--
 <?php
 require __DIR__ . '/integration-tests-check.php';
-RD_KAFKA_VERSION >= 0x010100 || die("skip librdkafka too old does not support oauthbearer");
+RD_KAFKA_VERSION >= 0x01010000 || die("skip librdkafka too old does not support oauthbearer");
 --FILE--
 <?php
 require __DIR__ . '/integration-tests-check.php';

From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Sun, 13 Oct 2024 20:27:58 -0400
Subject: [PATCH 10/14] Remove usage of json_encode as we don't have access to
 json extension

---
 tests/oauthbearer_integration.phpt | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
index b43349f3..38812574 100644
--- a/tests/oauthbearer_integration.phpt
+++ b/tests/oauthbearer_integration.phpt
@@ -15,15 +15,15 @@ function generateJws($scope = 'required-scope', $expiresInSeconds = 60)
     $expiryMs = $expirySeconds * 1000;
     $principal = 'admin';
 
-    $claims = [
-        'sub' => $principal,
-        'exp' => $expirySeconds,
-        'iat' => $nowSeconds - 10,
-        'scope' => $scope,
-    ];
-
+    $claimsJson = sprintf(
+        '{"sub": "%s", "exp": %d, "iat": %d, "scope": "%s"}',
+        $principal,
+        $expirySeconds,
+        $nowSeconds - 10,
+        $scope,
+    );
     $headerJwsSegment = 'eyJhbGciOiJub25lIn0';
-    $claimsJwsSegment = base64_encode(json_encode($claims));
+    $claimsJwsSegment = base64_encode($claimsJson);
     $claimsJwsSegment = rtrim(strtr($claimsJwsSegment, '+/', '-_'), '=');
 
     $jws = sprintf('%s.%s.', $headerJwsSegment, $claimsJwsSegment);

From 28cc1ccdfa4930a51df80d4abbd9c41c0bd9892a Mon Sep 17 00:00:00 2001
From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Sun, 13 Oct 2024 21:33:22 -0400
Subject: [PATCH 11/14] Add newline to end of test file

---
 tests/oauthbearer_integration.phpt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
index 38812574..0b14edf8 100644
--- a/tests/oauthbearer_integration.phpt
+++ b/tests/oauthbearer_integration.phpt
@@ -148,4 +148,4 @@ Polled with refresh
 Polled without refresh
 Refreshing token
 Polled with refresh
-Caught expected exception with insufficient_scope
\ No newline at end of file
+Caught expected exception with insufficient_scope

From 2b859d6d3dcf77d502c56ff5114408d5dda6cc8a Mon Sep 17 00:00:00 2001
From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Mon, 14 Oct 2024 16:47:51 -0400
Subject: [PATCH 12/14] Change test name to match contents

---
 tests/oauthbearer_integration.phpt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
index 0b14edf8..a016bfdd 100644
--- a/tests/oauthbearer_integration.phpt
+++ b/tests/oauthbearer_integration.phpt
@@ -1,5 +1,5 @@
 --TEST--
-Produce, consume, oauth
+oauthbearer, metadata
 --SKIPIF--
 <?php
 require __DIR__ . '/integration-tests-check.php';

From d07c59a55e03b25b4aba50a6adeec34f35bdd7a1 Mon Sep 17 00:00:00 2001
From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Mon, 14 Oct 2024 16:51:05 -0400
Subject: [PATCH 13/14] Change test name to match contents

---
 tests/oauthbearer_integration.phpt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
index a016bfdd..b95de0f6 100644
--- a/tests/oauthbearer_integration.phpt
+++ 
b/tests/oauthbearer_integration.phpt
@@ -1,5 +1,5 @@
 --TEST--
-oauthbearer, metadata
+Oauthbearer
 --SKIPIF--
 <?php
 require __DIR__ . '/integration-tests-check.php';

From: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
Date: Mon, 14 Oct 2024 16:56:17 -0400
Subject: [PATCH 14/14] Change controller_id test to use 1 instead of 1001

---
 tests/controller_id.phpt | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt
index af7e95f0..b9724804 100644
--- a/tests/controller_id.phpt
+++ b/tests/controller_id.phpt
@@ -20,6 +20,6 @@ $conf->set('group.id', 'test');
 
 echo (new RdKafka\KafkaConsumer($conf))->getControllerId(10*1000) . \PHP_EOL;
 --EXPECT--
-1001
-1001
-1001
\ No newline at end of file
+1
+1
+1
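Note: getControllerId(), which patches 03 and 14 exercise, asks the cluster for the
broker id of the current controller. A usage sketch mirroring tests/controller_id.phpt
(broker address and timeout are illustrative):

    <?php
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS') ?: 'localhost:9092');

    $producer = new RdKafka\Producer($conf);

    // 10s timeout, as in the test; returns the controller's broker id.
    $controllerId = $producer->getControllerId(10 * 1000);

    // With the single-broker container started by start-kafka.sh (KAFKA_BROKER_ID=1),
    // this prints 1, which is why patch 14 updates the expected output from the
    // image's old auto-assigned id of 1001.
    echo $controllerId, \PHP_EOL;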