Skip to content

Commit bc01ae8

Browse files
poorbarcodelhotari
authored andcommitted
[fix][proxy] Propagate client connection feature flags through Pulsar Proxy to Broker (apache#24158)
Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit e94a575)
1 parent 5fde2a6 commit bc01ae8

File tree

6 files changed

+86
-11
lines changed

6 files changed

+86
-11
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ public void testConnectCommandWithProxyVersion() throws Exception {
327327
assertEquals(serverCnx.getState(), State.Start);
328328

329329
ByteBuf clientCommand = Commands.newConnect("none", null, 1, null, null, null, null, null,
330-
"my-pulsar-proxy");
330+
"my-pulsar-proxy", null);
331331
channel.writeInbound(clientCommand);
332332

333333
assertEquals(serverCnx.getState(), State.Connected);
@@ -601,7 +601,7 @@ public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception {
601601
assertEquals(serverCnx.getState(), State.Start);
602602

603603
ByteBuf clientCommand = Commands.newConnect(authMethodName, AuthData.of("pass.pass".getBytes()),
604-
1, null, null, null, null, null, "my-pulsar-proxy");
604+
1, null, null, null, null, null, "my-pulsar-proxy", null);
605605
channel.writeInbound(clientCommand);
606606
Object response = getResponse();
607607
assertTrue(response instanceof CommandError);

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,12 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p
240240
String targetBroker, String originalPrincipal, AuthData originalAuthData,
241241
String originalAuthMethod) {
242242
return newConnect(authMethodName, authData, protocolVersion, libVersion, targetBroker, originalPrincipal,
243-
originalAuthData, originalAuthMethod, null);
243+
originalAuthData, originalAuthMethod, null, null);
244244
}
245245

246246
public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
247247
String targetBroker, String originalPrincipal, AuthData originalAuthData,
248-
String originalAuthMethod, String proxyVersion) {
248+
String originalAuthMethod, String proxyVersion, FeatureFlags featureFlags) {
249249
BaseCommand cmd = localCmd(Type.CONNECT);
250250
CommandConnect connect = cmd.setConnect()
251251
.setClientVersion(libVersion != null ? libVersion : "Pulsar Client")
@@ -276,7 +276,11 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p
276276
connect.setOriginalAuthMethod(originalAuthMethod);
277277
}
278278
connect.setProtocolVersion(protocolVersion);
279-
setFeatureFlags(connect.setFeatureFlags());
279+
if (featureFlags != null) {
280+
connect.setFeatureFlags().copyFrom(featureFlags);
281+
} else {
282+
setFeatureFlags(connect.setFeatureFlags());
283+
}
280284

281285
return serializeWithSize(cmd);
282286
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.common.api.AuthData;
5959
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
6060
import org.apache.pulsar.common.api.proto.CommandConnected;
61+
import org.apache.pulsar.common.api.proto.FeatureFlags;
6162
import org.apache.pulsar.common.protocol.Commands;
6263
import org.apache.pulsar.common.protocol.PulsarDecoder;
6364
import org.apache.pulsar.common.stats.Rate;
@@ -107,7 +108,8 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)
107108
this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
108109
}
109110

110-
public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress, int protocolVersion) {
111+
public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress, int protocolVersion,
112+
final FeatureFlags featureFlags) {
111113
String remoteHost;
112114
try {
113115
remoteHost = parseHost(brokerHostAndPort);
@@ -182,7 +184,7 @@ protected void initChannel(SocketChannel ch) {
182184
service.getConfiguration().getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
183185
4));
184186
ch.pipeline().addLast("proxyOutboundHandler",
185-
(ChannelHandler) new ProxyBackendHandler(config, protocolVersion, remoteHost));
187+
(ChannelHandler) new ProxyBackendHandler(config, protocolVersion, remoteHost, featureFlags));
186188
}
187189
});
188190

@@ -276,11 +278,14 @@ public class ProxyBackendHandler extends PulsarDecoder {
276278
protected ChannelHandlerContext ctx;
277279
private final ProxyConfiguration config;
278280
private final int protocolVersion;
281+
private final FeatureFlags featureFlags;
279282

280-
public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) {
283+
public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName,
284+
FeatureFlags featureFlags) {
281285
this.config = config;
282286
this.protocolVersion = protocolVersion;
283287
this.remoteHostName = remoteHostName;
288+
this.featureFlags = featureFlags;
284289
}
285290

286291
@Override
@@ -297,7 +302,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
297302
ByteBuf command = Commands.newConnect(
298303
authentication.getAuthMethodName(), authData, protocolVersion,
299304
proxyConnection.clientVersion, null /* target broker */,
300-
originalPrincipal, clientAuthData, clientAuthMethod, PulsarVersion.getVersion());
305+
originalPrincipal, clientAuthData, clientAuthMethod, PulsarVersion.getVersion(), featureFlags);
301306
writeAndFlush(command);
302307
isTlsOutboundChannel = ProxyConnection.isTlsChannel(inboundChannel);
303308
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected ByteBuf newConnectCommand() throws Exception {
7373
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
7474
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
7575
proxyConnection.clientVersion, proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
76-
clientAuthMethod, PulsarVersion.getVersion());
76+
clientAuthMethod, PulsarVersion.getVersion(), null);
7777
}
7878

7979
@Override

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman
455455
private void connectToBroker(InetSocketAddress brokerAddress) {
456456
assert ctx.executor().inEventLoop();
457457
DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
458-
directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, protocolVersionToAdvertise);
458+
directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, protocolVersionToAdvertise, features);
459459
}
460460

