diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 97365c07..c38a9969 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -108,28 +108,39 @@ jobs: # librdkafka 1.0.1 - php: '8.1.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v1.0.1' + skipoauth: '1' # librdkafka 0.11.6 - php: '8.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.2.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' # librdkafka master (experimental, does not block PRs) - php: '8.3.0' @@ -157,6 +168,7 @@ jobs: PHP_VERSION: ${{ matrix.php }} LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }} MEMORY_CHECK: ${{ matrix.memcheck }} + SKIP_OAUTH: ${{ matrix.skipoauth }} TEST_KAFKA_BROKERS: kafka:9092 TEST_KAFKA_BROKER_VERSION: 2.6 steps: diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index 4413c48a..b8949a90 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -1,20 +1,66 @@ #!/bin/sh docker network create kafka_network -docker pull wurstmeister/zookeeper:3.4.6 -docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6 -docker pull wurstmeister/kafka:2.13-2.6.0 -docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0 -printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null +docker pull wurstmeister/zookeeper:latest +docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest +docker pull wurstmeister/kafka:latest +docker run -d -p 9092:9092 --network kafka_network \ + -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ + -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ + -e "KAFKA_BROKER_ID=1" \ + -e "KAFKA_ADVERTISED_HOST_NAME=kafka" \ + -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \ + -e "KAFKA_ADVERTISED_PORT=9092" \ + --name kafka wurstmeister/kafka:latest -echo "Waiting for Kafka to be ready" +if [ ${SKIP_OAUTH:-0} -ne 1 ]; then + docker run -d -p 29092:29092 --network kafka_network \ + -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ + -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ + -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \ + -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \ + -e "KAFKA_ADVERTISED_PORT=29092" \ + -e "KAFKA_BROKER_ID=2" \ + -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \ + -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \ + -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \ + -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \ + -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \ + --name kafka_oauth2 wurstmeister/kafka:latest +fi + +printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null + +echo "Waiting for Kafka services to be ready" + +kafka_ready=0 +kafka_oauth2_ready=0 for i in $(seq 1 20); do - if kafkacat -b 127.0.0.1 -L; then - echo "Kafka is ready" - exit 0 + if [ $kafka_ready -eq 0 ]; then + if kafkacat -b 127.0.0.1 -L; then + kafka_ready=1 + echo "Kafka is ready" + fi + fi + if [ 
$kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then + if kafkacat -b kafka_oauth2:29092 \ + -X security.protocol=SASL_PLAINTEXT \ + -X sasl.mechanisms=OAUTHBEARER \ + -X enable.sasl.oauthbearer.unsecure.jwt="true" \ + -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L + then + kafka_oauth2_ready=1 + echo "Kafka OAuth2 is ready" + fi + fi + + if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then + exit 0 fi done -echo "Timedout waiting for Kafka to be ready" +echo "Timedout waiting for Kafka services to be ready" exit 1 diff --git a/conf.c b/conf.c index e438e5fa..9aa8132f 100644 --- a/conf.c +++ b/conf.c @@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */ { + kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh); kafka_conf_callback_copy(&to->error, from->error); kafka_conf_callback_copy(&to->rebalance, from->rebalance); kafka_conf_callback_copy(&to->dr_msg, from->dr_msg); diff --git a/rdkafka.c b/rdkafka.c index 61c28eeb..bb2e7aa4 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -445,8 +445,8 @@ PHP_METHOD(RdKafka, oauthbearerSetToken) } errstr[0] = '\0'; - - int extensions_size; + + int extensions_size = 0; char **extensions = NULL; if (extensions_hash != NULL) { diff --git a/rdkafka.stub.php b/rdkafka.stub.php index d877b5b2..8d029b72 100644 --- a/rdkafka.stub.php +++ b/rdkafka.stub.php @@ -77,10 +77,10 @@ public function resumePartitions(array $topic_partitions): array {} #ifdef HAS_RD_KAFKA_OAUTHBEARER /** @tentative-return-type */ - public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void; + public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {} /** @tentative-return-type 
*/ - public function oauthbearerSetTokenFailure(string $error): void; + public function oauthbearerSetTokenFailure(string $error): void {} #endif } } diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt new file mode 100644 index 00000000..b95de0f6 --- /dev/null +++ b/tests/oauthbearer_integration.phpt @@ -0,0 +1,151 @@ +--TEST-- +Oauthbearer +--SKIPIF-- += 0x01010000 || die("skip librdkafka too old does not support oauthbearer"); +--FILE-- + $jws, + 'principal' => $principal, + 'expiryMs' => $expiryMs, + ]; +} + +// Set up tests +$conf = new RdKafka\Conf(); +if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); +$conf->set('sasl.oauthbearer.config', 'principal=admin'); +$conf->setLogCb(function ($kafka, $level, $facility, $message) {}); +$conf->setErrorCb(function ($producer, $err, $errstr) { + printf("%s: %s\n", rd_kafka_err2str($err), $errstr); +}); + +// Test that refresh token with setting token accurately will succeed when getting metadata +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token and succeeding\n"; + $token = generateJws(); + $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); + +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "Metadata retrieved successfully when refresh callback set token\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n"; +} + +// Test that refresh token with setting token failure will fail 
when getting metadata +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Setting token failure in refresh cb\n"; + $producer->oauthbearerSetTokenFailure('Token failure before getting metadata'); + $producer->poll(0); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Did not catch exception after not setting or refreshing any token\n"; +} catch (\RdKafka\Exception $e) { + echo "Caught exception when getting metadata after not setting or refreshing any token\n"; +} + +// Test that setting token without refreshing will get metadata successfully +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "Got metadata successfully\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Set token but still got exception \n"; + exit; +} + +// Test that token refresh is called after token expires +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token\n"; +}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws('required-scope', 5); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$producer->poll(0); +echo "Polled with refresh\n"; +sleep(1); +$producer->poll(0); +echo "Polled without refresh\n"; +sleep(4); +$producer->poll(0); +echo "Polled with refresh\n"; + +// Test that tokens without required scope fail +$producer = new \RdKafka\Producer($conf); +$token = generateJws('not-required-scope'); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], 
$token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Exception not thrown as expected when using insufficient scope\n"; + exit; +} catch (\RdKafka\Exception $e) { + echo "Caught expected exception with insufficient_scope\n"; +} + +// Test that setting token with extensions succeeds +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']); +$producer->poll(0); + +--EXPECT-- +Refreshing token and succeeding +Metadata retrieved successfully when refresh callback set token +Setting token failure in refresh cb +Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata +Caught exception when getting metadata after not setting or refreshing any token +Got metadata successfully +Refreshing token +Polled with refresh +Polled without refresh +Refreshing token +Polled with refresh +Caught expected exception with insufficient_scope diff --git a/tests/test_env.php.sample b/tests/test_env.php.sample index e43476aa..0d6be87a 100644 --- a/tests/test_env.php.sample +++ b/tests/test_env.php.sample @@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) { putenv('TEST_KAFKA_BROKERS=localhost:9092'); } +if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) { + putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092'); +} + if (false === getenv('TEST_KAFKA_BROKER_VERSION')) { putenv('TEST_KAFKA_BROKER_VERSION=2.3'); }