Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jobs:
tests:
name: 'Tests'
strategy:
fail-fast: false
matrix:
include:
# Latest librdkafka 2.x with memcheck
Expand Down Expand Up @@ -108,28 +109,39 @@ jobs:
# librdkafka 1.0.1
- php: '8.1.0'
librdkafka: 'v1.0.1'
skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v1.0.1'
skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v1.0.1'
skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v1.0.1'
skipoauth: '1'

# librdkafka 0.11.6
- php: '8.1.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '7.2.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '7.1.0'
librdkafka: 'v0.11.6'
skipoauth: '1'
- php: '7.0.0'
librdkafka: 'v0.11.6'
skipoauth: '1'

# librdkafka master (experimental, does not block PRs)
- php: '8.3.0'
Expand Down Expand Up @@ -157,6 +169,7 @@ jobs:
PHP_VERSION: ${{ matrix.php }}
LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }}
MEMORY_CHECK: ${{ matrix.memcheck }}
SKIP_OAUTH: ${{ matrix.skipoauth }}
TEST_KAFKA_BROKERS: kafka:9092
TEST_KAFKA_BROKER_VERSION: 2.6
steps:
Expand Down
66 changes: 56 additions & 10 deletions .github/workflows/test/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -1,20 +1,66 @@
#!/bin/sh

docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.13-2.6.0
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
docker pull wurstmeister/zookeeper:latest
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
docker run -d -p 9092:9092 --network kafka_network \
-e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
-e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
-e "KAFKA_BROKER_ID=1" \
-e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
-e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
-e "KAFKA_ADVERTISED_PORT=9092" \
--name kafka wurstmeister/kafka:latest

echo "Waiting for Kafka to be ready"
if [ ${SKIP_OAUTH:-0} -ne 1 ]; then
docker run -d -p 29092:29092 --network kafka_network \
-e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
-e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
-e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
-e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
-e "KAFKA_ADVERTISED_PORT=29092" \
-e "KAFKA_BROKER_ID=2" \
-e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
-e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
-e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
-e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
-e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
-e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
-e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
--name kafka_oauth2 wurstmeister/kafka:latest
fi

printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null

echo "Waiting for Kafka services to be ready"

kafka_ready=0
kafka_oauth2_ready=0

for i in $(seq 1 20); do
if kafkacat -b 127.0.0.1 -L; then
echo "Kafka is ready"
exit 0
if [ $kafka_ready -eq 0 ]; then
if kafkacat -b 127.0.0.1 -L; then
kafka_ready=1
echo "Kafka is ready"
fi
fi
if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then
if kafkacat -b kafka_oauth2:29092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=OAUTHBEARER \
-X enable.sasl.oauthbearer.unsecure.jwt="true" \
-X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
then
kafka_oauth2_ready=1
echo "Kafka OAuth2 is ready"
fi
fi

if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then
exit 0
fi
done

echo "Timedout waiting for Kafka to be ready"
echo "Timedout waiting for Kafka services to be ready"
exit 1
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ modules
package.xml
rdkafka-*.tgz
run-tests.php
gen_stub.php
tests/*/*.diff
tests/*/*.exp
tests/*/*.log
Expand Down
25 changes: 0 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,6 @@ The high level and low level *consumers*, *producer*, and *metadata* APIs are su

Documentation is available [here](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html).

## Sponsors

<table width="100%">
<tr>
<td>
<img width="1000" height="0">
<a href="https://upstash.com/?utm_source=php-rdkafka" >
<img src="https://raw.githubusercontent.com/upstash/sponsorship/master/kafka.png" alt="Upstash" width="260" align="right">
</a>

<h3>Upstash: Serverless Kafka</h3>

<ul>
<li>True Serverless Kafka with per-request-pricing</li>
<li>Managed Apache Kafka, works with all Kafka clients</li>
<li>Built-in REST API designed for serverless and edge functions</li>
</ul>

[Start for free in 30 seconds!](https://upstash.com/?utm_source=php-rdkafka)
</td>
</tr>
</table>

php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the [Support Ukraine collective](https://opencollective.com/support-ukraine).

## Table of Contents

1. [Installation](#installation)
Expand Down
1 change: 1 addition & 0 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba

void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
{
kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
kafka_conf_callback_copy(&to->error, from->error);
kafka_conf_callback_copy(&to->rebalance, from->rebalance);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
Expand Down
63 changes: 50 additions & 13 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
* Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0)
#endif
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2)
#endif
ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1)
#endif
ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0)
ZEND_END_ARG_INFO()

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()

Expand All @@ -32,8 +48,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
Expand All @@ -42,11 +64,14 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0)
ZEND_END_ARG_INFO()


ZEND_METHOD(RdKafka_Conf, __construct);
ZEND_METHOD(RdKafka_Conf, dump);
ZEND_METHOD(RdKafka_Conf, set);
Expand All @@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);


static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
Expand All @@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
#endif
ZEND_FE_END
};


static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
#endif
#if (PHP_VERSION_ID >= 80400)
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
#else
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
Expand All @@ -99,7 +128,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

return class_entry;
}
Expand All @@ -109,7 +142,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

return class_entry;
}
Loading