Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add epoch for connection handler to handle create producer timeout. #5571

Merged
merged 6 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,10 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -38,8 +40,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +53,7 @@ public abstract class AbstractTopic implements Topic {
protected final String topic;

// Producers currently connected to this topic
protected final ConcurrentOpenHashSet<Producer> producers;
protected final ConcurrentHashMap<String, Producer> producers;

protected final BrokerService brokerService;

Expand Down Expand Up @@ -86,7 +86,7 @@ public abstract class AbstractTopic implements Topic {
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentOpenHashSet<>(16, 1);
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.lastActive = System.nanoTime();
Expand Down Expand Up @@ -123,7 +123,7 @@ protected boolean isProducersExceeded() {

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.forEach(producer -> {
producers.values().forEach(producer -> {
if (!producer.isRemote()) {
foundLocal.set(true);
}
Expand All @@ -138,7 +138,7 @@ public String toString() {
}

@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
public Map<String, Producer> getProducers() {
return producers;
}

Expand Down Expand Up @@ -258,19 +258,66 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
@Override
public void resetPublishCountAndEnableReadIfRequired() {
if (this.publishRateLimiter.resetPublishCount()) {
enableProduerRead();
enableProducerRead();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
protected void enableProduerRead() {
protected void enableProducerRead() {
if (producers != null) {
producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

protected void checkTopicFenced() throws BrokerServiceException {
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
}
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName()
&& !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) {
oldProducer.close(false);
canOverwrite = true;
}
if (canOverwrite) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be simplified with

if (canOverwrite && !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
      throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' is already connected 
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be if can simplified with

if (!canOverwrite || !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
    throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' is already connected")
}

Right?

if(!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
} else {
handleProducerRemoved(oldProducer);
}
} else {
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
}
}

protected abstract void handleProducerRemoved(Producer producer);

@Override
public boolean isPublishRateExceeded() {
return this.publishRateLimiter.isPublishRateExceeded();
Expand Down Expand Up @@ -304,7 +351,7 @@ private void updatePublishDispatcher(Policies policies) {
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.publishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProduerRead();
enableProducerRead();
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -184,9 +185,9 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
*/
private void disconnectProducers(PersistentTopic persistentTopic) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();
Map<String, Producer> producers = persistentTopic.getProducers();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer",
producer.getProducerName(), persistentTopic.getName());
futures.add(producer.disconnect());
Expand Down
Expand Up @@ -61,6 +61,8 @@ public class Producer {
private final Topic topic;
private final ServerCnx cnx;
private final String producerName;
private final long epoch;
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private Rate msgIn;
Expand All @@ -86,11 +88,14 @@ public class Producer {
private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
this.producerName = checkNotNull(producerName);
this.userProvidedProducerName = userProvidedProducerName;
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
Expand Down Expand Up @@ -216,7 +221,7 @@ private void publishOperationCompleted() {
if (newPendingPublishAcks == 0 && !closeFuture.isDone()) {
synchronized (this) {
if (isClosed && !closeFuture.isDone()) {
closeNow();
closeNow(true);
}
}
}
Expand Down Expand Up @@ -405,7 +410,7 @@ public String toString() {
*
* @return completable future indicate completion of close
*/
public synchronized CompletableFuture<Void> close() {
public synchronized CompletableFuture<Void> close(boolean removeFromTopic) {
if (log.isDebugEnabled()) {
log.debug("Closing producer {} -- isClosed={}", this, isClosed);
}
Expand All @@ -417,14 +422,16 @@ public synchronized CompletableFuture<Void> close() {
cnx.isActive(), pendingPublishAcks);
}
if (!cnx.isActive() || pendingPublishAcks == 0) {
closeNow();
closeNow(removeFromTopic);
}
}
return closeFuture;
}

void closeNow() {
topic.removeProducer(this);
void closeNow(boolean removeFromTopic) {
if (removeFromTopic) {
topic.removeProducer(this);
}
cnx.removedProducer(this);

if (log.isDebugEnabled()) {
Expand All @@ -444,7 +451,7 @@ public CompletableFuture<Void> disconnect() {
log.info("Disconnecting producer: {}", this);
cnx.ctx().executor().execute(() -> {
cnx.closeProducer(this);
closeNow();
closeNow(true);
});
}
return closeFuture;
Expand Down Expand Up @@ -478,6 +485,14 @@ public boolean isNonPersistentTopic() {
return isNonPersistentTopic;
}

public long getEpoch() {
return epoch;
}

public boolean isUserProvidedProducerName() {
return userProvidedProducerName;
}

@VisibleForTesting
long getPendingPublishAcks() {
return pendingPublishAcks;
Expand Down
Expand Up @@ -183,7 +183,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
producers.values().forEach((producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow();
producer.closeNow(true);
}
});

Expand Down Expand Up @@ -817,6 +817,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.getUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.getEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
Expand Down Expand Up @@ -938,7 +940,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);
isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);

try {
topic.addProducer(producer);
Expand All @@ -952,12 +954,12 @@ protected void handleProducer(final CommandProducer cmdProducer) {
} else {
// The producer's future was completed before by
// a close command
producer.closeNow();
producer.closeNow(true);
log.info("[{}] Cleared producer created after timeout on client side {}",
remoteAddress, producer);
}
} else {
producer.closeNow();
producer.closeNow(true);
log.info("[{}] Cleared producer created after connection was closed: {}",
remoteAddress, producer);
producerFuture.completeExceptionally(
Expand Down Expand Up @@ -1214,7 +1216,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
Producer producer = producerFuture.getNow(null);
log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);

producer.close().thenAccept(v -> {
producer.close(true).thenAccept(v -> {
log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
remoteAddress);
ctx.writeAndFlush(Commands.newSuccess(requestId));
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -103,7 +102,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> delete();

ConcurrentOpenHashSet<Producer> getProducers();
Map<String, Producer> getProducers();

String getName();

Expand Down