Skip to content

Commit

Permalink
[improve][broker] Add createTopicIfDoesNotExist option to RawReader c…
Browse files Browse the repository at this point in the history
…onstructor (apache#22264)

(cherry picked from commit 16cf199)
  • Loading branch information
coderzc authored and mukesh-ctds committed Apr 19, 2024
1 parent 6070b4d commit 72d6710
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ public interface RawReader {
*/

static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
return create(client, topic, subscription, true);
}

static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription,
boolean createTopicIfDoesNotExist) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
RawReader r =
new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist);
return future.thenApply(__ -> r);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class RawReaderImpl implements RawReader {
private RawConsumerImpl consumer;

public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist) {
consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
Expand All @@ -61,8 +62,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);

consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
}

@Override
Expand Down Expand Up @@ -111,7 +111,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;

RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
CompletableFuture<Consumer<byte[]>> consumerFuture, boolean createTopicIfDoesNotExist) {
super(client,
conf.getSingleTopic(),
conf,
Expand All @@ -123,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
false
createTopicIfDoesNotExist
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Compactor(ServiceConfiguration conf,
}

public CompletableFuture<Long> compact(String topic) {
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION).thenComposeAsync(
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync(
this::compactAndCloseReader, scheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,15 +37,18 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -496,4 +500,23 @@ public void testReadCancellationOnClose() throws Exception {
}
}
}

@Test
public void testAutoCreateTopic() throws ExecutionException, InterruptedException, PulsarAdminException {
String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");

RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
TopicStats stats = admin.topics().getStats(topic);
Assert.assertNotNull(stats);
reader.closeAsync().join();

String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");
try {
reader = RawReader.create(pulsarClient, topic2, subscription, false).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException);
}
reader.closeAsync().join();
}
}

0 comments on commit 72d6710

Please sign in to comment.