Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add epoch for connection handler to handle create producer timeout. #5571

Merged
merged 6 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,10 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -38,8 +40,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +53,7 @@ public abstract class AbstractTopic implements Topic {
protected final String topic;

// Producers currently connected to this topic
protected final ConcurrentOpenHashSet<Producer> producers;
protected final ConcurrentHashMap<String, Producer> producers;

protected final BrokerService brokerService;

Expand Down Expand Up @@ -86,7 +86,7 @@ public abstract class AbstractTopic implements Topic {
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentOpenHashSet<>(16, 1);
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.lastActive = System.nanoTime();
Expand Down Expand Up @@ -123,7 +123,7 @@ protected boolean isProducersExceeded() {

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.forEach(producer -> {
producers.values().forEach(producer -> {
if (!producer.isRemote()) {
foundLocal.set(true);
}
Expand All @@ -138,7 +138,7 @@ public String toString() {
}

@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
public Map<String, Producer> getProducers() {
return producers;
}

Expand Down Expand Up @@ -258,16 +258,59 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
@Override
public void resetPublishCountAndEnableReadIfRequired() {
if (this.publishRateLimiter.resetPublishCount()) {
enableProduerRead();
enableProducerRead();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
protected void enableProduerRead() {
protected void enableProducerRead() {
if (producers != null) {
producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

protected void checkTopicFenced() throws BrokerServiceException {
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
}
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName()
&& !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) {
oldProducer.close();
canOverwrite = true;
}
if (canOverwrite) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be simplified with

if (canOverwrite && !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
      throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' is already connected 
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be if can simplified with

if (!canOverwrite || !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
    throw new BrokerServiceException.NamingException(
                    "Producer with name '" + newProducer.getProducerName() + "' is already connected")
}

Right?

if(!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
}
} else {
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
}
}

Expand Down Expand Up @@ -304,7 +347,7 @@ private void updatePublishDispatcher(Policies policies) {
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.publishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProduerRead();
enableProducerRead();
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -184,9 +185,9 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
*/
private void disconnectProducers(PersistentTopic persistentTopic) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();
Map<String, Producer> producers = persistentTopic.getProducers();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer",
producer.getProducerName(), persistentTopic.getName());
futures.add(producer.disconnect());
Expand Down
Expand Up @@ -61,6 +61,8 @@ public class Producer {
private final Topic topic;
private final ServerCnx cnx;
private final String producerName;
private final long epoch;
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private Rate msgIn;
Expand All @@ -86,11 +88,14 @@ public class Producer {
private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
this.producerName = checkNotNull(producerName);
this.userProvidedProducerName = userProvidedProducerName;
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
Expand Down Expand Up @@ -478,6 +483,14 @@ public boolean isNonPersistentTopic() {
return isNonPersistentTopic;
}

public long getEpoch() {
return epoch;
}

public boolean isUserProvidedProducerName() {
return userProvidedProducerName;
}

@VisibleForTesting
long getPendingPublishAcks() {
return pendingPublishAcks;
Expand Down
Expand Up @@ -817,6 +817,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.getUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.getEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
Expand Down Expand Up @@ -938,7 +940,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);
isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);

try {
topic.addProducer(producer);
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -103,7 +102,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> delete();

ConcurrentOpenHashSet<Producer> getProducers();
Map<String, Producer> getProducers();

String getName();

Expand Down
Expand Up @@ -193,24 +193,9 @@ public void addProducer(Producer producer) throws BrokerServiceException {
try {
brokerService.checkTopicNsOwnership(getName());

if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new TopicFencedException("Topic is temporarily unavailable");
}

if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new ProducerBusyException("Topic reached max producers limit");
}
checkTopicFenced();

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

if (!producers.add(producer)) {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
Expand All @@ -231,7 +216,7 @@ public void checkMessageDeduplicationInfo() {
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer)) {
if (producers.remove(producer.getProducerName(), producer)) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -363,7 +348,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
Expand Down Expand Up @@ -452,7 +437,7 @@ public CompletableFuture<Void> close() {
List<CompletableFuture<Void>> futures = Lists.newArrayList();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

FutureUtil.waitForAll(futures).thenRun(() -> {
Expand Down Expand Up @@ -642,7 +627,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();

Expand Down Expand Up @@ -760,7 +745,7 @@ public NonPersistentTopicStats getStats() {

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
Expand Down Expand Up @@ -875,7 +860,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;

producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
Expand Down