Skip to content

Commit

Permalink
DRILL-6415: Fixed TestGracefulShutdown.TestRestApi test from timing o…
Browse files Browse the repository at this point in the history
…ut (#1281)

DRILL-6415: Fixed TestGracefulShutdown.TestRestApi test from timing out
  • Loading branch information
dvjyothsna authored and parthchandra committed May 30, 2018
1 parent 1355bfd commit e05a7e8
Showing 1 changed file with 65 additions and 170 deletions.
Expand Up @@ -20,33 +20,29 @@
import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.work.WorkManager;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;


import java.io.BufferedWriter;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.Collection;
import java.util.Properties;
import java.nio.file.Path; import java.nio.file.Path;
import java.io.BufferedWriter; import java.util.Collection;


import static org.junit.Assert.fail; import static org.junit.Assert.fail;


@Category({SlowTest.class}) @Category({SlowTest.class})
public class TestGracefulShutdown extends BaseTestQuery { public class TestGracefulShutdown extends BaseTestQuery {


@Rule @Rule
public final TestRule TIMEOUT = TestTools.getTimeoutRule(180_000); public final TestRule TIMEOUT = TestTools.getTimeoutRule(120_000);

public static final int WAIT_TIMEOUT_MS = WorkManager.EXIT_TIMEOUT_MS + 30_000;


@BeforeClass @BeforeClass
public static void setUpTestData() throws Exception { public static void setUpTestData() throws Exception {
Expand All @@ -55,37 +51,17 @@ public static void setUpTestData() throws Exception {
} }
} }


public static final Properties WEBSERVER_CONFIGURATION = new Properties() { private static void enableWebServer(ClusterFixtureBuilder builder) {
{ enableDrillPortHunting(builder);
put(ExecConstants.HTTP_ENABLE, true); builder.configBuilder.put(ExecConstants.HTTP_ENABLE, true);
put(ExecConstants.HTTP_PORT_HUNT, true); builder.configBuilder.put(ExecConstants.HTTP_PORT_HUNT, true);
put(ExecConstants.DRILL_PORT_HUNT, true);
put(ExecConstants.GRACE_PERIOD, 10000);
put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true);
}
};

public static final Properties DRILL_PORT_CONFIGURATION = new Properties() {
{
put(ExecConstants.DRILL_PORT_HUNT, true);
put(ExecConstants.GRACE_PERIOD, 10000);
put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true);
}
};

public ClusterFixtureBuilder enableWebServer(ClusterFixtureBuilder builder) {
Properties props = new Properties();
props.putAll(WEBSERVER_CONFIGURATION);
builder.configBuilder.configProps(props);
builder.sessionOption(ExecConstants.SLICE_TARGET, 10); builder.sessionOption(ExecConstants.SLICE_TARGET, 10);
return builder;
} }


public ClusterFixtureBuilder enableDrillPortHunting(ClusterFixtureBuilder builder) { private static void enableDrillPortHunting(ClusterFixtureBuilder builder) {
Properties props = new Properties(); builder.configBuilder.put(ExecConstants.DRILL_PORT_HUNT, true);
props.putAll(DRILL_PORT_CONFIGURATION); builder.configBuilder.put(ExecConstants.GRACE_PERIOD, 500);
builder.configBuilder.configProps(props); builder.configBuilder.put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true);
return builder;
} }


/* /*
Expand All @@ -95,98 +71,30 @@ public ClusterFixtureBuilder enableDrillPortHunting(ClusterFixtureBuilder builde
@Test @Test
public void testOnlineEndPoints() throws Exception { public void testOnlineEndPoints() throws Exception {


String[] drillbits = {"db1" ,"db2","db3", "db4", "db5", "db6"}; String[] drillbits = {"db1" ,"db2","db3"};
ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
enableDrillPortHunting(builder); enableDrillPortHunting(builder);


try ( ClusterFixture cluster = builder.build()) { try ( ClusterFixture cluster = builder.build()) {


Drillbit drillbit = cluster.drillbit("db2"); Drillbit drillbit = cluster.drillbit("db2");
int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint();
int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); cluster.closeDrillbit("db2");

new Thread(new Runnable() {
public void run() {
try {
cluster.closeDrillbit("db2");
} catch (Exception e) {
fail();
}
}
}).start();

Thread.sleep(grace_period);


long currentTime = System.currentTimeMillis(); while (true) {
long stopTime = currentTime + WAIT_TIMEOUT_MS;

while (currentTime < stopTime) {
Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
.getContext() .getContext()
.getClusterCoordinator() .getClusterCoordinator()
.getOnlineEndPoints(); .getOnlineEndPoints();


if (!drillbitEndpoints.contains(drillbitEndpoint)) { if (!drillbitEndpoints.contains(drillbitEndpoint)) {
// Success // Success
return; return;
} }


Thread.sleep(100L); Thread.sleep(zkRefresh);
currentTime = System.currentTimeMillis();
}

Assert.fail("Timed out");
}
}
/*
Test if the drillbit transitions from ONLINE state when a shutdown
request is initiated
*/
@Test
public void testStateChange() throws Exception {

String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
enableDrillPortHunting(builder);

try (ClusterFixture cluster = builder.build()) {
Drillbit drillbit = cluster.drillbit("db2");
int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD);
DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint();
new Thread(new Runnable() {
public void run() {
try {
cluster.closeDrillbit("db2");
} catch (Exception e) {
fail();
}
}
}).start();

Thread.sleep(grace_period);

long currentTime = System.currentTimeMillis();
long stopTime = currentTime + WAIT_TIMEOUT_MS;

while (currentTime < stopTime) {
Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
.getContext()
.getClusterCoordinator()
.getAvailableEndpoints();
for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) {
if (drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) {
if (!dbEndpoint.getState().equals(DrillbitEndpoint.State.ONLINE)) {
// Success
return;
}
}
}

Thread.sleep(100L);
currentTime = System.currentTimeMillis();
} }

