12 changes: 12 additions & 0 deletions .github/workflows/test.yml
@@ -108,28 +108,39 @@ jobs:
# librdkafka 1.0.1
- php: '8.1.0'
  librdkafka: 'v1.0.1'
  skipoauth: '1'
- php: '8.0.0'
  librdkafka: 'v1.0.1'
  skipoauth: '1'
- php: '7.4.0'
  librdkafka: 'v1.0.1'
  skipoauth: '1'
- php: '7.3.0'
  librdkafka: 'v1.0.1'
  skipoauth: '1'

# librdkafka 0.11.6
- php: '8.1.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '8.0.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '7.4.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '7.3.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '7.2.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '7.1.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'
- php: '7.0.0'
  librdkafka: 'v0.11.6'
  skipoauth: '1'

# librdkafka master (experimental, does not block PRs)
- php: '8.3.0'
@@ -157,6 +168,7 @@ jobs:
  PHP_VERSION: ${{ matrix.php }}
  LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }}
  MEMORY_CHECK: ${{ matrix.memcheck }}
  SKIP_OAUTH: ${{ matrix.skipoauth }}
  TEST_KAFKA_BROKERS: kafka:9092
  TEST_KAFKA_BROKER_VERSION: 2.6
steps:
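For reference, the job-level `env` block above is what surfaces the new matrix key to every step as `SKIP_OAUTH`. A minimal sketch of mirroring that environment for a local run (the exported values are illustrative, taken from one of the matrix entries above):

```sh
# Mirror the CI job environment locally; values are examples only.
export PHP_VERSION=7.1.0
export LIBRDKAFKA_VERSION=v0.11.6
export SKIP_OAUTH=1   # matrix entries whose librdkafka is too old for OAUTHBEARER set skipoauth: '1'
export TEST_KAFKA_BROKERS=kafka:9092
export TEST_KAFKA_BROKER_VERSION=2.6

# The start script in the next file reads SKIP_OAUTH to decide whether to launch the OAuth broker.
./.github/workflows/test/start-kafka.sh
```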
66 changes: 56 additions & 10 deletions .github/workflows/test/start-kafka.sh
@@ -1,20 +1,66 @@
#!/bin/sh

docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.13-2.6.0
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
docker pull wurstmeister/zookeeper:latest
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
docker run -d -p 9092:9092 --network kafka_network \
-e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
-e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
-e "KAFKA_BROKER_ID=1" \
-e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
-e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
-e "KAFKA_ADVERTISED_PORT=9092" \
--name kafka wurstmeister/kafka:latest

echo "Waiting for Kafka to be ready"
if [ ${SKIP_OAUTH:-0} -ne 1 ]; then
    docker run -d -p 29092:29092 --network kafka_network \
        -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
        -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
        -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
        -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
        -e "KAFKA_ADVERTISED_PORT=29092" \
        -e "KAFKA_BROKER_ID=2" \
        -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
        -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
        -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
        -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
        -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
        -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
        -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
        --name kafka_oauth2 wurstmeister/kafka:latest
fi

printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null

echo "Waiting for Kafka services to be ready"

kafka_ready=0
kafka_oauth2_ready=0

for i in $(seq 1 20); do
    if kafkacat -b 127.0.0.1 -L; then
        echo "Kafka is ready"
        exit 0
    if [ $kafka_ready -eq 0 ]; then
        if kafkacat -b 127.0.0.1 -L; then
            kafka_ready=1
            echo "Kafka is ready"
        fi
    fi
    if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then
        if kafkacat -b kafka_oauth2:29092 \
            -X security.protocol=SASL_PLAINTEXT \
            -X sasl.mechanisms=OAUTHBEARER \
            -X enable.sasl.oauthbearer.unsecure.jwt="true" \
            -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
        then
            kafka_oauth2_ready=1
            echo "Kafka OAuth2 is ready"
        fi
    fi

    if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then
        exit 0
    fi
done

echo "Timedout waiting for Kafka to be ready"
echo "Timedout waiting for Kafka services to be ready"
exit 1
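If the script is run on a local machine rather than a disposable CI runner, the containers and network it creates need to be removed by hand afterwards; a small cleanup sketch using the names defined in the script above:

```sh
# Tear down everything start-kafka.sh created (kafka_oauth2 may not exist when SKIP_OAUTH=1).
docker rm -f kafka kafka_oauth2 zookeeper 2>/dev/null
docker network rm kafka_network
```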
1 change: 1 addition & 0 deletions conf.c
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba

void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
{
    kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
    kafka_conf_callback_copy(&to->error, from->error);
    kafka_conf_callback_copy(&to->rebalance, from->rebalance);
    kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
4 changes: 2 additions & 2 deletions rdkafka.c
@@ -445,8 +445,8 @@ PHP_METHOD(RdKafka, oauthbearerSetToken)
    }

    errstr[0] = '\0';
    int extensions_size;

    int extensions_size = 0;
    char **extensions = NULL;

    if (extensions_hash != NULL) {
4 changes: 2 additions & 2 deletions rdkafka.stub.php
@@ -77,10 +77,10 @@ public function resumePartitions(array $topic_partitions): array {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
    /** @tentative-return-type */
    public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void;
    public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}

    /** @tentative-return-type */
    public function oauthbearerSetTokenFailure(string $error): void;
    public function oauthbearerSetTokenFailure(string $error): void {}
#endif
}
}
151 changes: 151 additions & 0 deletions tests/oauthbearer_integration.phpt
@@ -0,0 +1,151 @@
--TEST--
Oauthbearer
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
RD_KAFKA_VERSION >= 0x01010000 || die("skip librdkafka too old, no OAUTHBEARER support");
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

function generateJws($scope = 'required-scope', $expiresInSeconds = 60)
{
    $nowSeconds = floor(microtime(true));
    $expirySeconds = ($nowSeconds + $expiresInSeconds);
    $expiryMs = $expirySeconds * 1000;

    $principal = 'admin';
    $claimsJson = sprintf(
        '{"sub": "%s", "exp": %d, "iat": %d, "scope": "%s"}',
        $principal,
        $expirySeconds,
        $nowSeconds - 10,
        $scope
    );
    $headerJwsSegment = 'eyJhbGciOiJub25lIn0';
    $claimsJwsSegment = base64_encode($claimsJson);
    $claimsJwsSegment = rtrim(strtr($claimsJwsSegment, '+/', '-_'), '=');

    $jws = sprintf('%s.%s.', $headerJwsSegment, $claimsJwsSegment);

    return [
        'value' => $jws,
        'principal' => $principal,
        'expiryMs' => $expiryMs,
    ];
}

// Set up tests
$conf = new RdKafka\Conf();
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
    $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
}
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS'));
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
$conf->set('sasl.oauthbearer.config', 'principal=admin');
$conf->setLogCb(function ($kafka, $level, $facility, $message) {});
$conf->setErrorCb(function ($producer, $err, $errstr) {
    printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
});

// Test that a refresh callback that sets a valid token lets getMetadata() succeed
$conf->setOauthbearerTokenRefreshCb(function ($producer) {
    echo "Refreshing token and succeeding\n";
    $token = generateJws();
    $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
});
$producer = new \RdKafka\Producer($conf);
$producer->poll(0);
$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);

try {
    $producer->getMetadata(false, $topic, 10*1000);
    echo "Metadata retrieved successfully when refresh callback set token\n";
} catch (\RdKafka\Exception $e) {
    echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n";
}

// Test that a refresh callback that reports token failure makes getMetadata() fail
$conf->setOauthbearerTokenRefreshCb(function ($producer) {
    echo "Setting token failure in refresh cb\n";
    $producer->oauthbearerSetTokenFailure('Token failure before getting metadata');
    $producer->poll(0);
});
$producer = new \RdKafka\Producer($conf);
$producer->poll(0);
$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);
try {
    $producer->getMetadata(false, $topic, 10*1000);
    echo "FAIL: Did not catch exception after not setting or refreshing any token\n";
} catch (\RdKafka\Exception $e) {
    echo "Caught exception when getting metadata after not setting or refreshing any token\n";
}

// Test that setting the token directly, without the refresh callback doing it, lets getMetadata() succeed
$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
$producer = new \RdKafka\Producer($conf);
$token = generateJws();
$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);
try {
    $producer->getMetadata(false, $topic, 10*1000);
    echo "Got metadata successfully\n";
} catch (\RdKafka\Exception $e) {
    echo "FAIL: Set token but still got exception\n";
    exit;
}

// Test that token refresh is called after token expires
$conf->setOauthbearerTokenRefreshCb(function ($producer) {
echo "Refreshing token\n";
});
$producer = new \RdKafka\Producer($conf);
$token = generateJws('required-scope', 5);
$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
$producer->poll(0);
echo "Polled with refresh\n";
sleep(1);
$producer->poll(0);
echo "Polled without refresh\n";
sleep(4);
$producer->poll(0);
echo "Polled with refresh\n";

// Test that tokens without required scope fail
$producer = new \RdKafka\Producer($conf);
$token = generateJws('not-required-scope');
$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
$topicName = sprintf("test_rdkafka_%s", uniqid());
$topic = $producer->newTopic($topicName);
try {
    $producer->getMetadata(false, $topic, 10*1000);
    echo "FAIL: Exception not thrown as expected when using insufficient scope\n";
    exit;
} catch (\RdKafka\Exception $e) {
    echo "Caught expected exception with insufficient_scope\n";
}

// Test that setting token with extensions succeeds
$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
$producer = new \RdKafka\Producer($conf);
$token = generateJws();
$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']);
$producer->poll(0);

--EXPECT--
Refreshing token and succeeding
Metadata retrieved successfully when refresh callback set token
Setting token failure in refresh cb
Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata
Caught exception when getting metadata after not setting or refreshing any token
Got metadata successfully
Refreshing token
Polled with refresh
Polled without refresh
Refreshing token
Polled with refresh
Caught expected exception with insufficient_scope
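A minimal sketch of running only this test against the OAuth listener started by `start-kafka.sh`, assuming an in-tree build with the stock phpize-generated Makefile (the `TESTS` variable is the standard run-tests passthrough); the same broker default can also come from `tests/test_env.php`, as the sample below shows:

```sh
# Point the test at the SASL/OAUTHBEARER listener, then run the single new case.
export TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092
make test TESTS=tests/oauthbearer_integration.phpt
```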
4 changes: 4 additions & 0 deletions tests/test_env.php.sample
@@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) {
    putenv('TEST_KAFKA_BROKERS=localhost:9092');
}

if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) {
    putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092');
}

if (false === getenv('TEST_KAFKA_BROKER_VERSION')) {
    putenv('TEST_KAFKA_BROKER_VERSION=2.3');
}