Skip to content

Commit

Permalink
Framework changes and tests for RM queue selection (#571)
Browse files Browse the repository at this point in the history
* Framework changes and tests for RM queue selection

- Added a new class for RM queue selection tests.
- Added new methods in ConnectionPool to create connections based on user, password, tags and groups.
- Refactored restartDrill out of TestDriver, since it is a utility.
- Create connection with schema, group and queryTags properties.
- Changes in JavaTestBase to get all Drillbits' hostnames.
- Fixed faulty tests in TestSSLProperties

* Cosmetic changes. Add comments for the tests.

* Add package to manage DrillCluster from SSH

- Added classes to manage SSH sessions from a user to a DrillCluster.
- Most of the code is re-used from mapr/ycsb-driver implementations.
- The DrillCluster instance should be used when the test framework has to talk to the cluster for copying files or running certain commands.
- Modified the test cases accordingly.
- Added ThrowingConsumer to support lambda for functions with checked exceptions.

* Cosmetic changes, add javadoc for new utility methods

* Existing TestDriver continue using older method

* Small change, missed synchronized

* Add a method for cleaning up cluster before every test

- Remove any existing RM config in DrillCluster.

* Add cleanup after class as well, other cosmetic changes

* Minor change, add description to testng methods

* Run cleanup once for a class

* Cleanup and add test case for unknown tag
  • Loading branch information
abhidotravi authored and Agirish committed Apr 15, 2019
1 parent 7ea28ba commit 0c07fc7
Show file tree
Hide file tree
Showing 20 changed files with 1,069 additions and 70 deletions.
5 changes: 5 additions & 0 deletions framework/pom.xml
Expand Up @@ -130,6 +130,11 @@
<artifactId>config</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.53</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Expand Up @@ -90,8 +90,32 @@ private synchronized Connection getOrCreateConnection(String username, String pa
}

@VisibleForTesting
public Connection createConnection(Properties connectionProperties) throws SQLException {
return DriverManager.getConnection(DrillTestDefaults.CONNECTION_STRING, connectionProperties);
public static Connection createConnection(Properties connectionProperties) throws SQLException {
return createConnection(DrillTestDefaults.CONNECTION_STRING, connectionProperties);
}

/**
* Create a connection with a custom URL and properties.
*
* @param url
* @param props
* @return Connection instance to drill cluster.
* @throws SQLException
*/
@VisibleForTesting
public static Connection createConnection(final String url,
final Properties props) throws SQLException {
return createConnection(url, DrillTestDefaults.USERNAME, DrillTestDefaults.PASSWORD, props);
}

@VisibleForTesting
public static synchronized Connection createConnection(final String url,
final String username,
final String password,
final Properties props) throws SQLException {
props.put("user", username == null ? DrillTestDefaults.USERNAME : username);
props.put("password", password == null ? DrillTestDefaults.PASSWORD : password);
return DriverManager.getConnection(url, props);
}

/**
Expand Down
Expand Up @@ -113,6 +113,8 @@ public class DrillTestDefaults {

public static final String DRILL_RM_OVERRIDE_CONF_FILENAME = "drill-rm-override.conf";

public static final long DEFAULT_SLEEP_IN_MILLIS = 20000; //default sleep

// Adding classifications for Execution Failures
public static enum DRILL_EXCEPTION{
VALIDATION_ERROR_INVALID_SCHEMA,
Expand Down
Expand Up @@ -618,7 +618,7 @@ else if(ii<executionFailureExceptions.size()-1)

executor.close();
connectionPool.close();
restartDrill();
Utils.restartDrill();
return totalExecutionFailures + totalDataVerificationFailures + totalPlanVerificationFailures + totalTimeoutFailures + totalRandomFailures;
}

Expand Down Expand Up @@ -784,7 +784,7 @@ public void run() {
LOG.info("\n>> Generation duration: " + stopwatch + "\n");

if (restartDrillbits) {
restartDrill();
Utils.restartDrill();
}
}

Expand Down Expand Up @@ -973,19 +973,4 @@ private void generateReports(List<DrillTest> tests, int iteration) {
e.printStackTrace();
}
}

private int restartDrill() {
int exitCode = 0;
String command = DrillTestDefaults.TEST_ROOT_DIR + "/" + DrillTestDefaults.RESTART_DRILL_SCRIPT;
File commandFile = new File(command);
if (commandFile.exists() && commandFile.canExecute()) {
LOG.info("\n> Executing Post Build Script");
LOG.info("\n>> Path: " + command);
exitCode = Utils.execCmd(command).exitCode;
if (exitCode != 0) {
LOG.error("\n>> Error restarting drillbits");
}
}
return exitCode;
}
}
@@ -0,0 +1,19 @@
package org.apache.drill.test.framework;

import java.util.function.Consumer;

@FunctionalInterface
public interface ThrowingConsumer<T, E extends Throwable> {
void accept(T t) throws E;

static <T> Consumer<T> throwingConsumerWrapper(
ThrowingConsumer<T, Exception> throwingConsumer) {
return i -> {
try {
throwingConsumer.accept(i);
} catch (Exception ex) {
Utils.sneakyThrow(ex);
}
};
}
}
116 changes: 90 additions & 26 deletions framework/src/main/java/org/apache/drill/test/framework/Utils.java
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.test.framework;

import com.google.common.base.Preconditions;
import oadd.org.apache.drill.exec.proto.UserBitShared;
import org.apache.commons.io.FilenameUtils;

Expand All @@ -43,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -55,6 +57,7 @@
import com.google.common.collect.Maps;
import com.google.common.io.Resources;

import org.apache.drill.test.framework.ssh.DrillCluster;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
Expand Down Expand Up @@ -115,6 +118,24 @@ public static Properties createConnectionProperties() {
return connectionProperties;
}

public static Properties createConnectionProperties(final String schema,
final String group,
final String queryTags) {
Properties props = createConnectionProperties();
if(schema != null) {
props.put("schema", schema);
}

if(group != null) {
props.put("group", group);
}

if(queryTags != null) {
props.put("queryTags", queryTags);
}
return props;
}

// Accept self-signed certificate
public static class MyHostNameVerifier implements HostnameVerifier {

Expand Down Expand Up @@ -919,52 +940,35 @@ public static List<String> getDrillbitHosts(Connection connection) throws SQLExc
}

/**
* Apply RM config represented by DrillRMConfig to a specified Drillbit.
* Apply RM config represented by DrillRMConfig to all drillbits part of {@link DrillCluster}
*
* As a part of this method
* - Write the config to a temporary file (remove if file exists previously.
* - Copy the file to specified Drillbit node.
* - Copy the file to all nodes part of the {@link DrillCluster}.
*
* @param config
* @param drillbitHost
* @param drillCluster
* @throws IOException
*/
public static synchronized void applyRMConfigToDrillbit(final DrillRMConfig config,
final String drillbitHost) throws IOException {
public static synchronized void applyRMConfigToDrillCluster(final DrillRMConfig config,
final DrillCluster drillCluster) throws IOException {
final String drillRMConfFilePath = DrillTestDefaults.TEST_ROOT_DIR + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME;

File drillRMConfFile = new File(drillRMConfFilePath);

CmdConsOut out;
if(drillRMConfFile.exists()) {
LOG.warn(drillRMConfFilePath + " exists! Removing the file");
if ((out = Utils.execCmd("rm -rf " + drillRMConfFilePath)).exitCode != 0) {
LOG.error("Could not remove config file " +
drillRMConfFilePath + "\n\n" +
out);
LOG.error("Could not remove config file " + drillRMConfFilePath + "\n\n" + out);
throw new IOException(out.consoleErr);
}
}

try (BufferedWriter writer = new BufferedWriter(new FileWriter(drillRMConfFilePath))) {
writer.write(DRILL_EXEC_RM_CONFIG_KEY + ":" + config.render());
}

final String scpCommand = new StringBuilder("scp ")
.append(drillRMConfFilePath)
.append(" ")
.append(USERNAME)
.append("@").append(drillbitHost)
.append(":").append(DRILL_HOME)
.append("/conf/")
.append(DRILL_RM_OVERRIDE_CONF_FILENAME)
.toString();

LOG.info("Copying config " + scpCommand);
if ((out = Utils.execCmd(scpCommand)).exitCode != 0) {
LOG.error("Copying config to drillbit failed!\n\n" + out);
throw new IOException(out.consoleErr);
}
//Remove if an override conf exists
drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME);
drillCluster.copyToRemote(drillRMConfFilePath, DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME);
}

public static boolean sanityTest(Connection connection) {
Expand Down Expand Up @@ -1072,6 +1076,66 @@ public static boolean matches(String actual, String expected) {
return true;
}

/**
* Restart drillbits, ignore IOExceptions, if any.
* This version of the utility uses the restart drillbit script configured.
*
* Refactored out of {@link TestDriver}, kept for backward compatibility.
* Use {@link #restartDrillbits(DrillCluster)} instead.
*/
@Deprecated
public static synchronized int restartDrill() {
int exitCode = 0;
String command = DrillTestDefaults.TEST_ROOT_DIR + "/" + DrillTestDefaults.RESTART_DRILL_SCRIPT;
File commandFile = new File(command);
if (commandFile.exists() && commandFile.canExecute()) {
LOG.info("\n> Executing Post Build Script");
LOG.info("\n>> Path: " + command);
exitCode = Utils.execCmd(command).exitCode;
if (exitCode != 0) {
LOG.error("\n>> Error restarting drillbits");
}
}
return exitCode;
}

/**
* Restart drillbits available as a part of {@link DrillCluster} instance passed.
* @param drillCluster instance of a drill cluster.
*/
public static synchronized void restartDrillbits(final DrillCluster drillCluster) {
Preconditions.checkNotNull(drillCluster, "drillCluster cannot be null!");
drillCluster.runCommand(DRILL_HOME + "/bin/drillbit.sh restart");
sleepForTimeInMillis(DEFAULT_SLEEP_IN_MILLIS);
}

/**
* Utility method to sleep for specified amount of time (in milliseconds).
*
* @param timeInMillis
*/
public static void sleepForTimeInMillis(final long timeInMillis) {
try {
LOG.info("Waiting for " + timeInMillis + "ms .. ");
Thread.sleep(timeInMillis);
} catch (Exception e) {
//Ignore
}
}

/**
* Passed exception is hidden from compiler but re-thrown.
* Used by functional interfaces that allow checked exceptions.
*
* @param e
* @param <E>
* @throws E
*/
@SuppressWarnings("unchecked")
public static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
throw (E) e;
}

public static String getFrameworkVersion() {
String commitID = "";
String commitAuthor = "";
Expand Down

0 comments on commit 0c07fc7

Please sign in to comment.