461461
public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertNotNull;
27+
import io.netty.buffer.ByteBuf;
2728
import io.netty.channel.EventLoopGroup;
2829
import io.netty.util.concurrent.DefaultThreadFactory;
2930
import java.util.ArrayList;
@@ -42,22 +43,28 @@
4243
import org.apache.pulsar.broker.BrokerTestUtil;
4344
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4445
import org.apache.pulsar.broker.authentication.AuthenticationService;
46+
import org.apache.pulsar.broker.service.ServerCnx;
4547
import org.apache.pulsar.client.api.Authentication;
4648
import org.apache.pulsar.client.api.AuthenticationFactory;
4749
import org.apache.pulsar.client.api.Consumer;
50+
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
4851
import org.apache.pulsar.client.api.Message;
4952
import org.apache.pulsar.client.api.MessageRoutingMode;
5053
import org.apache.pulsar.client.api.Producer;
5154
import org.apache.pulsar.client.api.PulsarClient;
5255
import org.apache.pulsar.client.api.Schema;
5356
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
5457
import org.apache.pulsar.client.api.SubscriptionType;
58+
import org.apache.pulsar.client.impl.ClientBuilderImpl;
5559
import org.apache.pulsar.client.impl.ClientCnx;
5660
import org.apache.pulsar.client.impl.ConnectionPool;
5761
import org.apache.pulsar.client.impl.PulsarClientImpl;
5862
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
5963
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
64+
import org.apache.pulsar.common.api.AuthData;
65+
import org.apache.pulsar.common.api.proto.BaseCommand;
6066
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
67+
import org.apache.pulsar.common.api.proto.FeatureFlags;
6168
import org.apache.pulsar.common.api.proto.ProtocolVersion;
6269
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
6370
import org.apache.pulsar.common.naming.TopicName;
@@ -66,6 +73,7 @@
6673
import org.apache.pulsar.common.policies.data.TenantInfo;
6774
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
6875
import org.apache.pulsar.common.policies.data.TopicType;
76+
import org.apache.pulsar.common.protocol.Commands;
6977
import org.apache.pulsar.common.schema.SchemaInfo;
7078
import org.apache.pulsar.common.util.netty.EventLoopUtil;
7179
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -429,6 +437,64 @@ public void testGetClientVersion() throws Exception {
429437
.get(0).getClientVersion(), String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
430438
}
431439

440+
@DataProvider
441+
public Object[][] booleanValues() {
442+
return new Object[][]{
443+
{true},
444+
{false}
445+
};
446+
}
447+
448+
@Test(dataProvider = "booleanValues")
449+
public void testConnectedWithClientSideFeatures(boolean supported) throws Exception {
450+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
451+
admin.topics().createNonPartitionedTopic(topic);
452+
453+
// Create a client as a old version, which does not support "supportsReplDedupByLidAndEid".
454+
ClientBuilderImpl clientBuilder2 =
455+
(ClientBuilderImpl) PulsarClient.builder().serviceUrl(proxyService.getServiceUrl());
456+
PulsarClientImpl injectedClient = InjectedClientCnxClientBuilder.create(clientBuilder2,
457+
(conf, eventLoopGroup) -> {
458+
return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
459+
460+
@Override
461+
protected ByteBuf newConnectCommand() throws Exception {
462+
authenticationDataProvider = authentication.getAuthData(remoteHostName);
463+
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
464+
BaseCommand cmd =
465+
Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), authData,
466+
this.protocolVersion, clientVersion, proxyToTargetBrokerAddress,
467+
null, null, null, null, null);
468+
FeatureFlags featureFlags = cmd.getConnect().getFeatureFlags();
469+
featureFlags.setSupportsAuthRefresh(supported);
470+
featureFlags.setSupportsBrokerEntryMetadata(supported);
471+
featureFlags.setSupportsPartialProducer(supported);
472+
featureFlags.setSupportsTopicWatchers(supported);
473+
featureFlags.setSupportsReplDedupByLidAndEid(supported);
474+
featureFlags.setSupportsGetPartitionedMetadataWithoutAutoCreation(supported);
475+
return Commands.serializeWithSize(cmd);
476+
}
477+
};
478+
});
479+
480+
// Verify: the broker will create a connection, which disabled "supportsReplDedupByLidAndEid".
481+
Producer<byte[]> producer = injectedClient.newProducer().topic(topic).create();
482+
ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(topic, false).get().get()
483+
.getProducers().values().iterator().next().getCnx();
484+
FeatureFlags featureFlags = serverCnx.getFeatures();
485+
assertEquals(featureFlags.isSupportsAuthRefresh(), supported);
486+
assertEquals(featureFlags.isSupportsBrokerEntryMetadata(), supported);
487+
assertEquals(featureFlags.isSupportsPartialProducer(), supported);
488+
assertEquals(featureFlags.isSupportsTopicWatchers(), supported);
489+
assertEquals(featureFlags.isSupportsReplDedupByLidAndEid(), supported);
490+
assertEquals(featureFlags.isSupportsGetPartitionedMetadataWithoutAutoCreation(), supported);
491+
492+
// cleanup.
493+
producer.close();
494+
injectedClient.close();
495+
admin.topics().delete(topic);
496+
}
497+
432498
private PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
433499
throws Exception {
434500
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

0 commit comments

Comments
 (0)