Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add epoch for connection handler to handle create producer timeout. #5571

Merged
merged 6 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,10 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -38,8 +40,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +53,7 @@ public abstract class AbstractTopic implements Topic {
protected final String topic;

// Producers currently connected to this topic
protected final ConcurrentOpenHashSet<Producer> producers;
protected final ConcurrentHashMap<String, Producer> producers;

protected final BrokerService brokerService;

Expand Down Expand Up @@ -86,7 +86,7 @@ public abstract class AbstractTopic implements Topic {
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentOpenHashSet<>(16, 1);
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.lastActive = System.nanoTime();
Expand Down Expand Up @@ -123,7 +123,7 @@ protected boolean isProducersExceeded() {

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.forEach(producer -> {
producers.values().forEach(producer -> {
if (!producer.isRemote()) {
foundLocal.set(true);
}
Expand All @@ -138,7 +138,7 @@ public String toString() {
}

@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
public Map<String, Producer> getProducers() {
return producers;
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public void resetPublishCountAndEnableReadIfRequired() {
*/
protected void enableProduerRead() {
if (producers != null) {
producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -184,9 +185,9 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
*/
private void disconnectProducers(PersistentTopic persistentTopic) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();
Map<String, Producer> producers = persistentTopic.getProducers();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer",
producer.getProducerName(), persistentTopic.getName());
futures.add(producer.disconnect());
Expand Down
Expand Up @@ -61,6 +61,8 @@ public class Producer {
private final Topic topic;
private final ServerCnx cnx;
private final String producerName;
private final long epoch;
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private Rate msgIn;
Expand All @@ -86,11 +88,14 @@ public class Producer {
private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
this.producerName = checkNotNull(producerName);
this.userProvidedProducerName = userProvidedProducerName;
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
Expand Down Expand Up @@ -478,6 +483,14 @@ public boolean isNonPersistentTopic() {
return isNonPersistentTopic;
}

public long getEpoch() {
return epoch;
}

public boolean isUserProvidedProducerName() {
return userProvidedProducerName;
}

@VisibleForTesting
long getPendingPublishAcks() {
return pendingPublishAcks;
Expand Down
Expand Up @@ -817,6 +817,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.getUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.getEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
Expand Down Expand Up @@ -938,7 +940,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);
isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);

try {
topic.addProducer(producer);
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -103,7 +102,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> delete();

ConcurrentOpenHashSet<Producer> getProducers();
Map<String, Producer> getProducers();

String getName();

Expand Down
Expand Up @@ -207,9 +207,20 @@ public void addProducer(Producer producer) throws BrokerServiceException {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

if (!producers.add(producer)) {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
boolean canOverwrite = false;
if (existProducer.equals(producer) && !existProducer.isUserProvidedProducerName()
&& !producer.isUserProvidedProducerName() && producer.getEpoch() > existProducer.getEpoch()) {
existProducer.close();
canOverwrite = true;
}
if (canOverwrite) {
producers.put(producer.getProducerName(), producer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use producers.replace(producer.getProducerName(), existingProducer, producer) to make sure one can successfully add the producer.

} else {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
}
}

USAGE_COUNT_UPDATER.incrementAndGet(this);
Expand All @@ -231,7 +242,7 @@ public void checkMessageDeduplicationInfo() {
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer)) {
if (producers.remove(producer.getProducerName(), producer)) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -363,7 +374,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
Expand Down Expand Up @@ -452,7 +463,7 @@ public CompletableFuture<Void> close() {
List<CompletableFuture<Void>> futures = Lists.newArrayList();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

FutureUtil.waitForAll(futures).thenRun(() -> {
Expand Down Expand Up @@ -642,7 +653,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();

Expand Down Expand Up @@ -760,7 +771,7 @@ public NonPersistentTopicStats getStats() {

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
Expand Down Expand Up @@ -875,7 +886,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;

producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
Expand Down
Expand Up @@ -367,7 +367,7 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
isFenced = true;
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
decrementPendingWriteOpsAndCheck();
return null;
Expand Down Expand Up @@ -424,9 +424,20 @@ public void addProducer(Producer producer) throws BrokerServiceException {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

if (!producers.add(producer)) {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
boolean canOverwrite = false;
if (existProducer.equals(producer) && !existProducer.isUserProvidedProducerName()
&& !producer.isUserProvidedProducerName() && producer.getEpoch() > existProducer.getEpoch()) {
existProducer.close();
canOverwrite = true;
}
if (canOverwrite) {
producers.put(producer.getProducerName(), producer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use replace and check the return result

} else {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
}
}

USAGE_COUNT_UPDATER.incrementAndGet(this);
Expand All @@ -445,7 +456,7 @@ public void addProducer(Producer producer) throws BrokerServiceException {

private boolean hasRemoteProducers() {
AtomicBoolean foundRemote = new AtomicBoolean(false);
producers.forEach(producer -> {
producers.values().forEach(producer -> {
if (producer.isRemote()) {
foundRemote.set(true);
}
Expand Down Expand Up @@ -491,7 +502,8 @@ private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer)) {

if (producers.remove(producer.getProducerName(), producer)) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -825,7 +837,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
Expand Down Expand Up @@ -935,7 +947,7 @@ public CompletableFuture<Void> close() {
List<CompletableFuture<Void>> futures = Lists.newArrayList();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

FutureUtil.waitForAll(futures).thenRun(() -> {
Expand Down Expand Up @@ -1277,7 +1289,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

// start publisher stats
topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();

Expand Down Expand Up @@ -1464,7 +1476,7 @@ public TopicStats getStats() {

ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
PublisherStats publisherStats = producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
Expand Down Expand Up @@ -1696,7 +1708,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

this.updateMaxPublishRate(data);

producers.forEach(producer -> {
producers.values().forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
Expand Down Expand Up @@ -1772,7 +1784,7 @@ public CompletableFuture<MessageId> terminate() {
ledger.asyncTerminate(new TerminateCallback() {
@Override
public void terminateComplete(Position lastCommittedPosition, Object ctx) {
producers.forEach(Producer::disconnect);
producers.values().forEach(Producer::disconnect);
subscriptions.forEach((name, sub) -> sub.topicTerminated());

PositionImpl lastPosition = (PositionImpl) lastCommittedPosition;
Expand Down
Expand Up @@ -105,7 +105,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
}

stats.producersCount = 0;
topic.getProducers().forEach(producer -> {
topic.getProducers().values().forEach(producer -> {
if (producer.isRemote()) {
AggregatedReplicationStats replStats = stats.replicationStats
.computeIfAbsent(producer.getRemoteCluster(), k -> new AggregatedReplicationStats());
Expand Down