Skip to content

Commit

Permalink
Refactored ClientConfuguration to use ClientConfigurationData shared …
Browse files Browse the repository at this point in the history
…with ClientBuilderImpl (#1276)

* Refactored ClientConfuguration to use ClientConfigurationData shared with ClientBuilderImpl

* Fixed unit tests

* Fixed cloning issue after refactoring

* Fixed another test

* Fixed cloning issues

* Fixes for mock tests

* Fixed refactoring problem in TopicsConsumerImpl
  • Loading branch information
merlimat committed Feb 24, 2018
1 parent a72c912 commit 1a1a6e3
Show file tree
Hide file tree
Showing 47 changed files with 1,015 additions and 629 deletions.
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ flexible messaging model and an intuitive client API.</description>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>

<dependency> <dependency>
<groupId>org.testng</groupId> <groupId>org.testng</groupId>
<artifactId>testng</artifactId> <artifactId>testng</artifactId>
Expand Down Expand Up @@ -405,7 +405,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>jersey-container-servlet</artifactId> <artifactId>jersey-container-servlet</artifactId>
<version>2.23.2</version> <version>2.23.2</version>
</dependency> </dependency>

<dependency> <dependency>
<groupId>javax.ws.rs</groupId> <groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId> <artifactId>javax.ws.rs-api</artifactId>
Expand Down Expand Up @@ -629,6 +629,13 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>powermock-module-testng</artifactId> <artifactId>powermock-module-testng</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>


<build> <build>
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@


import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,49 +22,39 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import io.netty.buffer.ByteBuf;

public class RawReaderImpl implements RawReader { public class RawReaderImpl implements RawReader {


final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000; final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
private final PulsarClientImpl client; private final ConsumerConfigurationData consumerConfiguration;
private final String topic;
private final String subscription;
private final ConsumerConfiguration consumerConfiguration;
private RawConsumerImpl consumer; private RawConsumerImpl consumer;


public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer> consumerFuture) { CompletableFuture<Consumer> consumerFuture) {
this.client = client; consumerConfiguration = new ConsumerConfigurationData();
this.subscription = subscription; consumerConfiguration.getTopicNames().add(topic);
this.topic = topic; consumerConfiguration.setSubscriptionName(subscription);

consumerConfiguration = new ConsumerConfiguration();
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE); consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);


consumer = new RawConsumerImpl(client, topic, subscription, consumerConfiguration, consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture); consumerFuture);
} }


Expand Down Expand Up @@ -92,11 +82,10 @@ static class RawConsumerImpl extends ConsumerImpl {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages; final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives; final Queue<CompletableFuture<RawMessage>> pendingRawReceives;


RawConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
CompletableFuture<Consumer> consumerFuture) { CompletableFuture<Consumer> consumerFuture) {
super(client, topic, subscription, conf, super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
client.externalExecutorProvider().getExecutor(), -1, consumerFuture, consumerFuture, SubscriptionMode.Durable, MessageId.earliest);
SubscriptionMode.Durable, MessageId.earliest);
incomingRawMessages = new GrowableArrayBlockingQueue<>(); incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>();
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
Expand Down Expand Up @@ -1151,17 +1152,18 @@ public void testClosingReplicationProducerTwice() throws Exception {
brokerService.getReplicationClients().put(remoteCluster, client); brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);


doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any()); doReturn(new CompletableFuture<Producer>()).when(clientImpl)
.createProducerAsync(any(ProducerConfigurationData.class));


replicator.startProducer(); replicator.startProducer();
verify(clientImpl).createProducerAsync(matches(globalTopicName), any()); verify(clientImpl).createProducerAsync(any(ProducerConfigurationData.class));


replicator.disconnect(false); replicator.disconnect(false);
replicator.disconnect(false); replicator.disconnect(false);


replicator.startProducer(); replicator.startProducer();


verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any()); verify(clientImpl, Mockito.times(2)).createProducerAsync(any(ProducerConfigurationData.class));
} }


@Test @Test
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -242,7 +243,7 @@ public void testConcurrentReplicator() throws Exception {
} }
Thread.sleep(3000); Thread.sleep(3000);


Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject()); Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class));


} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -53,10 +53,11 @@ protected void cleanup() throws Exception {


@Test @Test
public void testSingleIpAddress() throws Exception { public void testSingleIpAddress() throws Exception {
ClientConfiguration conf = new ClientConfiguration(); ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test")); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool); conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);


List<InetAddress> result = Lists.newArrayList(); List<InetAddress> result = Lists.newArrayList();
result.add(InetAddress.getByName("127.0.0.1")); result.add(InetAddress.getByName("127.0.0.1"));
Expand All @@ -71,10 +72,11 @@ public void testSingleIpAddress() throws Exception {
public void testDoubleIpAddress() throws Exception { public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT; String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;


ClientConfiguration conf = new ClientConfiguration(); ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test")); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool); conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);


List<InetAddress> result = Lists.newArrayList(); List<InetAddress> result = Lists.newArrayList();


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@
import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType; import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void testChecksumVersionComptability() throws Exception {
((PulsarClientImpl) pulsarClient).timer().stop(); ((PulsarClientImpl) pulsarClient).timer().stop();


ClientCnx mockClientCnx = spy( ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup())); new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion(); doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx); prod.setClientCnx(mockClientCnx);


Expand Down Expand Up @@ -360,7 +361,7 @@ public void testChecksumReconnection() throws Exception {


// set clientCnx mock to get non-checksum supported version // set clientCnx mock to get non-checksum supported version
ClientCnx mockClientCnx = spy( ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup())); new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion(); doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx); prod.setClientCnx(mockClientCnx);


Expand Down Expand Up @@ -489,7 +490,7 @@ public void testCorruptMessageRemove() throws Exception {
MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build(); MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
future = producer.sendAsync(msg1); future = producer.sendAsync(msg1);
ClientCnx cnx = spy( ClientCnx cnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup())); new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
String exc = "broker is already stopped"; String exc = "broker is already stopped";
// when client-try to recover checksum by resending to broker: throw exception as broker is stopped // when client-try to recover checksum by resending to broker: throw exception as broker is stopped
doThrow(new IllegalStateException(exc)).when(cnx).ctx(); doThrow(new IllegalStateException(exc)).when(cnx).ctx();
Expand Down
Loading

0 comments on commit 1a1a6e3

Please sign in to comment.