Skip to content

Commit

Permalink
On batch-msg dispatching: broker should disconnect consumer which doe…
Browse files Browse the repository at this point in the history
…sn't support batch-message (#215)

* On batch-msg dispatching: broker should disconnect consumer which doesn't support batch-message

* Close consumer at broker-side without closing connection if consumer doesn't support batch-message

* Fail unsupportedBatchVersion-subscriber if topic has served batch-message
  • Loading branch information
rdhabalia committed Feb 24, 2017
1 parent 16a5221 commit dfbec5c
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public SubscriptionInvalidCursorPosition(String msg) {
super(msg);
}
}

public static class UnsupportedVersionException extends BrokerServiceException {
public UnsupportedVersionException(String msg) {
super(msg);
}
}

public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
if (t instanceof ServerMetadataException) {
Expand All @@ -109,6 +115,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return PulsarApi.ServerError.PersistenceError;
} else if (t instanceof ConsumerBusyException) {
return PulsarApi.ServerError.ConsumerBusy;
} else if (t instanceof UnsupportedVersionException) {
return PulsarApi.ServerError.UnsupportedVersionError;
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
Expand Down Expand Up @@ -148,8 +149,20 @@ public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
return sentMessages;
}

sentMessages.setRight(updatePermitsAndPendingAcks(entries));

try {
sentMessages.setRight(updatePermitsAndPendingAcks(entries));
} catch (PulsarServerException pe) {
log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId,
cnx.getRemoteEndpointProtocolVersion());

subscription.markTopicWithBatchMessagePublished();
sentMessages.setRight(0);
// disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this
// consumer to other consumer
disconnect();
return sentMessages;
}

ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
Expand Down Expand Up @@ -213,9 +226,11 @@ int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
return -1;
}

int updatePermitsAndPendingAcks(final List<Entry> entries) {
int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerException {
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
Expand All @@ -232,11 +247,18 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) {
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
pendingAcks.put(pos, batchSize);
}
// check if consumer supports batch message
if (batchSize > 1 && !clientSupportBatchMessages) {
unsupportedVersion = true;
}
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (unsupportedVersion) {
throw new PulsarServerException("Consumer does not support batch-message");
}
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -768,4 +769,8 @@ public String getRole() {
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}

public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ public interface Subscription {
void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);

void markTopicWithBatchMessagePublished();
}
Original file line number Diff line number Diff line change
Expand Up @@ -592,5 +592,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
}

@Override
public void markTopicWithBatchMessagePublished() {
topic.markBatchMessagePublished();
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -128,6 +129,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {

// Timestamp of when this topic was last seen active
private volatile long lastActive;

// Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
// doesn't support batch-message
private volatile boolean hasBatchMessagePublished = false;

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
Expand Down Expand Up @@ -321,6 +326,14 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

final CompletableFuture<Consumer> future = new CompletableFuture<>();

if(hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if(log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message"));
return future;
}

if (subscriptionName.startsWith(replicatorPrefix)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted"));
Expand Down Expand Up @@ -1224,5 +1237,9 @@ public CompletableFuture<Void> clearBacklog(String cursorName) {
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}

public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -1064,6 +1065,38 @@ public void testSubscribeCommand() throws Exception {

channel.finish();
}

@Test(timeOut = 30000)
public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
final String failSubName = "failSub";

resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v3.getNumber());
doReturn(false).when(brokerService).isAuthenticationEnabled();
doReturn(false).when(brokerService).isAuthorizationEnabled();
// test SUBSCRIBE on topic and cursor creation success
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandSuccess);

PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName);
topicRef.markBatchMessagePublished();

// test SUBSCRIBE on topic and cursor creation success
clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive,
"test" /* consumer name */);
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandError);
assertTrue(((CommandError) response).getError().equals(ServerError.UnsupportedVersionError));

// Server will not close the connection
assertTrue(channel.isOpen());

channel.finish();
}

@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
Expand Down Expand Up @@ -1163,6 +1196,13 @@ private void setChannelConnected() throws Exception {
channelState.set(serverCnx, State.Connected);
}

private void setConnectionVersion(int version) throws Exception {
PulsarHandler cnx = (PulsarHandler) serverCnx;
Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
versionField.setAccessible(true);
versionField.set(cnx, version);
}

private Object getResponse() throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1856,4 +1856,4 @@ public void testRedeliveryFailOverConsumer() throws Exception {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,34 @@

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.namespace.OwnershipCache;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
Expand All @@ -52,6 +65,11 @@ protected void setup() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider
public Object[][] subType() {
return new Object[][] {{SubscriptionType.Shared}, {SubscriptionType.Failover}};
}


/**
Expand Down Expand Up @@ -226,5 +244,98 @@ public void testCloseBrokerService() throws Exception {

}

/**
* It verifies that consumer which doesn't support batch-message:
* <p>
* 1. broker disconnects that consumer
* <p>
* 2. redeliver all those messages to other supported consumer under the same subscription
*
* @param subType
* @throws Exception
*/
@Test(timeOut = 7000, dataProvider = "subType")
public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws Exception {
log.info("-- Starting {} test --", methodName);

final int batchMessageDelayMs = 1000;
final String topicName = "persistent://my-property/use/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name" + subType;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(subType);
ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriptionName, conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerConf.setBatchingMaxMessages(20);
}

Producer producer = pulsarClient.createProducer(topicName, new ProducerConfiguration());
Producer batchProducer = pulsarClient.createProducer(topicName, producerConf);

// update consumer's version to incompatible batch-message version = Version.V3
Topic topic = pulsar.getBrokerService().getTopic(topicName).get();
com.yahoo.pulsar.broker.service.Consumer brokerConsumer = topic.getSubscriptions().get(subscriptionName)
.getConsumers().get(0);
Field cnxField = com.yahoo.pulsar.broker.service.Consumer.class.getDeclaredField("cnx");
cnxField.setAccessible(true);
PulsarHandler cnx = (PulsarHandler) cnxField.get(brokerConsumer);
Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
versionField.setAccessible(true);
versionField.set(cnx, 3);

// (1) send non-batch message: consumer should be able to consume
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Set<String> messageSet = Sets.newHashSet();
Message msg = null;
for (int i = 0; i < 10; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
consumer1.acknowledge(msg);
}

// Also set clientCnx of the consumer to null so, it avoid reconnection so, other consumer can consume for
// verification
consumer1.setClientCnx(null);
// (2) send batch-message which should not be able to consume: as broker will disconnect the consumer
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
batchProducer.sendAsync(message.getBytes());
}

Thread.sleep(batchMessageDelayMs);

// consumer should have not received any message as it should have been disconnected
msg = consumer1.receive(2, TimeUnit.SECONDS);
assertNull(msg);

// subscrie consumer2 with supporting batch version
pulsarClient = PulsarClient.create(brokerUrl.toString());
Consumer consumer2 = pulsarClient.subscribe(topicName, subscriptionName, conf);

messageSet.clear();
for (int i = 0; i < 10; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
consumer2.acknowledge(msg);
}

consumer2.close();
producer.close();
batchProducer.close();
log.info("-- Exiting {} test --", methodName);
}

}
Loading

0 comments on commit dfbec5c

Please sign in to comment.