diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 97365c07..d986c660 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -12,6 +12,7 @@ jobs:
tests:
name: 'Tests'
strategy:
+ fail-fast: false
matrix:
include:
# Latest librdkafka 2.x with memcheck
@@ -108,28 +109,39 @@ jobs:
# librdkafka 1.0.1
- php: '8.1.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
# librdkafka 0.11.6
- php: '8.1.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.2.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.1.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.0.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
# librdkafka master (experimental, does not block PRs)
- php: '8.3.0'
@@ -157,6 +169,7 @@ jobs:
PHP_VERSION: ${{ matrix.php }}
LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }}
MEMORY_CHECK: ${{ matrix.memcheck }}
+ SKIP_OAUTH: ${{ matrix.skipoauth }}
TEST_KAFKA_BROKERS: kafka:9092
TEST_KAFKA_BROKER_VERSION: 2.6
steps:
diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh
index 4413c48a..b8949a90 100755
--- a/.github/workflows/test/start-kafka.sh
+++ b/.github/workflows/test/start-kafka.sh
@@ -1,20 +1,66 @@
#!/bin/sh
docker network create kafka_network
-docker pull wurstmeister/zookeeper:3.4.6
-docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
-docker pull wurstmeister/kafka:2.13-2.6.0
-docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
-printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
+docker pull wurstmeister/zookeeper:latest
+docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
+docker pull wurstmeister/kafka:latest
+docker run -d -p 9092:9092 --network kafka_network \
+ -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+ -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+ -e "KAFKA_BROKER_ID=1" \
+ -e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
+ -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
+ -e "KAFKA_ADVERTISED_PORT=9092" \
+ --name kafka wurstmeister/kafka:latest
-echo "Waiting for Kafka to be ready"
+if [ ${SKIP_OAUTH:-0} -ne 1 ]; then
+ docker run -d -p 29092:29092 --network kafka_network \
+ -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+ -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+ -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
+ -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
+ -e "KAFKA_ADVERTISED_PORT=29092" \
+ -e "KAFKA_BROKER_ID=2" \
+ -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+ -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+ -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
+ -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
+ -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
+ -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
+ -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
+ --name kafka_oauth2 wurstmeister/kafka:latest
+fi
+
+printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null
+
+echo "Waiting for Kafka services to be ready"
+
+kafka_ready=0
+kafka_oauth2_ready=0
for i in $(seq 1 20); do
- if kafkacat -b 127.0.0.1 -L; then
- echo "Kafka is ready"
- exit 0
+ if [ $kafka_ready -eq 0 ]; then
+ if kafkacat -b 127.0.0.1 -L; then
+ kafka_ready=1
+ echo "Kafka is ready"
+ fi
+ fi
+ if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then
+ if kafkacat -b kafka_oauth2:29092 \
+ -X security.protocol=SASL_PLAINTEXT \
+ -X sasl.mechanisms=OAUTHBEARER \
+ -X enable.sasl.oauthbearer.unsecure.jwt="true" \
+ -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
+ then
+ kafka_oauth2_ready=1
+ echo "Kafka OAuth2 is ready"
+ fi
+ fi
+
+ if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then
+ exit 0
fi
done
-echo "Timedout waiting for Kafka to be ready"
+echo "Timedout waiting for Kafka services to be ready"
exit 1
diff --git a/.gitignore b/.gitignore
index 1f1923f4..2c2ece86 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ modules
package.xml
rdkafka-*.tgz
run-tests.php
+gen_stub.php
tests/*/*.diff
tests/*/*.exp
tests/*/*.log
diff --git a/README.md b/README.md
index e8732207..8a659bd7 100644
--- a/README.md
+++ b/README.md
@@ -14,31 +14,6 @@ The high level and low level *consumers*, *producer*, and *metadata* APIs are su
Documentation is available [here](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html).
-## Sponsors
-
-
-
-
-
-
-
-
-
-Upstash: Serverless Kafka
-
-
- - True Serverless Kafka with per-request-pricing
- - Managed Apache Kafka, works with all Kafka clients
- - Built-in REST API designed for serverless and edge functions
-
-
-[Start for free in 30 seconds!](https://upstash.com/?utm_source=php-rdkafka)
- |
-
-
-
-php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the [Support Ukraine collective](https://opencollective.com/support-ukraine).
-
## Table of Contents
1. [Installation](#installation)
diff --git a/conf.c b/conf.c
index e438e5fa..9aa8132f 100644
--- a/conf.c
+++ b/conf.c
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
{
+ kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
kafka_conf_callback_copy(&to->error, from->error);
kafka_conf_callback_copy(&to->rebalance, from->rebalance);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
diff --git a/conf_arginfo.h b/conf_arginfo.h
index 7c62dd4e..d179aee1 100644
--- a/conf_arginfo.h
+++ b/conf_arginfo.h
@@ -1,22 +1,38 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
+ * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0)
+#endif
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2)
+#endif
ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1)
+#endif
ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
@@ -32,8 +48,14 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
-#ifdef HAS_RD_KAFKA_OAUTHBEARER
-#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
+#endif
+ ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
+ZEND_END_ARG_INFO()
#endif
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
@@ -42,11 +64,14 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0)
ZEND_END_ARG_INFO()
-
ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
@@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
-
-#ifdef HAS_RD_KAFKA_OAUTHBEARER
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
-
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
-
static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
@@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
- #ifdef HAS_RD_KAFKA_OAUTHBEARER
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
- #endif
+#endif
ZEND_FE_END
};
-
static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
+#endif
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
@@ -99,7 +128,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
@@ -109,7 +142,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h
index 74c08530..72305d3b 100644
--- a/conf_legacy_arginfo.h
+++ b/conf_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
+ * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -32,8 +32,10 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
-#ifdef HAS_RD_KAFKA_OAUTHBEARER
-#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
+ ZEND_ARG_INFO(0, callback)
+ZEND_END_ARG_INFO()
#endif
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
@@ -46,7 +48,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
ZEND_ARG_INFO(0, partitioner)
ZEND_END_ARG_INFO()
-
ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
@@ -58,13 +59,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
-#ifdef HAS_RD_KAFKA_OAUTHBEARER
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
-
static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
@@ -77,17 +77,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
- #ifdef HAS_RD_KAFKA_OAUTHBEARER
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
- #endif
+#endif
ZEND_FE_END
};
-
static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
+#endif
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
@@ -97,7 +104,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
@@ -107,7 +118,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
diff --git a/config.m4 b/config.m4
index d1e065b7..356a1df3 100644
--- a/config.m4
+++ b/config.m4
@@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then
fi
done
fi
-
+
if test -z "$RDKAFKA_DIR"; then
AC_MSG_RESULT([not found])
AC_MSG_ERROR([Please reinstall the rdkafka distribution])
@@ -71,6 +71,16 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([purge is not available])
])
+ AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[
+#if RD_KAFKA_VERSION >= 0x010000ff
+ AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ])
+#else
+ AC_MSG_WARN([controllerid is broken on 0.11.x])
+#endif
+ ],[
+ AC_MSG_WARN([controllerid is not available])
+ ])
+
AC_CHECK_LIB($LIBNAME,[rd_kafka_init_transactions],[
AC_DEFINE(HAS_RD_KAFKA_TRANSACTIONS,1,[ ])
SOURCES="$SOURCES kafka_error_exception.c"
@@ -90,6 +100,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([oauthbearer support is not available])
])
+ AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[
+ AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ])
+ ],[
+ AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available])
+ ])
+
LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"
diff --git a/kafka_consumer.c b/kafka_consumer.c
index f00a20d5..d48abbd2 100644
--- a/kafka_consumer.c
+++ b/kafka_consumer.c
@@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign)
}
/* }}} */
+#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
+static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
+{
+ HashTable *htopars = NULL;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) {
+ return;
+ }
+
+ object_intern *intern = get_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars);
+ if (!topics) {
+ return;
+ }
+
+ rd_kafka_error_t *err;
+
+ if (assign) {
+ err = rd_kafka_incremental_assign(intern->rk, topics);
+ } else {
+ err = rd_kafka_incremental_unassign(intern->rk, topics);
+ }
+
+ rd_kafka_topic_partition_list_destroy(topics);
+
+ if (err) {
+ zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0);
+ rd_kafka_error_destroy(err);
+ }
+}
+/* }}} */
+
+/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics)
+ Incremental assignment of partitions to consume */
+PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign)
+{
+ consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU);
+}
+/* }}} */
+
+/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics)
+ Incremental unassign of partitions to consume */
+PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign)
+{
+ consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU);
+}
+/* }}} */
+#endif // !HAS_RD_KAFKA_INCREMENTAL_ASSIGN
+
/* {{{ proto array RdKafka\KafkaConsumer::getAssignment()
Returns the current partition getAssignment */
PHP_METHOD(RdKafka_KafkaConsumer, getAssignment)
@@ -489,7 +542,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close)
}
/* }}} */
-/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms)
+/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
{
@@ -528,6 +581,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
}
/* }}} */
+#ifdef HAS_RD_KAFKA_CONTROLLERID
+/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms)
+ Returns the current ControllerId (controller broker id) as reported in broker metadata */
+PHP_METHOD(RdKafka_KafkaConsumer, getControllerId)
+{
+ object_intern *intern;
+ zend_long timeout;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
+ return;
+ }
+
+ intern = get_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
+}
+/* }}} */
+#endif
+
/* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic)
Returns a RdKafka\KafkaConsumerTopic object */
PHP_METHOD(RdKafka_KafkaConsumer, newTopic)
diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php
index 023b0a46..8d21bbd6 100644
--- a/kafka_consumer.stub.php
+++ b/kafka_consumer.stub.php
@@ -21,6 +21,14 @@ public function __construct(Conf $conf) {}
/** @tentative-return-type */
public function assign(?array $topic_partitions = null): void {}
+#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
+ /** @tentative-return-type */
+ public function incrementalAssign(array $topic_partitions): void {}
+
+ /** @tentative-return-type */
+ public function incrementalUnassign(array $topic_partitions): void {}
+#endif
+
/** @tentative-return-type */
public function getAssignment(): array {}
@@ -48,6 +56,11 @@ public function unsubscribe(): void {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {}
+#ifdef HAS_RD_KAFKA_CONTROLLERID
+ /** @tentative-return-type */
+ public function getControllerId(int $timeout_ms): int {}
+#endif
+
/** @tentative-return-type */
public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {}
diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h
index 4ba236ba..bff6420c 100644
--- a/kafka_consumer_arginfo.h
+++ b/kafka_consumer_arginfo.h
@@ -1,31 +1,69 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 47e9238c79f5508833423d31a2e09041754dbffb */
+ * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
+#endif
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null")
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 1)
+#endif
+ ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0)
+ZEND_END_ARG_INFO()
+#endif
+
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+#define arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign arginfo_class_RdKafka_KafkaConsumer_incrementalAssign
+#endif
+
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
+#endif
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_commit, 0, 0, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_commit, 0, 0, 0)
+#endif
ZEND_ARG_OBJ_TYPE_MASK(0, message_or_offsets, RdKafka\\Message, MAY_BE_ARRAY|MAY_BE_NULL, "null")
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_close, 0, 0, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_close, 0, 0, 0)
+#endif
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_KafkaConsumer_commitAsync arginfo_class_RdKafka_KafkaConsumer_commit
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_consume, 0, 1, RdKafka\\Message, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_consume, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_subscribe, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_subscribe, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0)
ZEND_END_ARG_INFO()
@@ -33,27 +71,57 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_KafkaConsumer_unsubscribe arginfo_class_RdKafka_KafkaConsumer_close
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 3, RdKafka\\Metadata, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 0, 3)
+#endif
ZEND_ARG_TYPE_INFO(0, all_topics, _IS_BOOL, 0)
ZEND_ARG_OBJ_INFO(0, only_topic, RdKafka\\Topic, 1)
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 1, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 0, 1)
+#endif
+ ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
+ZEND_END_ARG_INFO()
+#endif
+
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 1, RdKafka\\KafkaConsumerTopic, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_name, IS_STRING, 0)
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, topic_conf, RdKafka\\TopicConf, 1, "null")
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, 0, 2, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, 0, 0, 2)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0)
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, 0, 1, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets, 0, 0, 5)
+#endif
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0)
@@ -67,9 +135,14 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
-
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
+#endif
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
+#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
@@ -79,6 +152,9 @@ ZEND_METHOD(RdKafka_KafkaConsumer, subscribe);
ZEND_METHOD(RdKafka_KafkaConsumer, getSubscription);
ZEND_METHOD(RdKafka_KafkaConsumer, unsubscribe);
ZEND_METHOD(RdKafka_KafkaConsumer, getMetadata);
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_METHOD(RdKafka_KafkaConsumer, getControllerId);
+#endif
ZEND_METHOD(RdKafka_KafkaConsumer, newTopic);
ZEND_METHOD(RdKafka_KafkaConsumer, getCommittedOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, getOffsetPositions);
@@ -87,10 +163,15 @@ ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
-
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
+#endif
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
@@ -100,6 +181,9 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, getSubscription, arginfo_class_RdKafka_KafkaConsumer_getSubscription, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, unsubscribe, arginfo_class_RdKafka_KafkaConsumer_unsubscribe, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getMetadata, arginfo_class_RdKafka_KafkaConsumer_getMetadata, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ ZEND_ME(RdKafka_KafkaConsumer, getControllerId, arginfo_class_RdKafka_KafkaConsumer_getControllerId, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_KafkaConsumer, newTopic, arginfo_class_RdKafka_KafkaConsumer_newTopic, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getCommittedOffsets, arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getOffsetPositions, arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, ZEND_ACC_PUBLIC)
@@ -115,7 +199,11 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
zval property_error_cb_default_value;
ZVAL_UNDEF(&property_error_cb_default_value);
diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h
index 6069e1ba..559fbfef 100644
--- a/kafka_consumer_legacy_arginfo.h
+++ b/kafka_consumer_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 47e9238c79f5508833423d31a2e09041754dbffb */
+ * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_INFO(0, conf)
@@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 1)
+ ZEND_ARG_INFO(0, topic_partitions)
+ZEND_END_ARG_INFO()
+#endif
+
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+#define arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign arginfo_class_RdKafka_KafkaConsumer_incrementalAssign
+#endif
+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -38,6 +48,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getMetadata, 0, 0, 3)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getControllerId, 0, 0, 1)
+ ZEND_ARG_INFO(0, timeout_ms)
+ZEND_END_ARG_INFO()
+#endif
+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_newTopic, 0, 0, 1)
ZEND_ARG_INFO(0, topic_name)
ZEND_ARG_INFO(0, topic_conf)
@@ -66,9 +82,14 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions
-
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
+#endif
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
+#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
@@ -78,6 +99,9 @@ ZEND_METHOD(RdKafka_KafkaConsumer, subscribe);
ZEND_METHOD(RdKafka_KafkaConsumer, getSubscription);
ZEND_METHOD(RdKafka_KafkaConsumer, unsubscribe);
ZEND_METHOD(RdKafka_KafkaConsumer, getMetadata);
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_METHOD(RdKafka_KafkaConsumer, getControllerId);
+#endif
ZEND_METHOD(RdKafka_KafkaConsumer, newTopic);
ZEND_METHOD(RdKafka_KafkaConsumer, getCommittedOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, getOffsetPositions);
@@ -86,10 +110,15 @@ ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
-
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
+#endif
+#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
+ ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
@@ -99,6 +128,9 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, getSubscription, arginfo_class_RdKafka_KafkaConsumer_getSubscription, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, unsubscribe, arginfo_class_RdKafka_KafkaConsumer_unsubscribe, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getMetadata, arginfo_class_RdKafka_KafkaConsumer_getMetadata, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ ZEND_ME(RdKafka_KafkaConsumer, getControllerId, arginfo_class_RdKafka_KafkaConsumer_getControllerId, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka_KafkaConsumer, newTopic, arginfo_class_RdKafka_KafkaConsumer_newTopic, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getCommittedOffsets, arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getOffsetPositions, arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions, ZEND_ACC_PUBLIC)
@@ -114,7 +146,11 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
zval property_error_cb_default_value;
ZVAL_NULL(&property_error_cb_default_value);
diff --git a/kafka_error_exception.c b/kafka_error_exception.c
index 03260c4a..8c19ca0c 100644
--- a/kafka_error_exception.c
+++ b/kafka_error_exception.c
@@ -39,7 +39,10 @@ void create_kafka_error(zval *return_value, const rd_kafka_error_t *error) /* {{
{
object_init_ex(return_value, ce_kafka_error);
- zend_update_property_string(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("message"), rd_kafka_error_name(error));
+ zend_string *message = zend_strpprintf(0, "%s (RD_KAFKA_RESP_ERR_%s)", rd_kafka_error_string(error), rd_kafka_error_name(error));
+ zend_update_property_str(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("message"), message);
+ zend_string_release(message);
+
zend_update_property_long(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("code"), rd_kafka_error_code(error));
zend_update_property_string(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("error_string"), rd_kafka_error_string(error));
zend_update_property_bool(ce_kafka_error, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("isFatal"), rd_kafka_error_is_fatal(error));
diff --git a/package.xml b/package.xml
index 926b8167..16957120 100644
--- a/package.xml
+++ b/package.xml
@@ -120,6 +120,7 @@
+
diff --git a/rdkafka.c b/rdkafka.c
index 61c28eeb..4029409a 100644
--- a/rdkafka.c
+++ b/rdkafka.c
@@ -350,7 +350,7 @@ PHP_METHOD(RdKafka, addBrokers)
}
/* }}} */
-/* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
+/* {{{ proto RdKafka\Metadata RdKafka::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka, getMetadata)
{
@@ -388,7 +388,29 @@ PHP_METHOD(RdKafka, getMetadata)
kafka_metadata_init(return_value, metadata);
}
/* }}} */
-
+
+#ifdef HAS_RD_KAFKA_CONTROLLERID
+/* {{{ proto int RdKafka::getControllerId(int $timeout_ms)
+ Returns the current ControllerId (controller broker id) as reported in broker metadata */
+PHP_METHOD(RdKafka, getControllerId)
+{
+ kafka_object *intern;
+ zend_long timeout;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
+ return;
+ }
+
+ intern = get_kafka_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
+}
+/* }}} */
+#endif
+
/* {{{ proto void RdKafka::setLogLevel(int $level)
Specifies the maximum logging level produced by internal kafka logging and debugging */
PHP_METHOD(RdKafka, setLogLevel)
@@ -445,8 +467,8 @@ PHP_METHOD(RdKafka, oauthbearerSetToken)
}
errstr[0] = '\0';
-
- int extensions_size;
+
+ int extensions_size = 0;
char **extensions = NULL;
if (extensions_hash != NULL) {
diff --git a/rdkafka.stub.php b/rdkafka.stub.php
index d877b5b2..3d99ef22 100644
--- a/rdkafka.stub.php
+++ b/rdkafka.stub.php
@@ -20,6 +20,11 @@ public function addBrokers(string $broker_list): int {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?RdKafka\Topic $only_topic, int $timeout_ms): RdKafka\Metadata {}
+#ifdef HAS_RD_KAFKA_CONTROLLERID
+ /** @tentative-return-type */
+ public function getControllerId(int $timeout_ms): int {}
+#endif
+
/** @tentative-return-type */
public function getOutQLen(): int {}
@@ -77,10 +82,10 @@ public function resumePartitions(array $topic_partitions): array {}
#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
- public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void;
+ public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
/** @tentative-return-type */
- public function oauthbearerSetTokenFailure(string $error): void;
+ public function oauthbearerSetTokenFailure(string $error): void {}
#endif
}
}
diff --git a/rdkafka_arginfo.h b/rdkafka_arginfo.h
index d416fbdf..0e4fe52e 100644
--- a/rdkafka_arginfo.h
+++ b/rdkafka_arginfo.h
@@ -1,52 +1,98 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: e469d659a320102b1d64c048e9457bb9b64d8e5d */
+ * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_addBrokers, 0, 1, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_addBrokers, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, broker_list, IS_STRING, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 3, RdKafka\\Metadata, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 0, 3)
+#endif
ZEND_ARG_TYPE_INFO(0, all_topics, _IS_BOOL, 0)
ZEND_ARG_OBJ_INFO(0, only_topic, RdKafka\\Topic, 1)
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 1, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 0, 1)
+#endif
+ ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
+ZEND_END_ARG_INFO()
+#endif
+
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, 0)
+#endif
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_metadata arginfo_class_RdKafka_getMetadata
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_setLogLevel, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_setLogLevel, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, level, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_newTopic, 0, 1, RdKafka\\Topic, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_newTopic, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_name, IS_STRING, 0)
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, topic_conf, RdKafka\\TopicConf, 1, "null")
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_outqLen arginfo_class_RdKafka_getOutQLen
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_poll, 0, 1, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_poll, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_flush arginfo_class_RdKafka_poll
#if defined(HAS_RD_KAFKA_PURGE)
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_purge, 0, 1, IS_LONG, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_purge, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, purge_flags, IS_LONG, 0)
ZEND_END_ARG_INFO()
#endif
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_setLogger, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_setLogger, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, logger, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_queryWatermarkOffsets, 0, 0, 5)
+#endif
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0)
@@ -54,28 +100,42 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_queryWat
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_offsetsForTimes, 0, 2, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_offsetsForTimes, 0, 0, 2)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0)
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_pausePartitions, 0, 1, IS_ARRAY, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_pausePartitions, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, topic_partitions, IS_ARRAY, 0)
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_resumePartitions arginfo_class_RdKafka_pausePartitions
-
+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 3, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3)
+#endif
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
ZEND_END_ARG_INFO()
#endif
@@ -84,34 +144,42 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0)
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, conf, RdKafka\\Conf, 1, "null")
ZEND_END_ARG_INFO()
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_OBJ_INFO_EX(arginfo_class_RdKafka_Consumer_newQueue, 0, 0, RdKafka\\Queue, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer_newQueue, 0, 0, 0)
+#endif
ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_Producer___construct arginfo_class_RdKafka_Consumer___construct
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1)
+#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
+#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, IS_VOID, 0)
-ZEND_END_ARG_INFO()
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0)
#endif
+ZEND_END_ARG_INFO()
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions
#endif
-
ZEND_METHOD(RdKafka, __construct);
ZEND_METHOD(RdKafka, addBrokers);
ZEND_METHOD(RdKafka, getMetadata);
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_METHOD(RdKafka, getControllerId);
+#endif
ZEND_METHOD(RdKafka, getOutQLen);
ZEND_METHOD(RdKafka, setLogLevel);
ZEND_METHOD(RdKafka, newTopic);
@@ -127,8 +195,6 @@ ZEND_METHOD(RdKafka, pausePartitions);
ZEND_METHOD(RdKafka, resumePartitions);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka, oauthbearerSetToken);
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure);
#endif
ZEND_METHOD(RdKafka_Consumer, __construct);
@@ -136,27 +202,31 @@ ZEND_METHOD(RdKafka_Consumer, newQueue);
ZEND_METHOD(RdKafka_Producer, __construct);
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, initTransactions);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, beginTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, commitTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, abortTransaction);
#endif
-
static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, __construct, arginfo_class_RdKafka___construct, ZEND_ACC_PRIVATE)
ZEND_ME(RdKafka, addBrokers, arginfo_class_RdKafka_addBrokers, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, getMetadata, arginfo_class_RdKafka_getMetadata, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ ZEND_ME(RdKafka, getControllerId, arginfo_class_RdKafka_getControllerId, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka, getOutQLen, arginfo_class_RdKafka_getOutQLen, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka, metadata, getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#endif
ZEND_ME(RdKafka, setLogLevel, arginfo_class_RdKafka_setLogLevel, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
ZEND_ME(RdKafka, newTopic, arginfo_class_RdKafka_newTopic, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka, outqLen, getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#endif
ZEND_ME(RdKafka, poll, arginfo_class_RdKafka_poll, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, flush, arginfo_class_RdKafka_flush, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_PURGE)
@@ -169,38 +239,23 @@ static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};
-
-static const zend_function_entry class_RdKafka_Exception_methods[] = {
- ZEND_FE_END
-};
-
-
static const zend_function_entry class_RdKafka_Consumer_methods[] = {
ZEND_ME(RdKafka_Consumer, __construct, arginfo_class_RdKafka_Consumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Consumer, newQueue, arginfo_class_RdKafka_Consumer_newQueue, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
-
static const zend_function_entry class_RdKafka_Producer_methods[] = {
ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -211,8 +266,12 @@ static zend_class_entry *register_class_RdKafka(void)
zend_class_entry ce, *class_entry;
INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
class_entry->ce_flags |= ZEND_ACC_ABSTRACT;
+#endif
zval property_error_cb_default_value;
ZVAL_UNDEF(&property_error_cb_default_value);
@@ -233,8 +292,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas
{
zend_class_entry ce, *class_entry;
- INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods);
+ INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception);
+#endif
return class_entry;
}
@@ -244,7 +307,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
@@ -254,7 +321,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
diff --git a/rdkafka_legacy_arginfo.h b/rdkafka_legacy_arginfo.h
index 070e2c6f..49f594b4 100644
--- a/rdkafka_legacy_arginfo.h
+++ b/rdkafka_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: e469d659a320102b1d64c048e9457bb9b64d8e5d */
+ * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -14,6 +14,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getMetadata, 0, 0, 3)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getControllerId, 0, 0, 1)
+ ZEND_ARG_INFO(0, timeout_ms)
+ZEND_END_ARG_INFO()
+#endif
+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_getOutQLen, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -72,9 +78,7 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3)
ZEND_ARG_INFO(0, principal_name)
ZEND_ARG_INFO(0, extensions)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1)
ZEND_ARG_INFO(0, error)
ZEND_END_ARG_INFO()
@@ -92,25 +96,21 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions
#endif
-
ZEND_METHOD(RdKafka, __construct);
ZEND_METHOD(RdKafka, addBrokers);
ZEND_METHOD(RdKafka, getMetadata);
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ZEND_METHOD(RdKafka, getControllerId);
+#endif
ZEND_METHOD(RdKafka, getOutQLen);
ZEND_METHOD(RdKafka, setLogLevel);
ZEND_METHOD(RdKafka, newTopic);
@@ -126,8 +126,6 @@ ZEND_METHOD(RdKafka, pausePartitions);
ZEND_METHOD(RdKafka, resumePartitions);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka, oauthbearerSetToken);
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure);
#endif
ZEND_METHOD(RdKafka_Consumer, __construct);
@@ -135,27 +133,31 @@ ZEND_METHOD(RdKafka_Consumer, newQueue);
ZEND_METHOD(RdKafka_Producer, __construct);
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, initTransactions);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, beginTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, commitTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, abortTransaction);
#endif
-
static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, __construct, arginfo_class_RdKafka___construct, ZEND_ACC_PRIVATE)
ZEND_ME(RdKafka, addBrokers, arginfo_class_RdKafka_addBrokers, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, getMetadata, arginfo_class_RdKafka_getMetadata, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_CONTROLLERID)
+ ZEND_ME(RdKafka, getControllerId, arginfo_class_RdKafka_getControllerId, ZEND_ACC_PUBLIC)
+#endif
ZEND_ME(RdKafka, getOutQLen, arginfo_class_RdKafka_getOutQLen, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka, metadata, getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("metadata", zim_RdKafka_getMetadata, arginfo_class_RdKafka_metadata, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#endif
ZEND_ME(RdKafka, setLogLevel, arginfo_class_RdKafka_setLogLevel, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
ZEND_ME(RdKafka, newTopic, arginfo_class_RdKafka_newTopic, ZEND_ACC_PUBLIC)
- ZEND_MALIAS(RdKafka, outqLen, getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#if (PHP_VERSION_ID >= 80400)
+ ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED, NULL, NULL)
+#else
+ ZEND_RAW_FENTRY("outqLen", zim_RdKafka_getOutQLen, arginfo_class_RdKafka_outqLen, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED)
+#endif
ZEND_ME(RdKafka, poll, arginfo_class_RdKafka_poll, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, flush, arginfo_class_RdKafka_flush, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_PURGE)
@@ -168,38 +170,23 @@ static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};
-
-static const zend_function_entry class_RdKafka_Exception_methods[] = {
- ZEND_FE_END
-};
-
-
static const zend_function_entry class_RdKafka_Consumer_methods[] = {
ZEND_ME(RdKafka_Consumer, __construct, arginfo_class_RdKafka_Consumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Consumer, newQueue, arginfo_class_RdKafka_Consumer_newQueue, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
-
static const zend_function_entry class_RdKafka_Producer_methods[] = {
ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -210,8 +197,12 @@ static zend_class_entry *register_class_RdKafka(void)
zend_class_entry ce, *class_entry;
INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
class_entry->ce_flags |= ZEND_ACC_ABSTRACT;
+#endif
zval property_error_cb_default_value;
ZVAL_NULL(&property_error_cb_default_value);
@@ -232,8 +223,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas
{
zend_class_entry ce, *class_entry;
- INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods);
+ INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception);
+#endif
return class_entry;
}
@@ -243,7 +238,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
@@ -253,7 +252,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt
new file mode 100644
index 00000000..b9724804
--- /dev/null
+++ b/tests/controller_id.phpt
@@ -0,0 +1,25 @@
+--TEST--
+Display controller id
+--SKIPIF--
+set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+echo (new RdKafka\Producer($conf))->getControllerId(10*1000) . \PHP_EOL;
+echo (new RdKafka\Consumer($conf))->getControllerId(10*1000) . \PHP_EOL;
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->set('group.id', 'test');
+
+echo (new RdKafka\KafkaConsumer($conf))->getControllerId(10*1000) . \PHP_EOL;
+--EXPECT--
+1
+1
+1
\ No newline at end of file
diff --git a/tests/init_transaction_not_configured.phpt b/tests/init_transaction_not_configured.phpt
index 49e552f2..e0a28f3d 100644
--- a/tests/init_transaction_not_configured.phpt
+++ b/tests/init_transaction_not_configured.phpt
@@ -29,7 +29,7 @@ try {
}
--EXPECTF--
-_NOT_CONFIGURED
+The Transactional API requires transactional.id to be configured (RD_KAFKA_RESP_ERR__NOT_CONFIGURED)
-145
%s/tests/init_transaction_not_configured.php
14
diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
new file mode 100644
index 00000000..b95de0f6
--- /dev/null
+++ b/tests/oauthbearer_integration.phpt
@@ -0,0 +1,151 @@
+--TEST--
+Oauthbearer
+--SKIPIF--
+= 0x01010000 || die("skip librdkafka too old does not support oauthbearer");
+--FILE--
+ $jws,
+ 'principal' => $principal,
+ 'expiryMs' => $expiryMs,
+ ];
+}
+
+// Set up tests
+$conf = new RdKafka\Conf();
+if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
+ $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
+}
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS'));
+$conf->set('security.protocol', 'SASL_PLAINTEXT');
+$conf->set('sasl.mechanisms', 'OAUTHBEARER');
+$conf->set('sasl.oauthbearer.config', 'principal=admin');
+$conf->setLogCb(function ($kafka, $level, $facility, $message) {});
+$conf->setErrorCb(function ($producer, $err, $errstr) {
+ printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
+});
+
+// Test that refresh token with setting token accurately will succeed when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Refreshing token and succeeding\n";
+ $token = generateJws();
+ $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "Metadata retrieved successfully when refresh callback set token\n";
+} catch (\RdKafka\Exception $e) {
+ echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n";
+}
+
+// Test that refresh token with setting token failure will fail when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Setting token failure in refresh cb\n";
+ $producer->oauthbearerSetTokenFailure('Token failure before getting metadata');
+ $producer->poll(0);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "FAIL: Did not catch exception after not setting or refreshing any token\n";
+} catch (\RdKafka\Exception $e) {
+ echo "Caught exception when getting metadata after not setting or refreshing any token\n";
+}
+
+// Test that setting token without refreshing will get metadata successfully
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws();
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "Got metadata successfully\n";
+} catch (\RdKafka\Exception $e) {
+ echo "FAIL: Set token but still got exception \n";
+ exit;
+}
+
+// Test that token refresh is called after token expires
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Refreshing token\n";
+});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws('required-scope', 5);
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$producer->poll(0);
+echo "Polled with refresh\n";
+sleep(1);
+$producer->poll(0);
+echo "Polled without refresh\n";
+sleep(4);
+$producer->poll(0);
+echo "Polled with refresh\n";
+
+// Test that tokens without required scope fail
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws('not-required-scope');
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "FAIL: Exception not thrown as expected when using insufficient scope\n";
+ exit;
+} catch (\RdKafka\Exception $e) {
+ echo "Caught expected exception with insufficient_scope\n";
+}
+
+// Test that setting token with extensions succeeds
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws();
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']);
+$producer->poll(0);
+
+--EXPECT--
+Refreshing token and succeeding
+Metadata retrieved successfully when refresh callback set token
+Setting token failure in refresh cb
+Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata
+Caught exception when getting metadata after not setting or refreshing any token
+Got metadata successfully
+Refreshing token
+Polled with refresh
+Polled without refresh
+Refreshing token
+Polled with refresh
+Caught expected exception with insufficient_scope
diff --git a/tests/test_env.php.sample b/tests/test_env.php.sample
index e43476aa..0d6be87a 100644
--- a/tests/test_env.php.sample
+++ b/tests/test_env.php.sample
@@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) {
putenv('TEST_KAFKA_BROKERS=localhost:9092');
}
+if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) {
+ putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092');
+}
+
if (false === getenv('TEST_KAFKA_BROKER_VERSION')) {
putenv('TEST_KAFKA_BROKER_VERSION=2.3');
}