Skip to content

Commit

Permalink
complete test case
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed May 9, 2024
1 parent 24c0ef8 commit 15363bd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,8 @@ public void blockUntilShutdown() throws InterruptedException {
public int getPort() {
return listenPort;
}

public List<Pair<BindableService, List<ServerInterceptor>>> getServicesWithInterceptors() {
return servicesWithInterceptors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;

import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerGrpcService;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class ServerActivateReassignTest extends SparkSQLTest {

private static String basePath;
public class ServerInActivateReassignTest extends PartitionBlockDataReassignBasicTest {

@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
Expand Down Expand Up @@ -68,34 +70,30 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
createShuffleServer(grpcShuffleServerConf4);

startServers();

// simulate one server that in decommission state.
ShuffleServer faultyShuffleServer = grpcShuffleServers.get(0);
faultyShuffleServer.decommission();
}

private static ShuffleServerConf buildShuffleServerConf(ServerType serverType) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
shuffleServerConf.setString("rss.storage.basePath", basePath);
shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
return shuffleServerConf;
}

@Override
public void updateRssStorage(SparkConf sparkConf) {
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "2");
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
}

@Override
public void checkShuffleData() throws Exception {
Thread.sleep(12000);
String[] paths = basePath.split(",");
for (String path : paths) {
File f = new File(path);
assertEquals(0, f.list().length);
}
public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set(
"spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
String.valueOf(grpcShuffleServers.size()));
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");

// simulate one server that is inactive.
ShuffleServer shuffleServer = grpcShuffleServers.get(0);
ShuffleServerGrpcService grpcServer =
(ShuffleServerGrpcService)
((GrpcServer) shuffleServer.getServer()).getServicesWithInterceptors().get(0).getKey();
ShuffleServer spy = spy(shuffleServer);
when(spy.isActivateClientPartitionReassign()).thenReturn(true);
grpcServer.setShuffleServer(spy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public class ShuffleServerConf extends RssBaseConf {
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to activate client partition reassign mechanism for server quick decommission");
"Whether to activate client partition reassign mechanism for server quick decommission or inactive.");

public static final ConfigOption<Integer> NETTY_SERVER_PORT =
ConfigOptions.key("rss.server.netty.port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -94,7 +95,7 @@
public class ShuffleServerGrpcService extends ShuffleServerImplBase {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcService.class);
private final ShuffleServer shuffleServer;
private ShuffleServer shuffleServer;

public ShuffleServerGrpcService(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
Expand Down Expand Up @@ -1034,4 +1035,10 @@ private List<ShuffleDataBlockSegment> toShuffleDataBlockSegments(
}
return shuffleDataBlockSegments;
}

// only for tests
@VisibleForTesting
public void setShuffleServer(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
}
}

0 comments on commit 15363bd

Please sign in to comment.