Assert.fail("Timed out");
} }
} }


Expand All @@ -196,48 +104,35 @@ public void run() {
@Test @Test
public void testRestApi() throws Exception { public void testRestApi() throws Exception {


String[] drillbits = { "db1" ,"db2", "db3" }; String[] drillbits = {"db1", "db2", "db3"};
ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
builder = enableWebServer(builder); enableWebServer(builder);
QueryBuilder.QuerySummaryFuture listener; QueryBuilder.QuerySummaryFuture listener;
final String sql = "select * from dfs.root.`.`"; final String sql = "select * from dfs.root.`.`";
try ( ClusterFixture cluster = builder.build(); try (ClusterFixture cluster = builder.build();
final ClientFixture client = cluster.clientFixture()) { final ClientFixture client = cluster.clientFixture()) {
Drillbit drillbit = cluster.drillbit("db1"); Drillbit drillbit = cluster.drillbit("db1");
int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port"); int port = drillbit.getWebServerPort();
int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
listener = client.queryBuilder().sql(sql).futureSummary(); listener = client.queryBuilder().sql(sql).futureSummary();
Thread.sleep(60000); URL url = new URL("http://localhost:" + port + "/gracefulShutdown");
while( port < 8049) { HttpURLConnection conn = (HttpURLConnection) url.openConnection();
URL url = new URL("http://localhost:"+port+"/gracefulShutdown"); conn.setRequestMethod("POST");
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); if (conn.getResponseCode() != 200) {
conn.setRequestMethod("POST"); throw new RuntimeException("Failed : HTTP error code : "
if (conn.getResponseCode() != 200) { + conn.getResponseCode());
throw new RuntimeException("Failed : HTTP error code : "
+ conn.getResponseCode());
}
port++;
} }
Thread.sleep(grace_period); while (true) {
Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
.getContext()
.getClusterCoordinator()
.getOnlineEndPoints();

long currentTime = System.currentTimeMillis();
long stopTime = currentTime + WAIT_TIMEOUT_MS;

while (currentTime < stopTime) {
if (listener.isDone()) { if (listener.isDone()) {
break; break;
} }

Thread.sleep(100L); Thread.sleep(100L);
currentTime = System.currentTimeMillis();
} }


Assert.assertTrue(listener.isDone()); if (waitAndAssertDrillbitCount(cluster, zkRefresh)) {
Assert.assertEquals(1,drillbitEndpoints.size()); return;
}
Assert.fail("Timed out");
} }
} }


Expand All @@ -249,48 +144,48 @@ public void testRestApiShutdown() throws Exception {


String[] drillbits = {"db1" ,"db2", "db3"}; String[] drillbits = {"db1" ,"db2", "db3"};
ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
builder = enableWebServer(builder); enableWebServer(builder);
QueryBuilder.QuerySummaryFuture listener; QueryBuilder.QuerySummaryFuture listener;
final String sql = "select * from dfs.root.`.`"; final String sql = "select * from dfs.root.`.`";
try ( ClusterFixture cluster = builder.build(); try (ClusterFixture cluster = builder.build();
final ClientFixture client = cluster.clientFixture()) { final ClientFixture client = cluster.clientFixture()) {
Drillbit drillbit = cluster.drillbit("db1"); Drillbit drillbit = cluster.drillbit("db1");
int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port"); int port = drillbit.getWebServerPort();
int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
listener = client.queryBuilder().sql(sql).futureSummary(); listener = client.queryBuilder().sql(sql).futureSummary();
Thread.sleep(10000); while (true) {

if (listener.isDone()) {
while( port < 8048) { break;
URL url = new URL("http://localhost:"+port+"/shutdown");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
if (conn.getResponseCode() != 200) {
throw new RuntimeException("Failed : HTTP error code : "
+ conn.getResponseCode());
} }
port++;
Thread.sleep(100L);
} }
URL url = new URL("http://localhost:" + port + "/shutdown");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
if (conn.getResponseCode() != 200) {
throw new RuntimeException("Failed : HTTP error code : "
+ conn.getResponseCode());
}
if (waitAndAssertDrillbitCount(cluster, zkRefresh)) {
return;
}
Assert.fail("Timed out");
}
}


Thread.sleep(grace_period); private static boolean waitAndAssertDrillbitCount(ClusterFixture cluster, int zkRefresh) throws InterruptedException {


while (true) {
Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
.getContext() .getContext()
.getClusterCoordinator() .getClusterCoordinator()
.getAvailableEndpoints(); .getAvailableEndpoints();

if (drillbitEndpoints.size() == 2) {
long currentTime = System.currentTimeMillis(); return true;
long stopTime = currentTime + WAIT_TIMEOUT_MS;

while (currentTime < stopTime) {
if (listener.isDone() && drillbitEndpoints.size() == 2) {
return;
}

Thread.sleep(100L);
currentTime = System.currentTimeMillis();
} }


Assert.fail("Timed out"); Thread.sleep(zkRefresh);
} }
} }


Expand Down

0 comments on commit e05a7e8

Please sign in to comment.