Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.helix;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.helix.common.ZkTestBase;


public class ThreadLeakageChecker {
private static ThreadGroup getRootThreadGroup() {
ThreadGroup candidate = Thread.currentThread().getThreadGroup();
while (candidate.getParent() != null) {
candidate = candidate.getParent();
}
return candidate;
}

private static List<Thread> getAllThreads() {
ThreadGroup rootThreadGroup = getRootThreadGroup();
Thread[] threads = new Thread[32];
int count = rootThreadGroup.enumerate(threads);
while (count == threads.length) {
threads = new Thread[threads.length * 2];
count = rootThreadGroup.enumerate(threads);
}
return Arrays.asList(Arrays.copyOf(threads, count));
}

private static final String[] ZKSERVER_THRD_PATTERN =
{"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
private static final String[] ZKSESSION_THRD_PATTERN =
new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
private static final String[] FORKJOIN_THRD_PATTERN = new String[]{"ForkJoinPool"};
private static final String[] TIMER_THRD_PATTERN = new String[]{"time"};
private static final String[] TASKSTATEMODEL_THRD_PATTERN = new String[]{"TaskStateModel"};

/*
* The two threshold -- warning and limit, are mostly empirical.
*
* ZkServer, current version has only 4 threads. In case later version use more, we the limit to 100.
* The reasoning is that these ZkServer threads are not deemed as leaking no matter how much they have.
*
* ZkSession is the ZkClient and native Zookeeper client we have. ZkTestBase has 12 at starting up time.
* Thus, if there is more than that, it is the test code leaking ZkClient.
*
* ForkJoin is created by using parallel stream or similar Java features. This is out of our control.
* Similar to ZkServer. The limit is to 100 while keep a small _warningLimit.
*
* Timer should not happen. Setting limit to 2 not 0 mostly because even when you cancel the timer
* thread, it may take some not deterministic time for it to go away. So give it some slack here
*
* Also note, this ThreadLeakage checker depends on the fact that tests are running sequentially.
* Otherwise, the report is not going to be accurate.
*/
private static enum ThreadCategory {
ZkServer("zookeeper server threads", 4, 100, ZKSERVER_THRD_PATTERN),
ZkSession("zkclient/zooKeeper session threads", 12, 12, ZKSESSION_THRD_PATTERN),
ForkJoin("fork join pool threads", 2, 100, FORKJOIN_THRD_PATTERN),
Timer("timer threads", 0, 2, TIMER_THRD_PATTERN),
TaskStateModel("TaskStateModel threads", 0, 0, TASKSTATEMODEL_THRD_PATTERN),
Other("Other threads", 0, 2, new String[]{""});

private String _description;
private List<String> _pattern;
private int _warningLimit;
private int _limit;

public String getDescription() {
return _description;
}

public Predicate<String> getMatchPred() {
if (this.name() != ThreadCategory.Other.name()) {
Predicate<String> pred = target -> {
for (String p : _pattern) {
if (target.toLowerCase().contains(p.toLowerCase())) {
return true;
}
}
return false;
};
return pred;
}

List<Predicate<String>> predicateList = new ArrayList<>();
for (ThreadCategory threadCategory : ThreadCategory.values()) {
if (threadCategory == ThreadCategory.Other) {
continue;
}
predicateList.add(threadCategory.getMatchPred());
}
Predicate<String> pred = target -> {
for (Predicate<String> p : predicateList) {
if (p.test(target)) {
return false;
}
}
return true;
};

return pred;
}

public int getWarningLimit() {
return _warningLimit;
}

public int getLimit() {
return _limit;
}

private ThreadCategory(String description, int warningLimit, int limit, String[] patterns) {
_description = description;
_pattern = Arrays.asList(patterns);
_warningLimit = warningLimit;
_limit = limit;
}
}

public static boolean afterClassCheck(String classname) {
ZkTestBase.reportPhysicalMemory();
// step 1: get all active threads
List<Thread> threads = getAllThreads();
System.out.println(classname + " has active threads cnt:" + threads.size());

// step 2: categorize threads
Map<String, List<Thread>> threadByName = null;
Map<ThreadCategory, Integer> threadByCnt = new HashMap<>();
Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
try {
threadByName = threads.
stream().
filter(p -> p.getThreadGroup() != null && p.getThreadGroup().getName() != null
&& ! "system".equals(p.getThreadGroup().getName())).
collect(Collectors.groupingBy(p -> p.getName()));
} catch (Exception e) {
System.out.println("filtering thread failure with exception:" + e.getStackTrace());
}

threadByName.entrySet().stream().forEach(entry -> {
String key = entry.getKey(); // thread name
Arrays.asList(ThreadCategory.values()).stream().forEach(category -> {
if (category.getMatchPred().test(key)) {
Integer count = threadByCnt.containsKey(category) ? threadByCnt.get(category) : 0;
threadByCnt.put(category, count + entry.getValue().size());
Set<Thread> thisSet = threadByCat.getOrDefault(category, new HashSet<>());
thisSet.addAll(entry.getValue());
threadByCat.put(category, thisSet);
}
});
});

// todo: We should make the following System.out as LOG.INfO once we achieve 0 thread leakage.
// todo: also the calling point of this method would fail the test
// step 3: enforce checking policy
boolean checkStatus = true;
for (ThreadCategory threadCategory : ThreadCategory.values()) {
int limit = threadCategory.getLimit();
int warningLimit = threadCategory.getWarningLimit();

Integer categoryThreadCnt = threadByCnt.get(threadCategory);
if (categoryThreadCnt != null) {
boolean dumpThread = false;
if (categoryThreadCnt > limit) {
checkStatus = false;
System.out.println(
"Failure " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
dumpThread = true;
} else if (categoryThreadCnt > warningLimit) {
System.out.println(
"Warning " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
dumpThread = true;
} else {
System.out.println(threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
}
if (!dumpThread) {
continue;
}
// print first 100 thread names
int i = 0;
for (Thread t : threadByCat.get(threadCategory)) {
System.out.println(i + " thread:" + t.getName());
i++;
if (i == 100) {
System.out.println(" skipping the rest");
break;
}
}
}
}

return checkStatus;
}
}
29 changes: 28 additions & 1 deletion helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
Expand Down Expand Up @@ -127,8 +128,22 @@ public class ZkTestBase {
protected Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
protected Map<String, BaseDataAccessor> _baseDataAccessorMap = new HashMap<>();

static public void reportPhysicalMemory() {
com.sun.management.OperatingSystemMXBean os = (com.sun.management.OperatingSystemMXBean)
java.lang.management.ManagementFactory.getOperatingSystemMXBean();
long physicalMemorySize = os.getTotalPhysicalMemorySize();
System.out.println("************ SYSTEM Physical Memory:" + physicalMemorySize);

long MB = 1024 * 1024;
Runtime runtime = Runtime.getRuntime();
long free = runtime.freeMemory()/MB;
long total = runtime.totalMemory()/MB;
System.out.println("************ total memory:" + total + " free memory:" + free);
}

@BeforeSuite
public void beforeSuite() throws Exception {
reportPhysicalMemory();
// TODO: use logging.properties file to config java.util.logging.Logger levels
java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.WARNING);
Expand Down Expand Up @@ -710,7 +725,8 @@ protected List<IdealState> setupIdealState(String clusterName, int[] nodes, Stri
}

@AfterClass
public void cleanupLiveInstanceOwners() {
public void cleanupLiveInstanceOwners() throws InterruptedException {
String testClassName = this.getShortClassName();
for (String cluster : _liveInstanceOwners.keySet()) {
Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(cluster);
for (HelixZkClient client : clientMap.values()) {
Expand All @@ -719,6 +735,17 @@ public void cleanupLiveInstanceOwners() {
clientMap.clear();
}
_liveInstanceOwners.clear();

boolean status = false;
try {
status = ThreadLeakageChecker.afterClassCheck(testClassName);
} catch (Exception e) {
LOG.error("ThreadLeakageChecker exception:", e);
}
// todo: We should fail test here once we achieved 0 leakage and remove the following System print
if (!status) {
System.out.println("---------- Test Class " + testClassName + " thread leakage detected! ---------------");
}
}

protected List<LiveInstance> setupLiveInstances(String clusterName, int[] liveInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
Expand Down Expand Up @@ -83,6 +85,8 @@
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -95,6 +99,7 @@
* This test verifies that all Helix Java APIs work as expected.
*/
public class TestMultiZkHelixJavaApis {
private static Logger LOG = LoggerFactory.getLogger(TestMultiZkHelixJavaApis.class);
private static final int NUM_ZK = 3;
private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new HashMap<>();
Expand Down Expand Up @@ -170,6 +175,8 @@ public void beforeClass() throws Exception {

@AfterClass
public void afterClass() throws Exception {
String testClassName = getClass().getSimpleName();

try {
// Kill all mock controllers and participants
MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
Expand Down Expand Up @@ -216,6 +223,17 @@ public void afterClass() throws Exception {
System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
}
}

boolean status = false;
try {
status = ThreadLeakageChecker.afterClassCheck(testClassName);
} catch (Exception e) {
LOG.error("ThreadLeakageChecker exception:", e);
}
// todo: We should fail test here once we achieved 0 leakage and remove the following System print
if (!status) {
System.out.println("---------- Test Class " + testClassName + " thread leakage detected! ---------------");
}
}

/**
Expand Down