diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index ea36c21be2..1d799fb581 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -37,7 +37,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.roaringbitmap.longlong.Roaring64NavigableMap; @@ -75,9 +76,20 @@ public class ShuffleTaskManagerTest extends HdfsTestBase { private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0); - @AfterAll - public static void tearDown() { + private ShuffleServer shuffleServer; + + @BeforeEach + public void beforeEach() { ShuffleServerMetrics.clear(); + ShuffleServerMetrics.register(); + } + + @AfterEach + public void afterEach() throws Exception { + if (shuffleServer != null) { + shuffleServer.stopServer(); + shuffleServer = null; + } } @Test @@ -89,7 +101,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception { conf.setString("rss.server.buffer.capacity", "10K"); conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "hugePartitionMemoryUsageLimitTest_appId"; @@ -139,7 +151,7 @@ public void partitionDataSizeSummaryTest() throws Exception { String confFile = ClassLoader.getSystemResource("server.conf").getFile(); ShuffleServerConf conf = new ShuffleServerConf(confFile); conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name()); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "partitionDataSizeSummaryTest"; @@ -188,7 +200,7 @@ public void registerShuffleTest() throws Exception { conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "registerTest1"; @@ -244,7 +256,7 @@ public void writeProcessTest() throws Exception { conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); shuffleTaskManager.registerShuffle( appId, @@ -377,7 +389,7 @@ public void writeProcessTest() throws Exception { public void removeShuffleDataWithHdfsTest() throws Exception { String confFile = ClassLoader.getSystemResource("server.conf").getFile(); ShuffleServerConf conf = new ShuffleServerConf(confFile); - String storageBasePath = HDFS_URI + "rss/clearTest"; + String storageBasePath = HDFS_URI + "rss/removeShuffleDataWithHdfsTest"; conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); @@ -390,7 +402,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); @@ -462,7 +474,7 @@ public void removeShuffleDataWithLocalfileTest() throws Exception { conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(), path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString()); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "removeShuffleDataWithLocalfileTest"; @@ -518,7 +530,7 @@ public void clearTest() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); shuffleTaskManager.registerShuffle( "clearTest1", @@ -576,7 +588,7 @@ public void clearTest() throws Exception { @Test public void clearMultiTimesTest() throws Exception { ShuffleServerConf conf = new ShuffleServerConf(); - String storageBasePath = HDFS_URI + "rss/clearTest"; + String storageBasePath = HDFS_URI + "rss/clearMultiTimesTest"; final int shuffleId = 1; conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); @@ -592,7 +604,7 @@ public void clearMultiTimesTest() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "clearMultiTimesTest"; shuffleTaskManager.registerShuffle( @@ -642,7 +654,7 @@ public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "removeResourcesByShuffleIdsMultiTimesTest"; shuffleTaskManager.registerShuffle( @@ -772,7 +784,7 @@ public void testGetFinishedBlockIds() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); StorageManager storageManager = shuffleServer.getStorageManager(); @@ -840,7 +852,7 @@ public void testAddFinishedBlockIdsWithoutRegister() throws Exception { conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); StorageManager storageManager = shuffleServer.getStorageManager(); @@ -878,7 +890,7 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep // make sure not to check leak shuffle data automatically conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L); - ShuffleServer shuffleServer = new ShuffleServer(conf); + shuffleServer = new ShuffleServer(conf); ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); shuffleTaskManager.registerShuffle( appId,