Skip to content

Commit

Permalink
Fix: register zk-stats listener after broker service started (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jul 17, 2017
1 parent 3047d1c commit 524ff06
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 19 deletions.
Expand Up @@ -72,6 +72,8 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -165,6 +167,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

private final int keepAliveIntervalSeconds;
private final PulsarStats pulsarStats;
private final EventListner zkStatsListener;
private final AuthenticationService authenticationService;

public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
Expand All @@ -190,10 +193,6 @@ public BrokerService(PulsarService pulsar) throws Exception {

this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
this.pulsarStats = new PulsarStats(pulsar);
// register listener to capture zk-latency
ClientCnxnAspect.addListener((eventType, latencyMs) -> {
this.pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
});
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();

final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
Expand Down Expand Up @@ -265,8 +264,14 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
"Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ",
pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
}


// register listener to capture zk-latency
zkStatsListener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMs) {
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
}
};
PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}

Expand Down Expand Up @@ -307,6 +312,9 @@ public void start() throws Exception {
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startBacklogQuotaChecker();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
}

void startStatsUpdater() {
Expand Down Expand Up @@ -370,6 +378,8 @@ public void close() throws IOException {
backlogQuotaChecker.shutdown();
authenticationService.close();
pulsarStats.close();
ClientCnxnAspect.removeListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(null);
log.info("Broker service completely shut down");
}

Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.bookkeeper.util.MathUtils;
import org.apache.jute.Record;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class ClientCnxnAspect {
public static enum EventType {
write, read, other;
}

private static ExecutorService eventProcessExecutor;

public static interface EventListner {
public void recordLatency(EventType eventType, long latencyMiliSecond);
Expand All @@ -68,7 +71,19 @@ public void processEvent() {
@Around("processEvent()")
public void timedProcessEvent(ProceedingJoinPoint joinPoint) throws Throwable {
joinPoint.proceed();
// zkResponse event shouldn't be blocked and it should be processed
// async
if (eventProcessExecutor != null && !eventProcessExecutor.isShutdown()) {
eventProcessExecutor.submit(new Runnable() {
@Override
public void run() {
processEvent(joinPoint);
}
});
}
}

private void processEvent(ProceedingJoinPoint joinPoint) {
long startTimeMs = getStartTime(joinPoint.getArgs()[0]);
if (startTimeMs == -1) {
// couldn't find start time
Expand Down Expand Up @@ -139,7 +154,8 @@ private long getStartTime(Object packet) {
Field ctxField = Class.forName("org.apache.zookeeper.ClientCnxn$Packet").getDeclaredField("ctx");
ctxField.setAccessible(true);
Object zooworker = ctxField.get(packet);
if (zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) {
if (zooworker != null
&& zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) {
Field timeField = Class.forName("org.apache.bookkeeper.zookeeper.ZooWorker")
.getDeclaredField("startTimeMs");
timeField.setAccessible(true);
Expand Down Expand Up @@ -172,10 +188,18 @@ private Record getEventType(Object packet) {
return null;
}

public static void registerExecutor(ExecutorService executor) {
eventProcessExecutor = executor;
}

public static void addListener(EventListner listener) {
listeners.add(listener);
}

public static void removeListener(EventListner listener) {
listeners.remove(listener);
}

private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnAspect.class);

}
}
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
Expand All @@ -52,15 +53,13 @@
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.aspectj.weaver.loadtime.Agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.ea.agentloader.AgentLoader;
import com.google.common.util.concurrent.AtomicDouble;

public class ZooKeeperClientAspectJTest {
Expand All @@ -74,11 +73,11 @@ public class ZooKeeperClientAspectJTest {
static {
// load agent with aspectjweaver-Agent for testing
// maven-test waves advice on build-goal so, maven doesn't need explicit loading
// uncomment it while testing on eclipse:
//AgentLoader.loadAgentClass(Agent.class.getName(), null);
// uncomment it while testing on eclipse:
// AgentLoader.loadAgentClass(Agent.class.getName(), null);
}

@Test(enabled=false)
@Test
public void testZkConnected() throws Exception {
try {
ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl();
Expand Down Expand Up @@ -122,13 +121,17 @@ void testZkClientAspectJTrigger() throws Exception {

final AtomicInteger writeCount = new AtomicInteger(0);
final AtomicInteger readCount = new AtomicInteger(0);
ClientCnxnAspect.addListener((EventType eventType, long latencyMiliSecond) -> {
if (eventType.equals(EventType.write)) {
writeCount.incrementAndGet();
} else if (eventType.equals(EventType.read)) {
readCount.incrementAndGet();
EventListner listener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMiliSecond) {
if (eventType.equals(EventType.write)) {
writeCount.incrementAndGet();
} else if (eventType.equals(EventType.read)) {
readCount.incrementAndGet();
}
}
});
};
ClientCnxnAspect.addListener(listener);
CountDownLatch createLatch = new CountDownLatch(1);
CountDownLatch deleteLatch = new CountDownLatch(1);
CountDownLatch readLatch = new CountDownLatch(1);
Expand All @@ -152,6 +155,7 @@ void testZkClientAspectJTrigger() throws Exception {
Thread.sleep(500);
Assert.assertEquals(writeCount.get(), 2);
Assert.assertEquals(readCount.get(), 2);
ClientCnxnAspect.removeListener(listener);
} finally {
if (localZkc != null) {
localZkc.close();
Expand Down Expand Up @@ -219,7 +223,7 @@ public void testZkOpStatsMetrics() throws Exception {
deleteLatch.await();
existLatch.await();
readLatch.await();
Thread.sleep(500);
Thread.sleep(10);

BrokerService brokerService = pulsar.getBrokerService();
brokerService.updateRates();
Expand Down

0 comments on commit 524ff06

Please sign in to comment.