Skip to content

Commit

Permalink
PHOENIX-5316 Use callable instead of runnable so that Pherf exception…
Browse files Browse the repository at this point in the history
…s cause tests to fail
  • Loading branch information
twdsilva committed Jun 6, 2019
1 parent 5303d29 commit 13fd777
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 83 deletions.
Expand Up @@ -22,15 +22,24 @@
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;

import java.util.concurrent.Future;

public class PherfMainIT extends ResultBaseTestIT {
@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();

@Test
public void testPherfMain() {
String[] args = { "-q",
"--scenarioFile", ".*prod_test_unsalted_scenario.*",
public void testPherfMain() throws Exception {
String[] args = { "-q", "-l",
"--schemaFile", ".*create_prod_test_unsalted.sql",
"--scenarioFile", ".*prod_test_unsalted_scenario.xml",
"-m", "--monitorFrequency", "10" };
Pherf.main(args);
Pherf pherf = new Pherf(args);
pherf.run();

// verify that none of the scenarios threw any exceptions
for (Future<Void> future : pherf.workloadExecutor.jobs.values()) {
future.get();
}
}
}
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Properties;

import com.google.common.annotations.VisibleForTesting;
import jline.internal.TestAccessible;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
Expand Down Expand Up @@ -115,6 +117,9 @@ public class Pherf {
private final boolean thinDriver;
private final String queryServerUrl;

@VisibleForTesting
WorkloadExecutor workloadExecutor;

public Pherf(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
CommandLine command = null;
Expand Down Expand Up @@ -201,7 +206,7 @@ public static void main(String[] args) {
public void run() throws Exception {
MonitorManager monitorManager = null;
List<Workload> workloads = new ArrayList<>();
WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
try {
if (listFiles) {
ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL);
Expand Down
Expand Up @@ -29,7 +29,9 @@ public enum DataTypeMapping {
UNSIGNED_LONG("UNSIGNED_LONG", Types.LONGVARCHAR),
VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
VARBINARY("VARBINARY", Types.VARBINARY),
TIMESTAMP("TIMESTAMP", Types.TIMESTAMP);
TIMESTAMP("TIMESTAMP", Types.TIMESTAMP),
BIGINT("BIGINT", Types.BIGINT),
TINYINT("TINYINT", Types.TINYINT);

private final String sType;

Expand Down
Expand Up @@ -27,6 +27,7 @@
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.phoenix.pherf.util.PhoenixUtil;

Expand Down Expand Up @@ -161,6 +162,7 @@ public String getTableNameWithoutSchemaName() {
*/
@XmlAttribute()
public String getName() {
Preconditions.checkNotNull(name);
return name;
}

Expand Down
Expand Up @@ -96,7 +96,8 @@ public synchronized List<Scenario> getScenarios() throws Exception {
scenarios.add(scenario);
}
} catch (JAXBException e) {
e.printStackTrace();
logger.error("Unable to parse scenario file "+path, e);
throw e;
}
}
return scenarios;
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -91,9 +92,9 @@ public MonitorManager(long monitorFrequency) throws Exception {
this.shouldStop.set(true);
}

@Override public Runnable execute() {
return new Runnable() {
@Override public void run() {
@Override public Callable<Void> execute() {
return new Callable<Void>() {
@Override public Void call() throws Exception {
try {
while (!shouldStop()) {
isRunning.set(true);
Expand Down Expand Up @@ -131,6 +132,7 @@ public MonitorManager(long monitorFrequency) throws Exception {
} catch (Exception e) {
Thread.currentThread().interrupt();
e.printStackTrace();
throw e;
}
}
}
Expand All @@ -144,6 +146,7 @@ public MonitorManager(long monitorFrequency) throws Exception {
throw new FileLoaderRuntimeException("Could not close monitor results.", e);
}
}
return null;
}
};
}
Expand Down
Expand Up @@ -35,6 +35,7 @@

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class RulesApplier {
Expand Down Expand Up @@ -228,24 +229,30 @@ public DataValue getDataValue(Column column) throws Exception{
data = new DataValue(column.getType(), String.valueOf(dbl));
}
break;
case TINYINT:
case INTEGER:
if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
data = pickDataValueFromList(dataValues);
} else {
int minInt = (int) column.getMinValue();
int maxInt = (int) column.getMaxValue();
Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration for integers " + column.getName());
int intVal = RandomUtils.nextInt(minInt, maxInt);
if (column.getType() == DataTypeMapping.TINYINT) {
Preconditions.checkArgument((minInt >= -128) && (minInt <= 128), "min value need to be set in configuration for tinyints " + column.getName());
Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128), "max value need to be set in configuration for tinyints " + column.getName());
}
int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1);
data = new DataValue(column.getType(), String.valueOf(intVal));
}
break;
case BIGINT:
case UNSIGNED_LONG:
if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
data = pickDataValueFromList(dataValues);
} else {
long minLong = column.getMinValue();
long maxLong = column.getMaxValue();
Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
if (column.getType() == DataTypeMapping.UNSIGNED_LONG)
Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
long longVal = RandomUtils.nextLong(minLong, maxLong);
data = new DataValue(column.getType(), String.valueOf(longVal));
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.sql.ResultSet;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;

import org.apache.phoenix.pherf.result.DataModelResult;
import org.apache.phoenix.pherf.result.ResultManager;
Expand All @@ -38,7 +39,7 @@
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.util.PhoenixUtil;

class MultiThreadedRunner implements Runnable {
class MultiThreadedRunner implements Callable<Void> {
private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
private Query query;
private ThreadTime threadTime;
Expand Down Expand Up @@ -85,29 +86,28 @@ class MultiThreadedRunner implements Runnable {
* Executes run for a minimum of number of execution or execution duration
*/
@Override
public void run() {
public Void call() throws Exception {
logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+ numberOfExecutions + "times\n\n");
Long start = System.currentTimeMillis();
for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
< executionDurationInMs)); i--) {
try {
synchronized (resultManager) {
timedQuery();
if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
resultManager.write(dataModelResult, ruleApplier);
lastResultWritten = System.currentTimeMillis();
}
synchronized (workloadExecutor) {
timedQuery();
if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
resultManager.write(dataModelResult, ruleApplier);
lastResultWritten = System.currentTimeMillis();
}
} catch (Exception e) {
e.printStackTrace();
}
}

// Make sure all result have been dumped before exiting
resultManager.flush();
synchronized (workloadExecutor) {
resultManager.flush();
}

logger.info("\n\nThread exiting." + threadName + "\n\n");
return null;
}

private synchronized ThreadTime getThreadTime() {
Expand Down Expand Up @@ -165,8 +165,9 @@ private void timedQuery() throws Exception {
conn.commit();
}
} catch (Exception e) {
e.printStackTrace();
logger.error("Exception while executing query", e);
exception = e.getMessage();
throw e;
} finally {
getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
(int) (System.currentTimeMillis() - start)));
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;

import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Query;
Expand All @@ -29,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MultithreadedDiffer implements Runnable {
class MultithreadedDiffer implements Callable<Void> {
private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
private Thread t;
private Query query;
Expand Down Expand Up @@ -80,7 +81,7 @@ private void diffQuery() throws Exception {
/**
* Executes verification runs for a minimum of number of execution or execution duration
*/
public void run() {
public Void call() throws Exception {
logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
+ numberOfExecutions + "times\n\n");
Long start = System.currentTimeMillis();
Expand All @@ -93,5 +94,6 @@ public void run() {
}
}
logger.info("\n\nThread exiting." + t.getName() + "\n\n");
return null;
}
}

0 comments on commit 13fd777

Please sign in to comment.