Skip to content

Commit

Permalink
Issue 1024: Message Router should take numPartitions as parameter i…
Browse files Browse the repository at this point in the history
…n choosing partition (#1025)

* Expose partition number in choosePartition method

* Use `ThreadLocalRandom`
  • Loading branch information
sijie authored and rdhabalia committed Jan 9, 2018
1 parent efd4189 commit 1fb1938
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 41 deletions.
Expand Up @@ -27,7 +27,24 @@ public interface MessageRouter extends Serializable {
* @param msg * @param msg
* Message object * Message object
* @return The index of the partition to use for the message * @return The index of the partition to use for the message
* @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.
*/ */
int choosePartition(Message msg); @Deprecated
default int choosePartition(Message msg) {
throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
}

/**
* Choose a partition based on msg and the topic metadata.
*
* @param msg message to route
* @param metadata topic metadata
* @return the partition to route the message.
* @since 1.22.0
*/
@SuppressWarnings("deprecation")
default int choosePartition(Message msg, TopicMetadata metadata) {
return choosePartition(msg);
}


} }
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl; import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
Expand Down Expand Up @@ -224,27 +225,25 @@ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
} }


/** /**
* Get the message router object * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
* *
* @return * @return message router.
* @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already passed as parameter in
* {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
* @see MessageRouter
*/ */
@Deprecated
public MessageRouter getMessageRouter(int numPartitions) { public MessageRouter getMessageRouter(int numPartitions) {
MessageRouter messageRouter; return customMessageRouter;

}
switch (messageRouteMode) {
case CustomPartition:
checkNotNull(customMessageRouter);
messageRouter = customMessageRouter;
break;
case RoundRobinPartition:
messageRouter = new RoundRobinPartitionMessageRouterImpl(numPartitions);
break;
case SinglePartition:
default:
messageRouter = new SinglePartitionMessageRouterImpl(numPartitions);
}


return messageRouter; /**
* Get the message router set by {@link #setMessageRouter(MessageRouter)}.
*
* @return message router set by {@link #setMessageRouter(MessageRouter)}.
*/
public MessageRouter getMessageRouter() {
return customMessageRouter;
} }


/** /**
Expand Down
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Metadata of a topic that can be used for message routing.
*/
public interface TopicMetadata {

/**
* Return the number of partitions per topic.
*
* @return the number of partitions per topic.
*/
int numPartitions();

}
Expand Up @@ -19,9 +19,11 @@
package org.apache.pulsar.client.impl; package org.apache.pulsar.client.impl;


import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;


import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


Expand All @@ -30,7 +32,9 @@
import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -41,20 +45,43 @@
public class PartitionedProducerImpl extends ProducerBase { public class PartitionedProducerImpl extends ProducerBase {


private List<ProducerImpl> producers; private List<ProducerImpl> producers;
private int numPartitions;
private MessageRouter routerPolicy; private MessageRouter routerPolicy;
private final ProducerStats stats; private final ProducerStats stats;
private final TopicMetadata topicMetadata;


public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions, public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions,
CompletableFuture<Producer> producerCreatedFuture) { CompletableFuture<Producer> producerCreatedFuture) {
super(client, topic, conf, producerCreatedFuture); super(client, topic, conf, producerCreatedFuture);
this.producers = Lists.newArrayListWithCapacity(numPartitions); this.producers = Lists.newArrayListWithCapacity(numPartitions);
this.numPartitions = numPartitions; this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = conf.getMessageRouter(numPartitions); this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null; stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
start(); start();
} }


private MessageRouter getMessageRouter() {
MessageRouter messageRouter;

MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
MessageRouter customMessageRouter = conf.getMessageRouter();

switch (messageRouteMode) {
case CustomPartition:
checkNotNull(customMessageRouter);
messageRouter = customMessageRouter;
break;
case RoundRobinPartition:
messageRouter = new RoundRobinPartitionMessageRouterImpl();
break;
case SinglePartition:
default:
messageRouter = new SinglePartitionMessageRouterImpl(
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()));
}

return messageRouter;
}

