Skip to content

Commit

Permalink
[#612] test: cleanup shuffleServer instance for each test (#658)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Cleanup shuffleServer instance for each test

### Why are the changes needed?

When digging into the root cause of #612, I found some problems that the shuffle server instance should be released after each test case is finished.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Don't need
  • Loading branch information
zuston committed Feb 27, 2023
1 parent c70d154 commit b19c8e3
Showing 1 changed file with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
Expand Down Expand Up @@ -75,9 +76,20 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {

private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);

@AfterAll
public static void tearDown() {
private ShuffleServer shuffleServer;

@BeforeEach
public void beforeEach() {
ShuffleServerMetrics.clear();
ShuffleServerMetrics.register();
}

@AfterEach
public void afterEach() throws Exception {
if (shuffleServer != null) {
shuffleServer.stopServer();
shuffleServer = null;
}
}

@Test
Expand All @@ -89,7 +101,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception {
conf.setString("rss.server.buffer.capacity", "10K");
conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "hugePartitionMemoryUsageLimitTest_appId";
Expand Down Expand Up @@ -139,7 +151,7 @@ public void partitionDataSizeSummaryTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "partitionDataSizeSummaryTest";
Expand Down Expand Up @@ -188,7 +200,7 @@ public void registerShuffleTest() throws Exception {
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "registerTest1";
Expand Down Expand Up @@ -244,7 +256,7 @@ public void writeProcessTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
Expand Down Expand Up @@ -377,7 +389,7 @@ public void writeProcessTest() throws Exception {
public void removeShuffleDataWithHdfsTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
String storageBasePath = HDFS_URI + "rss/clearTest";
String storageBasePath = HDFS_URI + "rss/removeShuffleDataWithHdfsTest";
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
Expand All @@ -390,7 +402,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);

ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

Expand Down Expand Up @@ -462,7 +474,7 @@ public void removeShuffleDataWithLocalfileTest() throws Exception {
conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString());

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "removeShuffleDataWithLocalfileTest";
Expand Down Expand Up @@ -518,7 +530,7 @@ public void clearTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
"clearTest1",
Expand Down Expand Up @@ -576,7 +588,7 @@ public void clearTest() throws Exception {
@Test
public void clearMultiTimesTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/clearTest";
String storageBasePath = HDFS_URI + "rss/clearMultiTimesTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
Expand All @@ -592,7 +604,7 @@ public void clearMultiTimesTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "clearMultiTimesTest";
shuffleTaskManager.registerShuffle(
Expand Down Expand Up @@ -642,7 +654,7 @@ public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeResourcesByShuffleIdsMultiTimesTest";
shuffleTaskManager.registerShuffle(
Expand Down Expand Up @@ -772,7 +784,7 @@ public void testGetFinishedBlockIds() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
Expand Down Expand Up @@ -840,7 +852,7 @@ public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
Expand Down Expand Up @@ -878,7 +890,7 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep
// make sure not to check leak shuffle data automatically
conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
Expand Down

0 comments on commit b19c8e3

Please sign in to comment.