Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#612] test: cleanup shuffleServer instance for each test #658

Merged
merged 1 commit into from
Feb 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
Expand Down Expand Up @@ -75,9 +76,20 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {

private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);

@AfterAll
public static void tearDown() {
private ShuffleServer shuffleServer;

@BeforeEach
public void beforeEach() {
ShuffleServerMetrics.clear();
ShuffleServerMetrics.register();
}

@AfterEach
public void afterEach() throws Exception {
if (shuffleServer != null) {
shuffleServer.stopServer();
shuffleServer = null;
}
}

@Test
Expand All @@ -89,7 +101,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception {
conf.setString("rss.server.buffer.capacity", "10K");
conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "hugePartitionMemoryUsageLimitTest_appId";
Expand Down Expand Up @@ -139,7 +151,7 @@ public void partitionDataSizeSummaryTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "partitionDataSizeSummaryTest";
Expand Down Expand Up @@ -188,7 +200,7 @@ public void registerShuffleTest() throws Exception {
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "registerTest1";
Expand Down Expand Up @@ -244,7 +256,7 @@ public void writeProcessTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
Expand Down Expand Up @@ -377,7 +389,7 @@ public void writeProcessTest() throws Exception {
public void removeShuffleDataWithHdfsTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
String storageBasePath = HDFS_URI + "rss/clearTest";
String storageBasePath = HDFS_URI + "rss/removeShuffleDataWithHdfsTest";
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
Expand All @@ -390,7 +402,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);

ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

Expand Down Expand Up @@ -462,7 +474,7 @@ public void removeShuffleDataWithLocalfileTest() throws Exception {
conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString());

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "removeShuffleDataWithLocalfileTest";
Expand Down Expand Up @@ -518,7 +530,7 @@ public void clearTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
"clearTest1",
Expand Down Expand Up @@ -576,7 +588,7 @@ public void clearTest() throws Exception {
@Test
public void clearMultiTimesTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/clearTest";
String storageBasePath = HDFS_URI + "rss/clearMultiTimesTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
Expand All @@ -592,7 +604,7 @@ public void clearMultiTimesTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "clearMultiTimesTest";
shuffleTaskManager.registerShuffle(
Expand Down Expand Up @@ -642,7 +654,7 @@ public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeResourcesByShuffleIdsMultiTimesTest";
shuffleTaskManager.registerShuffle(
Expand Down Expand Up @@ -772,7 +784,7 @@ public void testGetFinishedBlockIds() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
Expand Down Expand Up @@ -840,7 +852,7 @@ public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
Expand Down Expand Up @@ -878,7 +890,7 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep
// make sure not to check leak shuffle data automatically
conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L);

ShuffleServer shuffleServer = new ShuffleServer(conf);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
Expand Down