@Override @Override
public String getProducerName() { public String getProducerName() {
return producers.get(0).getProducerName(); return producers.get(0).getProducerName();
Expand All @@ -70,7 +97,7 @@ public long getLastSequenceId() {
private void start() { private void start() {
AtomicReference<Throwable> createFail = new AtomicReference<Throwable>(); AtomicReference<Throwable> createFail = new AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger(); AtomicInteger completed = new AtomicInteger();
for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) { for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) {
String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString(); String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
ProducerImpl producer = new ProducerImpl(client, partitionName, conf, new CompletableFuture<Producer>(), ProducerImpl producer = new ProducerImpl(client, partitionName, conf, new CompletableFuture<Producer>(),
partitionIndex); partitionIndex);
Expand All @@ -85,7 +112,7 @@ private void start() {
// due to any // due to any
// failure in one of the partitions and close the successfully // failure in one of the partitions and close the successfully
// created partitions // created partitions
if (completed.incrementAndGet() == numPartitions) { if (completed.incrementAndGet() == topicMetadata.numPartitions()) {
if (createFail.get() == null) { if (createFail.get() == null) {
setState(State.Ready); setState(State.Ready);
producerCreatedFuture().complete(PartitionedProducerImpl.this); producerCreatedFuture().complete(PartitionedProducerImpl.this);
Expand Down Expand Up @@ -121,8 +148,8 @@ public CompletableFuture<MessageId> sendAsync(Message message) {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
} }


int partition = routerPolicy.choosePartition(message); int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < numPartitions, checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy"); "Illegal partition index chosen by the message routing policy");
return producers.get(partition).sendAsync(message); return producers.get(partition).sendAsync(message);
} }
Expand All @@ -147,7 +174,7 @@ public CompletableFuture<Void> closeAsync() {
setState(State.Closing); setState(State.Closing);


AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>(); AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger(numPartitions); AtomicInteger completed = new AtomicInteger(topicMetadata.numPartitions());
CompletableFuture<Void> closeFuture = new CompletableFuture<>(); CompletableFuture<Void> closeFuture = new CompletableFuture<>();
for (Producer producer : producers) { for (Producer producer : producers) {
if (producer != null) { if (producer != null) {
Expand Down Expand Up @@ -183,7 +210,7 @@ public synchronized ProducerStats getStats() {
return null; return null;
} }
stats.reset(); stats.reset();
for (int i = 0; i < numPartitions; i++) { for (int i = 0; i < topicMetadata.numPartitions(); i++) {
stats.updateCumulativeStats(producers.get(i).getStats()); stats.updateCumulativeStats(producers.get(i).getStats());
} }
return stats; return stats;
Expand Down
Expand Up @@ -22,26 +22,25 @@


import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;


public class RoundRobinPartitionMessageRouterImpl implements MessageRouter { public class RoundRobinPartitionMessageRouterImpl implements MessageRouter {


private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER = private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex"); AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex");
private volatile int partitionIndex = 0; private volatile int partitionIndex = 0;
private final int numPartitions;


public RoundRobinPartitionMessageRouterImpl(int numPartitions) { public RoundRobinPartitionMessageRouterImpl() {
this.numPartitions = numPartitions;
PARTITION_INDEX_UPDATER.set(this, 0); PARTITION_INDEX_UPDATER.set(this, 0);
} }


@Override @Override
public int choosePartition(Message msg) { public int choosePartition(Message msg, TopicMetadata topicMetadata) {
// If the message has a key, it supersedes the round robin routing policy // If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) { if (msg.hasKey()) {
return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions); return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % topicMetadata.numPartitions());
} }
return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % numPartitions); return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % topicMetadata.numPartitions());
} }


} }
Expand Up @@ -18,26 +18,23 @@
*/ */
package org.apache.pulsar.client.impl; package org.apache.pulsar.client.impl;


import java.util.Random;

import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;


public class SinglePartitionMessageRouterImpl implements MessageRouter { public class SinglePartitionMessageRouterImpl implements MessageRouter {


private final int partitionIndex; private final int partitionIndex;
private final int numPartitions;


public SinglePartitionMessageRouterImpl(int numPartitions) { public SinglePartitionMessageRouterImpl(int partitionIndex) {
this.partitionIndex = new Random().nextInt(numPartitions); this.partitionIndex = partitionIndex;
this.numPartitions = numPartitions;
} }


@Override @Override
public int choosePartition(Message msg) { public int choosePartition(Message msg, TopicMetadata metadata) {
// If the message has a key, it supersedes the single partition routing policy // If the message has a key, it supersedes the single partition routing policy
if (msg.hasKey()) { if (msg.hasKey()) {
return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions); return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions());
} }


return partitionIndex; return partitionIndex;
Expand Down
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.TopicMetadata;

class TopicMetadataImpl implements TopicMetadata {

private final int numPartitions;

TopicMetadataImpl(int numPartitions) {
this.numPartitions = numPartitions;
}

@Override
public int numPartitions() {
return numPartitions;
}
}
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;

import org.testng.annotations.Test;

/**
* Unit test of {@link MessageRouter}.
*/
public class MessageRouterTest {

@SuppressWarnings("deprecation")
private static class TestMessageRouter implements MessageRouter {

@Override
public int choosePartition(Message msg) {
return 1234;
}
}

@SuppressWarnings("deprecation")
@Test
public void testChoosePartition() {
MessageRouter router = spy(new TestMessageRouter());
Message mockedMsg = mock(Message.class);
TopicMetadata mockedMetadata = mock(TopicMetadata.class);

assertEquals(1234, router.choosePartition(mockedMsg));
assertEquals(1234, router.choosePartition(mockedMsg, mockedMetadata));

verify(router, times(2)).choosePartition(eq(mockedMsg));
}

}

0 comments on commit 1fb1938

Please sign in to comment.