Skip to content

Commit

Permalink
[improve] [client] PIP-344 support feature flag supportsGetPartitione…
Browse files Browse the repository at this point in the history
…dMetadataWithoutAutoCreation (apache#22773)

(cherry picked from commit 6236116)
(cherry picked from commit 03c0975)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Jun 24, 2024
1 parent cb356b4 commit 8ca5e52
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import com.google.common.collect.Sets;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -123,4 +127,44 @@ public void testClientVersion() throws Exception {
producer.close();
consumer.close();
}

@Test
public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception {
final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp");
admin.topics().createNonPartitionedTopic(topic);
PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) PulsarClient.builder()
.maxNumberOfRejectedRequestPerConnection(1)
.connectionMaxIdleSeconds(Integer.MAX_VALUE)
.serviceUrl(pulsar.getBrokerServiceUrl())
.build();
ProducerImpl producer = (ProducerImpl) clientWitBinaryLookup.newProducer().topic(topic).create();

// Verify: the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker is true.
Awaitility.await().untilAsserted(() -> {
ClientCnx clientCnx = producer.getClientCnx();
Assert.assertNotNull(clientCnx);
Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation());
});
Assert.assertEquals(
clientWitBinaryLookup.getPartitionsForTopic(topic, true).get().size(), 1);

// Inject a "false" value for the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation".
// Verify: client will get a not support error.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (CompletableFuture<ClientCnx> clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) {
field.set(clientCnxFuture.get(), false);
}
try {
clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
Assert.fail("Expected an error that the broker version is too old.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker"));
}

// cleanup.
producer.close();
clientWitBinaryLookup.close();
admin.topics().delete(topic, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of"
+ " getting partitions without auto-creation is not supported from the broker,"
+ " please upgrade the broker to the latest version."));
return;
}
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
metadataAutoCreationEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public class ClientCnx extends PulsarHandler {
protected AuthenticationDataProvider authenticationDataProvider;
private TransactionBufferHandler transactionBufferHandler;
private boolean supportsTopicWatchers;
@Getter
private boolean supportsGetPartitionedMetadataWithoutAutoCreation;

/** Idle stat. **/
@Getter
Expand Down Expand Up @@ -382,6 +384,9 @@ protected void handleConnected(CommandConnected connected) {

supportsTopicWatchers =
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers();
supportsGetPartitionedMetadataWithoutAutoCreation =
connected.hasFeatureFlags()
&& connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation();

// set remote protocol version to the correct version before we complete the connection future
setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max
connected.setProtocolVersion(versionToAdvertise);

connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers);
connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
return cmd;
}

Expand Down

0 comments on commit 8ca5e52

Please sign in to comment.