Skip to content

Commit

Permalink
Avoid making copies of internal maps when iterating (#10691)
Browse files Browse the repository at this point in the history
### Motivation

In several places in the code when iterating over the custom hashmaps, we are taking over a copy of the map. This was done every time the iteration could end up modifying the map, since there was a non-reentrant mutex taken during the iteration. Any modification would lead to a deadlock. 

Since the behavior was changed in #9787 to not hold the section mutex during the iteration, there's no more need to make a copy of the maps.
  • Loading branch information
merlimat committed May 25, 2021
1 parent a19de10 commit ed2dfc9
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -313,13 +312,13 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception
// create non-partitioned topic manually and close the previous reader if present.
try {
pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
for (Subscription value : t.getSubscriptions().values()) {
t.getSubscriptions().forEach((__, value) -> {
try {
value.deleteForcefully();
} catch (Exception e) {
LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
}
}
});
});
} catch (Exception e) {
LOG.warn("Failed to try to delete subscriptions for health check", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,8 +1237,8 @@ public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceNam
synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
if (pulsar.getBrokerService().getMultiLayerTopicMap()
.containsKey(namespaceName.toString())) {
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
.forEach(bundle -> {
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
.forEach((__, bundle) -> {
bundle.forEach((topicName, topic) -> {
if (topic instanceof NonPersistentTopic
&& ((NonPersistentTopic) topic).isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -226,9 +225,10 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
if (log.isDebugEnabled()) {
log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
}
for (PersistentSubscription subscription : persistentTopic.getSubscriptions().values()) {
subscription.getExpiryMonitor().expireMessages(target);
}

persistentTopic.getSubscriptions().forEach((__, subscription) ->
subscription.getExpiryMonitor().expireMessages(target)
);
} else {
// If disabled precise time based backlog quota check, will try to remove whole ledger from cursor's backlog
Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cnxsPerThread.get().remove(this);

// Connection is gone, close the producers immediately
producers.values().forEach((producerFuture) -> {
producers.forEach((__, producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
}
});

consumers.values().forEach((consumerFuture) -> {
consumers.forEach((__, consumerFuture) -> {
Consumer consumer;
if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumer = consumerFuture.getNow(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,8 @@ public void checkMessageExpiry() {
int messageTtlInSeconds = getMessageTTL();

if (messageTtlInSeconds != 0) {
subscriptions.values().forEach((sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.values().forEach((replicator)
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
} catch (Exception e) {
Expand Down

0 comments on commit ed2dfc9

Please sign in to comment.