Skip to content

Commit

Permalink
Introduce ActiveConsumerListener for realizing if a consumer is activ…
Browse files Browse the repository at this point in the history
…e in a failover subscription group (#1156)

* Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group

* Rename ConsumerGroupListener to ActiveConsumerListener

* Fail subscribe if active consumer listener is provided with non-failover subscription.

* Notify consumer state change only after the cursor is rewinded.

* Address comments

* Rename ActiveConsumerListener to ConsumerEventListener
Use consumer object as identifier for comparison

* Fix test after rebase

* Fix license headers

* Ignore active consumer change command in cpp client

* rename "become" to "became"
  • Loading branch information
sijie authored and merlimat committed Feb 15, 2018
1 parent 350d297 commit f2160c0
Show file tree
Hide file tree
Showing 19 changed files with 993 additions and 27 deletions.
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,20 +72,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part

protected abstract boolean isConsumersExceededOnSubscription();

protected void pickAndScheduleActiveConsumer() {
protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
consumer.notifyActiveConsumerChange(activeConsumer));
}
}

/**
* @return the previous active consumer if the consumer is changed, otherwise null.
*/
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());

consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName()));

int index = partitionIndex % consumers.size();
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (prevConsumer == activeConsumer) {
// Active consumer did not change. Do nothing at this point
return;
return false;
} else {
// If the active consumer is changed, send notification.
scheduleReadOnActiveConsumer();
return true;
}

scheduleReadOnActiveConsumer();
}

public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
Expand All @@ -109,8 +122,17 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

consumers.add(consumer);

// Pick an active consumer and start it
pickAndScheduleActiveConsumer();
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (null == currentActiveConsumer) {
if (log.isDebugEnabled()) {
log.debug("Current active consumer disappears while adding consumer {}", consumer);
}
} else {
consumer.notifyActiveConsumerChange(currentActiveConsumer);
}
}

}

Expand Down
Expand Up @@ -152,6 +152,21 @@ public String consumerName() {
return consumerName;
}

void notifyActiveConsumerChange(Consumer activeConsumer) {
if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
// if the client is older than `v12`, we don't need to send consumer group changes.
return;
}

if (log.isDebugEnabled()) {
log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}",
consumerId, topicName, subscription.getName(), activeConsumer);
}
cnx.ctx().writeAndFlush(
Commands.newActiveConsumerChange(consumerId, this == activeConsumer),
cnx.ctx().voidPromise());
}

public boolean readCompacted() {
return readCompacted;
}
Expand Down
Expand Up @@ -85,7 +85,10 @@ protected void scheduleReadOnActiveConsumer() {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
cursor.rewind();
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
}

Expand All @@ -102,7 +105,10 @@ protected void scheduleReadOnActiveConsumer() {
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
cursor.rewind();
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
}
Expand Down
Expand Up @@ -22,23 +22,29 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.matches;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand All @@ -57,17 +63,18 @@
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -83,9 +90,12 @@ public class PersistentDispatcherFailoverConsumerTest {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private ServerCnx serverCnxWithOldVersion;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
private ConfigurationCacheService configCacheService;
private ChannelHandlerContext channelCtx;
private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;

final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
Expand Down Expand Up @@ -115,10 +125,50 @@ public void setup() throws Exception {
brokerService = spy(new BrokerService(pulsar));
doReturn(brokerService).when(pulsar).getBrokerService();

consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
doAnswer(invocationOnMock -> {
ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);

ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
try {
int cmdSize = (int) cmdBuf.readUnsignedInt();
int writerIndex = cmdBuf.writerIndex();
cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize);
ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(cmdBuf);

BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
cmdBuilder.recycle();
cmdBuf.writerIndex(writerIndex);
cmdInputStream.recycle();

if (cmd.hasActiveConsumerChange()) {
consumerChanges.put(cmd.getActiveConsumerChange());
}
cmd.recycle();
} finally {
cmdBuf.release();
}

return null;
}).when(channelCtx).writeAndFlush(any(), any());

serverCnx = spy(new ServerCnx(brokerService));
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
when(serverCnx.ctx()).thenReturn(channelCtx);

serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
doReturn(true).when(serverCnxWithOldVersion).isActive();
doReturn(true).when(serverCnxWithOldVersion).isWritable();
doReturn(new InetSocketAddress("localhost", 1234))
.when(serverCnxWithOldVersion).clientAddress();
when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
.thenReturn(ProtocolVersion.v11.getNumber());
when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);

NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
Expand Down Expand Up @@ -193,6 +243,51 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
}

private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
long consumerId,
boolean isActive) {
assertEquals(consumerId, change.getConsumerId());
assertEquals(isActive, change.getIsActive());
change.recycle();
}

@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);

int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic);

// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());

// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
assertEquals(1, consumers.size());
assertNull(consumerChanges.poll());

verify(channelCtx, times(0)).write(any());

// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
assertEquals(2, consumers.size());

CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);

verify(channelCtx, times(1)).writeAndFlush(any(), any());
}

@Test
public void testAddRemoveConsumer() throws Exception {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
Expand All @@ -208,13 +303,16 @@ public void testAddRemoveConsumer() throws Exception {
assertFalse(pdfc.isConsumerConnected());

// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */);
false /* read compacted */));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
assertEquals(1, consumers.size());
CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(1)).notifyActiveConsumerChange(same(consumer1));

// 3. Add again, duplicate allowed
pdfc.addConsumer(consumer1);
Expand All @@ -224,31 +322,57 @@ public void testAddRemoveConsumer() throws Exception {

// 4. Verify active consumer
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
// get the notified with who is the leader
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */);
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());
// get notified with who is the leader
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));

// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */);
false /* read compacted */));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
assertEquals(4, consumers.size());

// all consumers will receive notifications
change = consumerChanges.take();
verifyActiveConsumerChange(change, 0, true);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer0));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer0));

// 7. Remove last consumer
pdfc.removeConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
assertEquals(3, consumers.size());
// not consumer group changes
assertNull(consumerChanges.poll());

// 8. Verify if we can unsubscribe when more than one consumer is connected
// 8. Verify if we cannot unsubscribe when more than one consumer is connected
assertFalse(pdfc.canUnsubscribe(consumer0));

// 9. Remove active consumer
Expand All @@ -257,6 +381,12 @@ public void testAddRemoveConsumer() throws Exception {
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(2, consumers.size());

// the remaining consumers will receive notifications
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);

// 10. Attempt to remove already removed consumer
String cause = "";
try {
Expand All @@ -271,10 +401,11 @@ public void testAddRemoveConsumer() throws Exception {
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(1, consumers.size());
// not consumer group changes
assertNull(consumerChanges.poll());

// 11. With only one consumer, unsubscribe is allowed
assertTrue(pdfc.canUnsubscribe(consumer1));

}

@Test
Expand Down

0 comments on commit f2160c0

Please sign in to comment.