Skip to content

Commit

Permalink
[Issue-10611] consumer related topic stats only available while consu…
Browse files Browse the repository at this point in the history
…mer or reader are connected

(cherry picked from commit 2e1e213)
  • Loading branch information
dlg99 authored and eolivelli committed May 20, 2021
1 parent ed1f33e commit 3ebd2d8
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.base.MoreObjects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -68,6 +69,9 @@ public class NonPersistentSubscription implements Subscription {
// Timestamp of when this subscription was last seen active
private volatile long lastActive;

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
this.topic = topic;
this.topicName = topic.getName();
Expand Down Expand Up @@ -188,6 +192,10 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
// preserve accumulative stats form removed consumer
ConsumerStats stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);

// invalid consumer remove will throw an exception
// decrement usage is triggered only for valid consumer close
Expand Down Expand Up @@ -443,6 +451,8 @@ public boolean expireMessages(Position position) {

public NonPersistentSubscriptionStats getStats() {
NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();
subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();

NonPersistentDispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand All @@ -105,6 +106,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;

private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
Expand Down Expand Up @@ -790,6 +794,8 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subsc
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();

subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
Expand Down Expand Up @@ -982,7 +988,13 @@ public boolean isReplicated() {
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
// checkInactiveSubscriptions iterates over subscriptions map and removing from the map with the same thread.
// That creates deadlock. so, execute remove it in different thread.
return CompletableFuture.runAsync(() -> subscriptions.remove(subscriptionName), brokerService.executor());
return CompletableFuture.runAsync(() -> {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
// preserve accumulative stats form removed subscription
SubscriptionStats stats = sub.getStats();
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}, brokerService.executor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -111,6 +111,9 @@ public class PersistentSubscription implements Subscription {
private volatile boolean isDeleteTransactionMarkerInProcess = false;
private final PendingAckHandle pendingAckHandle;

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
Expand Down Expand Up @@ -255,7 +258,13 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
if (dispatcher.getConsumers().isEmpty()) {

// preserve accumulative stats form removed consumer
ConsumerStats stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();

if (!cursor.isDurable()) {
Expand Down Expand Up @@ -925,6 +934,8 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog, boolean subscriptio
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -200,6 +201,9 @@ protected TopicStatsHelper initialValue() {

private ScheduledFuture<?> fencedTopicMonitoringTask = null;

private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
Expand Down Expand Up @@ -851,7 +855,7 @@ public void deleteCursorComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cursor deleted successfully", topic, subscriptionName);
}
subscriptions.remove(subscriptionName);
removeSubscription(subscriptionName);
unsubscribeFuture.complete(null);
lastActive = System.nanoTime();
}
Expand All @@ -869,7 +873,11 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}

void removeSubscription(String subscriptionName) {
subscriptions.remove(subscriptionName);
PersistentSubscription sub = subscriptions.remove(subscriptionName);
// preserve accumulative stats form removed subscription
SubscriptionStats stats = sub.getStats(false, false);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}

/**
Expand Down Expand Up @@ -1617,6 +1625,9 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo
stats.bytesInCounter = getBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;

stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.junit.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

@Test(groups = "broker")
public class NonPersistentTopicTest extends BrokerTestBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testAccumulativeStats() throws Exception {
final String topicName = "non-persistent://prop/ns-abc/aTopic";
final String sharedSubName = "shared";
final String failoverSubName = "failOver";

Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

// stats are at zero before any activity
TopicStats stats = topic.getStats(false, false);
assertEquals(stats.bytesInCounter, 0);
assertEquals(stats.msgInCounter, 0);
assertEquals(stats.bytesOutCounter, 0);
assertEquals(stats.msgOutCounter, 0);

producer.newMessage().value("test").eventTime(5).send();
producer.newMessage().value("test").eventTime(5).send();

Message<String> msg = consumer1.receive();
assertNotNull(msg);
msg = consumer2.receive();
assertNotNull(msg);

// send/receive result in non-zero stats
TopicStats statsBeforeUnsubscribe = topic.getStats(false, false);
assertTrue(statsBeforeUnsubscribe.bytesInCounter > 0);
assertTrue(statsBeforeUnsubscribe.msgInCounter > 0);
assertTrue(statsBeforeUnsubscribe.bytesOutCounter > 0);
assertTrue(statsBeforeUnsubscribe.msgOutCounter > 0);

consumer1.unsubscribe();
consumer2.unsubscribe();
producer.close();
topic.getProducers().values().forEach(topic::removeProducer);
assertEquals(topic.getProducers().size(), 0);

// consumer unsubscribe/producer removal does not result in stats loss
TopicStats statsAfterUnsubscribe = topic.getStats(false, false);
assertEquals(statsAfterUnsubscribe.bytesInCounter, statsBeforeUnsubscribe.bytesInCounter);
assertEquals(statsAfterUnsubscribe.msgInCounter, statsBeforeUnsubscribe.msgInCounter);
assertEquals(statsAfterUnsubscribe.bytesOutCounter, statsBeforeUnsubscribe.bytesOutCounter);
assertEquals(statsAfterUnsubscribe.msgOutCounter, statsBeforeUnsubscribe.msgOutCounter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -47,6 +52,9 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -153,4 +161,54 @@ public void testUnblockStuckSubscription() throws Exception {
msg = consumer2.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
}

@Test
public void testAccumulativeStats() throws Exception {
final String topicName = "persistent://prop/ns-abc/aTopic";
final String sharedSubName = "shared";
final String failoverSubName = "failOver";

Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

// stats are at zero before any activity
TopicStats stats = topic.getStats(false, false);
assertEquals(stats.bytesInCounter, 0);
assertEquals(stats.msgInCounter, 0);
assertEquals(stats.bytesOutCounter, 0);
assertEquals(stats.msgOutCounter, 0);

producer.newMessage().value("test").eventTime(5).send();
producer.newMessage().value("test").eventTime(5).send();

Message<String> msg = consumer1.receive();
assertNotNull(msg);
msg = consumer2.receive();
assertNotNull(msg);

// send/receive result in non-zero stats
TopicStats statsBeforeUnsubscribe = topic.getStats(false, false);
assertTrue(statsBeforeUnsubscribe.bytesInCounter > 0);
assertTrue(statsBeforeUnsubscribe.msgInCounter > 0);
assertTrue(statsBeforeUnsubscribe.bytesOutCounter > 0);
assertTrue(statsBeforeUnsubscribe.msgOutCounter > 0);

consumer1.unsubscribe();
consumer2.unsubscribe();
producer.close();
topic.getProducers().values().forEach(topic::removeProducer);
assertEquals(topic.getProducers().size(), 0);

// consumer unsubscribe/producer removal does not result in stats loss
TopicStats statsAfterUnsubscribe = topic.getStats(false, false);
assertEquals(statsAfterUnsubscribe.bytesInCounter, statsBeforeUnsubscribe.bytesInCounter);
assertEquals(statsAfterUnsubscribe.msgInCounter, statsBeforeUnsubscribe.msgInCounter);
assertEquals(statsAfterUnsubscribe.bytesOutCounter, statsBeforeUnsubscribe.bytesOutCounter);
assertEquals(statsAfterUnsubscribe.msgOutCounter, statsBeforeUnsubscribe.msgOutCounter);
}
}

0 comments on commit 3ebd2d8

Please sign in to comment.