From 524ff06ebf8b72a8b9923f546db13faf7ebc2033 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 17 Jul 2017 12:24:38 -0700 Subject: [PATCH] Fix: register zk-stats listener after broker service started (#573) --- .../pulsar/broker/service/BrokerService.java | 20 +++++++++---- .../zookeeper/aspectj/ClientCnxnAspect.java | 28 +++++++++++++++++-- .../zookeeper/ZooKeeperClientAspectJTest.java | 28 +++++++++++-------- 3 files changed, 57 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0cbab32410079..7df09fbd9aafd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -72,6 +72,8 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; +import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; +import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -165,6 +167,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener(); this.pulsarStats = new PulsarStats(pulsar); - // register listener to capture zk-latency - ClientCnxnAspect.addListener((eventType, latencyMs) -> { - this.pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); - }); this.offlineTopicStatCache = new ConcurrentOpenHashMap<>(); final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor"); @@ -265,8 +264,14 @@ public Map deserialize(String key, byte[] content) throws Except "Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ", pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()); } - + // register listener to capture zk-latency + zkStatsListener = new EventListner() { + @Override + public void recordLatency(EventType eventType, long latencyMs) { + pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); + } + }; PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize()); } @@ -307,6 +312,9 @@ public void start() throws Exception { this.startInactivityMonitor(); this.startMessageExpiryMonitor(); this.startBacklogQuotaChecker(); + // register listener to capture zk-latency + ClientCnxnAspect.addListener(zkStatsListener); + ClientCnxnAspect.registerExecutor(pulsar.getExecutor()); } void startStatsUpdater() { @@ -370,6 +378,8 @@ public void close() throws IOException { backlogQuotaChecker.shutdown(); authenticationService.close(); pulsarStats.close(); + ClientCnxnAspect.removeListener(zkStatsListener); + ClientCnxnAspect.registerExecutor(null); log.info("Broker service completely shut down"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java index 006660576ed26..35e7b3d1d66fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.util.MathUtils; import org.apache.jute.Record; @@ -54,6 +55,8 @@ public class ClientCnxnAspect { public static enum EventType { write, read, other; } + + private static ExecutorService eventProcessExecutor; public static interface EventListner { public void recordLatency(EventType eventType, long latencyMiliSecond); @@ -68,7 +71,19 @@ public void processEvent() { @Around("processEvent()") public void timedProcessEvent(ProceedingJoinPoint joinPoint) throws Throwable { joinPoint.proceed(); + // zkResponse event shouldn't be blocked and it should be processed + // async + if (eventProcessExecutor != null && !eventProcessExecutor.isShutdown()) { + eventProcessExecutor.submit(new Runnable() { + @Override + public void run() { + processEvent(joinPoint); + } + }); + } + } + private void processEvent(ProceedingJoinPoint joinPoint) { long startTimeMs = getStartTime(joinPoint.getArgs()[0]); if (startTimeMs == -1) { // couldn't find start time @@ -139,7 +154,8 @@ private long getStartTime(Object packet) { Field ctxField = Class.forName("org.apache.zookeeper.ClientCnxn$Packet").getDeclaredField("ctx"); ctxField.setAccessible(true); Object zooworker = ctxField.get(packet); - if (zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) { + if (zooworker != null + && zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) { Field timeField = Class.forName("org.apache.bookkeeper.zookeeper.ZooWorker") .getDeclaredField("startTimeMs"); timeField.setAccessible(true); @@ -172,10 +188,18 @@ private Record getEventType(Object packet) { return null; } + public static void registerExecutor(ExecutorService executor) { + eventProcessExecutor = executor; + } + public static void addListener(EventListner listener) { listeners.add(listener); } + + public static void removeListener(EventListner listener) { + listeners.remove(listener); + } private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnAspect.class); -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java index d00d8ed6242a5..93a3b08d68aef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; +import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.stats.Metrics; @@ -52,7 +53,6 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; -import org.aspectj.weaver.loadtime.Agent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -60,7 +60,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.ea.agentloader.AgentLoader; import com.google.common.util.concurrent.AtomicDouble; public class ZooKeeperClientAspectJTest { @@ -74,11 +73,11 @@ public class ZooKeeperClientAspectJTest { static { // load agent with aspectjweaver-Agent for testing // maven-test waves advice on build-goal so, maven doesn't need explicit loading - // uncomment it while testing on eclipse: - //AgentLoader.loadAgentClass(Agent.class.getName(), null); + // uncomment it while testing on eclipse: + // AgentLoader.loadAgentClass(Agent.class.getName(), null); } - @Test(enabled=false) + @Test public void testZkConnected() throws Exception { try { ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(); @@ -122,13 +121,17 @@ void testZkClientAspectJTrigger() throws Exception { final AtomicInteger writeCount = new AtomicInteger(0); final AtomicInteger readCount = new AtomicInteger(0); - ClientCnxnAspect.addListener((EventType eventType, long latencyMiliSecond) -> { - if (eventType.equals(EventType.write)) { - writeCount.incrementAndGet(); - } else if (eventType.equals(EventType.read)) { - readCount.incrementAndGet(); + EventListner listener = new EventListner() { + @Override + public void recordLatency(EventType eventType, long latencyMiliSecond) { + if (eventType.equals(EventType.write)) { + writeCount.incrementAndGet(); + } else if (eventType.equals(EventType.read)) { + readCount.incrementAndGet(); + } } - }); + }; + ClientCnxnAspect.addListener(listener); CountDownLatch createLatch = new CountDownLatch(1); CountDownLatch deleteLatch = new CountDownLatch(1); CountDownLatch readLatch = new CountDownLatch(1); @@ -152,6 +155,7 @@ void testZkClientAspectJTrigger() throws Exception { Thread.sleep(500); Assert.assertEquals(writeCount.get(), 2); Assert.assertEquals(readCount.get(), 2); + ClientCnxnAspect.removeListener(listener); } finally { if (localZkc != null) { localZkc.close(); @@ -219,7 +223,7 @@ public void testZkOpStatsMetrics() throws Exception { deleteLatch.await(); existLatch.await(); readLatch.await(); - Thread.sleep(500); + Thread.sleep(10); BrokerService brokerService = pulsar.getBrokerService(); brokerService.updateRates();