[fix][broker] PIP-460: Authorize scalable topic binary commands#25635
Merged
merlimat merged 2 commits intoapache:masterfrom Apr 30, 2026
Merged
[fix][broker] PIP-460: Authorize scalable topic binary commands#25635merlimat merged 2 commits intoapache:masterfrom
merlimat merged 2 commits intoapache:masterfrom
Conversation
The PIP-460/PIP-466 binary protocol commands (ScalableTopicLookup, ScalableTopicSubscribe, ScalableTopicClose) bypassed per-operation authorization. Any authenticated client could discover topology and register consumers on any scalable topic regardless of permissions. Wire the lookup and subscribe handlers through the existing isTopicOperationAllowed() helpers using TopicOperation.LOOKUP and TopicOperation.CONSUME — the same authorization required by classic lookup/subscribe. Close needs no per-call check (per-connection session scoping; the originating lookup was already authorized).
merlimat
approved these changes
Apr 30, 2026
poorbarcode
pushed a commit
to poorbarcode/pulsar
that referenced
this pull request
May 6, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #25615
PIP: 460
Motivation
PIP-460 (Scalable Topics) and PIP-466 introduced six new binary protocol commands. The three client→broker handlers in
ServerCnx—ScalableTopicLookup(id 70),ScalableTopicClose(id 72), andScalableTopicSubscribe(id 73) — performed no per-operation authorization. Authentication is enforced at connection setup, but every comparable command (handleLookup,handleSubscribe,handleProducer,handlePartitionMetadataRequest) gates access throughAuthorizationServicewhile the scalable-topic equivalents did not.This is a security gap: any authenticated client could open a DAG watch session on any scalable topic and register consumers on any subscription, regardless of namespace/topic permissions. The Admin REST API for scalable topics already enforces authorization (
ScalableTopicsAuthZTestcovers it). This PR closes the parallel gap on the binary protocol path.PIP-460 segments inherit authorization from their parent
topic://, so authorization happens at the parent topic level — no new permission types are needed. The mapping is the same as classic Pulsar:CommandScalableTopicLookupTopicOperation.LOOKUPCommandScalableTopicSubscribeTopicOperation.CONSUME(with subscription)CommandScalableTopicCloseModifications
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.javahandleCommandScalableTopicLookup: wrapsDagWatchSessioncreation inisTopicOperationAllowed(topic, TopicOperation.LOOKUP, ...). On denial →Commands.newScalableTopicError(sessionId, AuthorizationError, ...). On authz exception →logAuthException+AuthorizationError. The session is registered indagWatchSessionsonly after authorization passes, so an unauthorized lookup never holds a server-side resource.handleCommandScalableTopicSubscribe: wrapsscalableTopicService.registerConsumerinisTopicOperationAllowed(topic, subscription, TopicOperation.CONSUME)— the 3-arg overload that wraps the auth data inAuthenticationDataSubscriptionso subscription-aware authz plugins receive the subscription name. On denial →getCommandSender().sendScalableTopicSubscribeError(requestId, AuthorizationError, ...).handleCommandScalableTopicClose: no per-call authorization, with a comment documenting the deliberate decision. The session is keyed in this connection's per-instancedagWatchSessionsmap; authentication is enforced at connect; the originatingScalableTopicLookupwas authorized when the session was created; a close for an unknownsessionIdis an idempotent no-op. Same pattern ashandleCloseProducer/handleCloseConsumer.Mirrors existing patterns (
handleLookupline 648,handleSubscribeline 1535) for thread model, error frames, and exception logging.pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java— added decoder hooks forCommandScalableTopicUpdate,CommandScalableTopicSubscribeResponse, andCommandScalableTopicAssignmentUpdateso the existing test harness can decode broker→client scalable-topic responses.Verifying this change
This change added tests and can be verified as follows:
ScalableTopicBinaryAuthZTest(6 tests) — dedicated binary-protocol authorization suite covering: authorized lookup proceeds, unauthorized lookup returnsAuthorizationError, unauthorized subscribe returnsAuthorizationError, close-no-authz no-op for unknown session, and feature-flag short-circuits authz for both lookup and subscribe.ServerCnxTest.testScalableTopicCommandsRequireTopicAuthorization— combined unauthorized check that mirrors the existingtestVerifyAuthRoleAndAuthDataFromDirectConnectionBrokerpattern.ScalableTopicsAuthZTest(admin REST) continues to pass.scalable.*feature tests (92 tests acrossConsumerSessionTest,DagWatchSessionTest,ScalableTopicControllerTest,ScalableTopicServiceTest,SegmentLayoutTest,SubscriptionCoordinatorTest) continue to pass.Run locally:
```
./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.service.ScalableTopicBinaryAuthZTest"
./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.service.ServerCnxTest"
./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.admin.ScalableTopicsAuthZTest"
```
Does this pull request potentially affect one of the following parts:
CommandScalableTopicError/CommandScalableTopicSubscribeResponseframes are returned, now withServerError.AuthorizationErrorfor unauthorized clients.