Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-68: WaitForExclusive producer access mode #8992

Merged
merged 4 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -111,6 +114,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
new ConcurrentLinkedQueue<>();

private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
Expand Down Expand Up @@ -337,14 +342,15 @@ public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schem
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
checkArgument(producer.getTopic() == this);

CompletableFuture<Optional<Long>> future = new CompletableFuture<>();

incrementTopicEpochIfNeeded(producer)
.thenAccept(epoch -> {
lock.readLock().lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @merlimat @sijie , this place makes me a little puzzled and I would like to ask why we need to change readlock to writelock?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incrementTopicEpochIfNeeded(producer, producerQueuedFuture)
.thenAccept(producerEpoch -> {
lock.writeLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
Expand All @@ -360,11 +366,11 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
USAGE_COUNT_UPDATER.get(this));
}

future.complete(epoch);
future.complete(producerEpoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
lock.readLock().unlock();
lock.writeLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
Expand All @@ -374,12 +380,13 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
return future;
}

protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
lock.writeLock().lock();
try {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer) {
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else {
Expand All @@ -388,7 +395,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
}

case Exclusive:
if (hasExclusiveProducer) {
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else if (!producers.isEmpty()) {
Expand All @@ -410,17 +417,52 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});

return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
}).exceptionally(ex -> {
});
}

case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
waitingExclusiveProducers.add(Pair.of(producer, future));
producerQueuedFuture.complete(null);
return future;
} else if (producer.getTopicEpoch().isPresent()
&& producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
// If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
// to be fenced, because a new producer had been present in between.
return FutureUtil.failedFuture(new ProducerFencedException(
String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
topicEpoch.get(), producer.getTopicEpoch().get())));
} else {
// There are currently no existing producers
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(producer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});
}

// case WaitForExclusive:
// TODO: Implementation
return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
});
}
}

default:
return FutureUtil.failedFuture(
Expand Down Expand Up @@ -584,7 +626,35 @@ protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
if (newCount == 0) {
hasExclusiveProducer = false;
lock.writeLock().lock();
try {
hasExclusiveProducer = false;
Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
waitingExclusiveProducers.poll();
if (nextWaitingProducer != null) {
Producer nextProducer = nextWaitingProducer.getKey();
CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (nextProducer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(nextProducer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}

future.thenAccept(epoch -> {
topicEpoch = Optional.of(epoch);
producerFuture.complete(topicEpoch);
}).exceptionally(ex -> {
hasExclusiveProducer = false;
producerFuture.completeExceptionally(ex);
return null;
});
}
} finally {
lock.writeLock().unlock();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
Expand Down Expand Up @@ -733,6 +803,10 @@ public TopicPolicies getTopicPolicies(TopicName topicName) {
}
}

protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}

protected boolean isExceedMaximumMessageSize(int size) {
Integer maxMessageSize = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public interface PulsarCommandSender {
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch);
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady) {
PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion, topicEpoch);
schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
command.getProducerSuccess().recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,17 +1152,18 @@ protected void handleProducer(final CommandProducer cmdProducer) {
});

schemaVersionFuture.thenAccept(schemaVersion -> {
CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);

topic.addProducer(producer).thenAccept(newTopicEpoch -> {
topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch);
newTopicEpoch, true /* producer is ready now */);
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
return null;
});

producerQueuedFuture.thenRun(() -> {
// If the producer is queued waiting, we will get an immediate notification
// that we need to pass to client
if (isActive()) {
log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
}
});
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ default long getOriginalHighestSequenceId() {
* Tries to add a producer to the topic. Several validations will be performed.
*
* @param producer
* @param producerQueuedFuture
* a future that will be triggered if the producer is being queued up prior of getting established
* @return the "topic epoch" if there is one or empty
*/
CompletableFuture<Optional<Long>> addProducer(Producer producer);
CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture);

void removeProducer(Producer producer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.waitingPublishers = getWaitingProducersCount();

subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
return super.addProducer(producer).thenApply(epoch -> {
public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
return super.addProducer(producer, producerQueuedFuture).thenApply(topicEpoch -> {
messageDeduplication.producerAdded(producer.getProducerName());

// Start replication producers if not already
startReplProducers();
return epoch;
return topicEpoch;
});
}

Expand Down Expand Up @@ -1642,6 +1643,7 @@ public TopicStats getStats(boolean getPreciseBacklog) {
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
stats.waitingPublishers = getWaitingProducersCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
Expand Down
Loading