Skip to content

Commit

Permalink
Converted main part of code to use builder APIs with typed interface (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Mar 2, 2018
1 parent 21ffe2c commit a64b383
Show file tree
Hide file tree
Showing 41 changed files with 436 additions and 361 deletions.
Expand Up @@ -78,11 +78,12 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -499,27 +500,29 @@ public PulsarClient getReplicationClient(String cluster) {
String path = PulsarWebResource.path("clusters", cluster);
ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
ClientConfiguration configuration = new ClientConfiguration();
configuration.setUseTcpNoDelay(false);
configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
configuration.setStatsInterval(0, TimeUnit.SECONDS);
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
String clusterUrl = null;
if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls();
configuration.setUseTls(true);
configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
configuration
.setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
clientBuilder
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
.enableTls(true)
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
} else {
clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl()
: data.getServiceUrl();
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -104,24 +104,20 @@ protected void resetConfig() {

protected final void internalSetup() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
}
pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
}

protected final void internalSetupForStatsTest() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(1, TimeUnit.SECONDS);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
}

protected final void init() throws Exception {
Expand Down
Expand Up @@ -64,7 +64,7 @@ public void testClosedConsumer() throws PulsarClientException {
public void testListener() throws PulsarClientException {
Consumer consumer = null;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setMessageListener((Consumer c, Message msg) -> {
conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
});
consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf);
Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
Expand Down
Expand Up @@ -45,8 +45,8 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
Expand All @@ -62,7 +62,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {

private static final long serialVersionUID = 1L;

Expand All @@ -74,7 +74,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final String groupId;
private final boolean isAutoCommit;

private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();

private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
Expand All @@ -84,10 +84,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Properties properties;

private static class QueueItem {
final org.apache.pulsar.client.api.Consumer consumer;
final Message message;
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;

QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
this.consumer = consumer;
this.message = message;
}
Expand Down Expand Up @@ -146,19 +146,19 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
clientConf.setUseTcpNoDelay(false);
clientBuilder.enableTcpNoDelay(false);
try {
client = PulsarClient.create(serviceUrl, clientConf);
client = clientBuilder.serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

@Override
public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
// Block listener thread if the application is slowing down
try {
receivedMessages.put(new QueueItem(consumer, msg));
Expand Down Expand Up @@ -204,16 +204,17 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
conf.setSubscriptionType(SubscriptionType.Failover);
conf.setMessageListener(this);
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
if (numberOfPartitions > 1) {
// Subscribe to each partition
conf.setConsumerName(ConsumerName.generateRandomName());
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
.subscribeAsync(partitionName, groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(topic, partitionIndex);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
Expand All @@ -222,8 +223,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client.subscribeAsync(topic,
groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(topic, 0);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
Expand Down Expand Up @@ -293,7 +294,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message msg = item.message;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
long offset = MessageIdUtils.getOffset(msgId);

Expand Down Expand Up @@ -335,7 +336,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
}

@SuppressWarnings("unchecked")
private K getKey(String topic, Message msg) {
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
}
Expand Down Expand Up @@ -393,7 +394,7 @@ private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMet
List<CompletableFuture<Void>> futures = new ArrayList<>();

offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);

lastCommittedOffset.put(topicPartition, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
Expand All @@ -415,7 +416,7 @@ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
Expand All @@ -436,7 +437,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand All @@ -457,7 +458,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand Down

0 comments on commit a64b383

Please sign in to comment.