Expose control of sequence id in Java producer API (#760)
merlimat committed Sep 15, 2017
1 parent dd635ac commit d4a8c25
Showing 24 changed files with 499 additions and 116 deletions.
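In client terms, this change lets an application assign message sequence ids itself and ask a producer for the last sequence id the broker has recorded under its producer name. A minimal sketch against the 1.x Java client API (the service URL, topic name, and exact builder calls are illustrative assumptions, not lines from this diff):

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class SequenceIdExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
        Producer producer = client.createProducer("persistent://sample/standalone/ns1/my-topic");

        // Last sequence id the broker has seen for this producer name, or -1 if none.
        long lastSequenceId = producer.getLastSequenceId();

        // Assign the next sequence id explicitly instead of letting the client pick it.
        Message msg = MessageBuilder.create()
                .setSequenceId(lastSequenceId + 1)
                .setContent("hello".getBytes())
                .build();
        producer.send(msg);

        producer.close();
        client.close();
    }
}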
@@ -42,17 +42,17 @@ public abstract class AbstractReplicator {

protected volatile ProducerImpl producer;

protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration()
.setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true);
protected final int producerQueueSize;
protected final ProducerConfiguration producerConfiguration;

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

protected final String replicatorPrefix;

protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(AbstractReplicator.class, State.class, "state");
private volatile State state = State.Stopped;

protected enum State {
Stopped, Starting, Started, Stopping
}
@@ -66,6 +66,12 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.remoteCluster = remoteCluster;
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerConfiguration = new ProducerConfiguration();
this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
}

@@ -74,9 +80,13 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
protected abstract Position getReplicatorReadPosition();

protected abstract long getNumberOfEntriesInBacklog();

protected abstract void disableReplicatorRead();


public ProducerConfiguration getProducerConfiguration() {
return producerConfiguration;
}

public String getRemoteCluster() {
return remoteCluster;
}
@@ -111,23 +121,22 @@ public synchronized void startProducer() {
}

log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
client.createProducerAsync(topicName, producerConfiguration, getReplicatorName(replicatorPrefix, localCluster))
.thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
});
client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
});

}

@@ -196,10 +205,6 @@ protected boolean isWritable() {
return producer != null && producer.isWritable();
}

public static void setReplicatorQueueSize(int queueSize) {
producerConfiguration.setMaxPendingMessages(queueSize);
}

public static String getRemoteCluster(String remoteCursor) {
String[] split = remoteCursor.split("\\.");
return split[split.length - 1];
@@ -262,7 +262,6 @@ public void recordLatency(EventType eventType, long latencyMs) {
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
}
};
PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}

public void start() throws Exception {
@@ -32,6 +32,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.DestinationName;
@@ -185,6 +186,18 @@ public void recordMessageDrop(int batchSize) {
}
}

/**
* Return the last sequence id published by this producer on its topic, or -1 if the
* topic is non-persistent (sequence ids are not tracked for non-persistent topics).
* @return the last published sequence id, or -1 if none is available
*/
public long getLastSequenceId() {
if (isNonPersistentTopic) {
return -1;
} else {
return ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
}
}

private static final class MessagePublishContext implements PublishContext, Runnable {
private Producer producer;
private long sequenceId;
@@ -509,7 +509,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName));
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
producer.getLastSequenceId()));
return;
} else {
// The producer's future was completed before by
@@ -34,7 +34,6 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -53,8 +52,6 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St
BrokerService brokerService) {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerConfiguration
.setMaxPendingMessages(brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize());
producerConfiguration.setBlockIfQueueFull(false);

startProducer();
@@ -417,5 +417,10 @@ public synchronized void purgeInactiveProducers() {
}
}

public long getLastPublishedSequenceId(String producerName) {
Long sequenceId = highestSequencedPushed.get(producerName);
return sequenceId != null ? sequenceId : -1;
}

private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);
}
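The hunk above only exposes a value the deduplication logic already tracks: the highest sequence id pushed per producer name. A simplified, self-contained sketch of that idea follows; it is not the actual MessageDeduplication implementation, just an illustration of what the new getter reads.

import java.util.concurrent.ConcurrentHashMap;

class DedupSketch {
    private final ConcurrentHashMap<String, Long> highestSequencedPushed = new ConcurrentHashMap<>();

    boolean shouldPersist(String producerName, long sequenceId) {
        Long highest = highestSequencedPushed.get(producerName);
        if (highest != null && sequenceId <= highest) {
            return false;               // already pushed under this producer name: drop
        }
        highestSequencedPushed.put(producerName, sequenceId);
        return true;                    // new sequence id: persist and remember
    }

    long getLastPublishedSequenceId(String producerName) {
        Long sequenceId = highestSequencedPushed.get(producerName);
        return sequenceId != null ? sequenceId : -1;   // mirrors the hunk above
    }
}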
@@ -58,7 +58,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final PersistentTopic topic;
private final ManagedCursor cursor;

private final int producerQueueSize;

private static final int MaxReadBatchSize = 100;
private int readBatchSize;

@@ -97,7 +97,6 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);

producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize);
producerQueueThreshold = (int) (producerQueueSize * 0.9);

@@ -139,14 +138,14 @@ protected Position getReplicatorReadPosition() {
protected long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog();
}

@Override
protected void disableReplicatorRead() {
// deactivate cursor after successfully close the producer
this.cursor.setInactive();
}


protected void readMoreEntries() {
int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);

@@ -1468,5 +1468,9 @@ public DispatchRateLimiter getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}

public long getLastPublishedSequenceId(String producerName) {
return messageDeduplication.getLastPublishedSequenceId(producerName);
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
@@ -142,7 +142,7 @@ public void setup() throws Exception {
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
doReturn(Optional.empty()).when(zkDataCache).get(anyString());

LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
doReturn(zkDataCache).when(zkCache).policiesCache();
@@ -945,31 +945,28 @@ public void testClosingReplicationProducerTwice() throws Exception {
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster;

final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) );
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
Field conf = AbstractReplicator.class.getDeclaredField("producerConfiguration");
conf.setAccessible(true);

ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);

doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());

replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
}

}
@@ -212,8 +212,7 @@ public void testConcurrentReplicator() throws Exception {
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject(),
Mockito.anyString());
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject());

}

@@ -623,7 +622,7 @@ public Void call() throws Exception {
/**
* It verifies that if removing the replicator-cluster-cursor fails, the replicator is not restarted and
* is cleaned up from the list
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -750,7 +749,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
/**
* It verifies that PersistentReplicator treats CursorAlreadyClosedException as a non-retriable read exception and
* closes the producer, since the cursor is already closed because the replicator has been deleted.
*
*
* @throws Exception
*/
@Test(timeOut = 5000)