From 13fd7776d7629c222af63d9a4f38a426fc5ed163 Mon Sep 17 00:00:00 2001 From: Thomas D'Silva Date: Tue, 4 Jun 2019 16:46:43 -0700 Subject: [PATCH] PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail --- .../org/apache/phoenix/pherf/PherfMainIT.java | 17 ++++-- .../java/org/apache/phoenix/pherf/Pherf.java | 7 ++- .../pherf/configuration/DataTypeMapping.java | 4 +- .../phoenix/pherf/configuration/Scenario.java | 2 + .../pherf/configuration/XMLConfigParser.java | 3 +- .../phoenix/pherf/jmx/MonitorManager.java | 9 ++- .../phoenix/pherf/rules/RulesApplier.java | 13 ++++- .../pherf/workload/MultiThreadedRunner.java | 27 ++++----- .../pherf/workload/MultithreadedDiffer.java | 6 +- .../phoenix/pherf/workload/QueryExecutor.java | 56 +++++++++---------- .../phoenix/pherf/workload/Workload.java | 4 +- .../pherf/workload/WorkloadExecutor.java | 5 +- .../phoenix/pherf/workload/WriteWorkload.java | 53 +++++++++++------- .../scenario/prod_test_unsalted_scenario.xml | 6 +- 14 files changed, 129 insertions(+), 83 deletions(-) diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java index 2407ef4bcf8..6dc900e1e87 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java @@ -22,15 +22,24 @@ import org.junit.Test; import org.junit.contrib.java.lang.system.ExpectedSystemExit; +import java.util.concurrent.Future; + public class PherfMainIT extends ResultBaseTestIT { @Rule public final ExpectedSystemExit exit = ExpectedSystemExit.none(); @Test - public void testPherfMain() { - String[] args = { "-q", - "--scenarioFile", ".*prod_test_unsalted_scenario.*", + public void testPherfMain() throws Exception { + String[] args = { "-q", "-l", + "--schemaFile", ".*create_prod_test_unsalted.sql", + "--scenarioFile", ".*prod_test_unsalted_scenario.xml", "-m", "--monitorFrequency", "10" }; - Pherf.main(args); + Pherf pherf = new Pherf(args); + pherf.run(); + + // verify that none of the scenarios threw any exceptions + for (Future future : pherf.workloadExecutor.jobs.values()) { + future.get(); + } } } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java index d92ffdeb898..51d6743dfd7 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Properties; +import com.google.common.annotations.VisibleForTesting; +import jline.internal.TestAccessible; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -115,6 +117,9 @@ public class Pherf { private final boolean thinDriver; private final String queryServerUrl; + @VisibleForTesting + WorkloadExecutor workloadExecutor; + public Pherf(String[] args) throws Exception { CommandLineParser parser = new PosixParser(); CommandLine command = null; @@ -201,7 +206,7 @@ public static void main(String[] args) { public void run() throws Exception { MonitorManager monitorManager = null; List workloads = new ArrayList<>(); - WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional); + workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional); try { if (listFiles) { ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL); diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java index 0476df2d96b..129bdc22ee4 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java @@ -29,7 +29,9 @@ public enum DataTypeMapping { UNSIGNED_LONG("UNSIGNED_LONG", Types.LONGVARCHAR), VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY), VARBINARY("VARBINARY", Types.VARBINARY), - TIMESTAMP("TIMESTAMP", Types.TIMESTAMP); + TIMESTAMP("TIMESTAMP", Types.TIMESTAMP), + BIGINT("BIGINT", Types.BIGINT), + TINYINT("TINYINT", Types.TINYINT); private final String sType; diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java index c867ae1f05b..513445ebcff 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java @@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; +import com.google.common.base.Preconditions; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.phoenix.pherf.util.PhoenixUtil; @@ -161,6 +162,7 @@ public String getTableNameWithoutSchemaName() { */ @XmlAttribute() public String getName() { + Preconditions.checkNotNull(name); return name; } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java index a0ee471d020..8f2a1d8a67c 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java @@ -96,7 +96,8 @@ public synchronized List getScenarios() throws Exception { scenarios.add(scenario); } } catch (JAXBException e) { - e.printStackTrace(); + logger.error("Unable to parse scenario file "+path, e); + throw e; } } return scenarios; diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java index bb299025674..5c434d87017 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -91,9 +92,9 @@ public MonitorManager(long monitorFrequency) throws Exception { this.shouldStop.set(true); } - @Override public Runnable execute() { - return new Runnable() { - @Override public void run() { + @Override public Callable execute() { + return new Callable() { + @Override public Void call() throws Exception { try { while (!shouldStop()) { isRunning.set(true); @@ -131,6 +132,7 @@ public MonitorManager(long monitorFrequency) throws Exception { } catch (Exception e) { Thread.currentThread().interrupt(); e.printStackTrace(); + throw e; } } } @@ -144,6 +146,7 @@ public MonitorManager(long monitorFrequency) throws Exception { throw new FileLoaderRuntimeException("Could not close monitor results.", e); } } + return null; } }; } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java index da0a172ab9d..662037e4db7 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java @@ -35,6 +35,7 @@ import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; public class RulesApplier { @@ -228,24 +229,30 @@ public DataValue getDataValue(Column column) throws Exception{ data = new DataValue(column.getType(), String.valueOf(dbl)); } break; + case TINYINT: case INTEGER: if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) { data = pickDataValueFromList(dataValues); } else { int minInt = (int) column.getMinValue(); int maxInt = (int) column.getMaxValue(); - Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration for integers " + column.getName()); - int intVal = RandomUtils.nextInt(minInt, maxInt); + if (column.getType() == DataTypeMapping.TINYINT) { + Preconditions.checkArgument((minInt >= -128) && (minInt <= 128), "min value need to be set in configuration for tinyints " + column.getName()); + Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128), "max value need to be set in configuration for tinyints " + column.getName()); + } + int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1); data = new DataValue(column.getType(), String.valueOf(intVal)); } break; + case BIGINT: case UNSIGNED_LONG: if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) { data = pickDataValueFromList(dataValues); } else { long minLong = column.getMinValue(); long maxLong = column.getMaxValue(); - Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName()); + if (column.getType() == DataTypeMapping.UNSIGNED_LONG) + Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName()); long longVal = RandomUtils.nextLong(minLong, maxLong); data = new DataValue(column.getType(), String.valueOf(longVal)); } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java index 7b9313f8f39..4423bbdd552 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.util.Calendar; import java.util.Date; +import java.util.concurrent.Callable; import org.apache.phoenix.pherf.result.DataModelResult; import org.apache.phoenix.pherf.result.ResultManager; @@ -38,7 +39,7 @@ import org.apache.phoenix.pherf.configuration.XMLConfigParser; import org.apache.phoenix.pherf.util.PhoenixUtil; -class MultiThreadedRunner implements Runnable { +class MultiThreadedRunner implements Callable { private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class); private Query query; private ThreadTime threadTime; @@ -85,29 +86,28 @@ class MultiThreadedRunner implements Runnable { * Executes run for a minimum of number of execution or execution duration */ @Override - public void run() { + public Void call() throws Exception { logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for " + numberOfExecutions + "times\n\n"); Long start = System.currentTimeMillis(); for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start) < executionDurationInMs)); i--) { - try { - synchronized (resultManager) { - timedQuery(); - if ((System.currentTimeMillis() - lastResultWritten) > 1000) { - resultManager.write(dataModelResult, ruleApplier); - lastResultWritten = System.currentTimeMillis(); - } + synchronized (workloadExecutor) { + timedQuery(); + if ((System.currentTimeMillis() - lastResultWritten) > 1000) { + resultManager.write(dataModelResult, ruleApplier); + lastResultWritten = System.currentTimeMillis(); } - } catch (Exception e) { - e.printStackTrace(); } } // Make sure all result have been dumped before exiting - resultManager.flush(); + synchronized (workloadExecutor) { + resultManager.flush(); + } logger.info("\n\nThread exiting." + threadName + "\n\n"); + return null; } private synchronized ThreadTime getThreadTime() { @@ -165,8 +165,9 @@ private void timedQuery() throws Exception { conn.commit(); } } catch (Exception e) { - e.printStackTrace(); + logger.error("Exception while executing query", e); exception = e.getMessage(); + throw e; } finally { getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount, (int) (System.currentTimeMillis() - start))); diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java index decff51d3c6..6e828bdf426 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java @@ -20,6 +20,7 @@ import java.util.Calendar; import java.util.Date; +import java.util.concurrent.Callable; import org.apache.phoenix.pherf.PherfConstants; import org.apache.phoenix.pherf.configuration.Query; @@ -29,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class MultithreadedDiffer implements Runnable { +class MultithreadedDiffer implements Callable { private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class); private Thread t; private Query query; @@ -80,7 +81,7 @@ private void diffQuery() throws Exception { /** * Executes verification runs for a minimum of number of execution or execution duration */ - public void run() { + public Void call() throws Exception { logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for " + numberOfExecutions + "times\n\n"); Long start = System.currentTimeMillis(); @@ -93,5 +94,6 @@ public void run() { } } logger.info("\n\nThread exiting." + t.getName() + "\n\n"); + return null; } } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java index 8d0ced56347..c4a3517fdd4 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -78,16 +79,16 @@ public void complete() {} * @throws Exception */ @Override - public Runnable execute() throws Exception { - Runnable runnable = null; + public Callable execute() throws Exception { + Callable callable = null; for (DataModel dataModel : dataModels) { if (exportCSV) { - runnable = exportAllScenarios(dataModel); + callable = exportAllScenarios(dataModel); } else { - runnable = executeAllScenarios(dataModel); + callable = executeAllScenarios(dataModel); } } - return runnable; + return callable; } /** @@ -96,12 +97,11 @@ public Runnable execute() throws Exception { * @param dataModel * @throws Exception */ - protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception { - return new Runnable() { + protected Callable exportAllScenarios(final DataModel dataModel) throws Exception { + return new Callable() { @Override - public void run() { + public Void call() throws Exception { try { - List scenarios = dataModel.getScenarios(); QueryVerifier exportRunner = new QueryVerifier(false); for (Scenario scenario : scenarios) { @@ -113,8 +113,10 @@ public void run() { } } } catch (Exception e) { - logger.warn("", e); + logger.error("Scenario throws exception", e); + throw e; } + return null; } }; } @@ -125,9 +127,9 @@ public void run() { * @param dataModel * @throws Exception */ - protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception { - return new Runnable() { - @Override public void run() { + protected Callable executeAllScenarios(final DataModel dataModel) throws Exception { + return new Callable() { + @Override public Void call() throws Exception { List dataModelResults = new ArrayList<>(); DataModelResult dataModelResult = @@ -163,8 +165,10 @@ protected Runnable executeAllScenarios(final DataModel dataModel) throws Excepti resultManager.write(dataModelResults, ruleApplier); resultManager.flush(); } catch (Exception e) { - logger.warn("", e); + logger.error("Scenario throws exception", e); + throw e; } + return null; } }; } @@ -179,7 +183,7 @@ protected Runnable executeAllScenarios(final DataModel dataModel) throws Excepti * @throws InterruptedException */ protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet, - QuerySetResult querySetResult, Scenario scenario) throws InterruptedException { + QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException { for (Query query : querySet.getQuery()) { QueryResult queryResult = new QueryResult(query); querySetResult.getQueryResults().add(queryResult); @@ -190,7 +194,7 @@ protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet q for (int i = 0; i < cr; i++) { - Runnable + Callable thread = executeRunner((i + 1) + "," + cr, dataModelResult, queryResult, querySetResult, scenario); @@ -198,11 +202,7 @@ protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet q } for (Future thread : threads) { - try { - thread.get(); - } catch (ExecutionException e) { - logger.error("", e); - } + thread.get(); } } } @@ -217,7 +217,7 @@ protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet q * @throws InterruptedException */ protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet, - QuerySetResult querySetResult, Scenario scenario) throws InterruptedException { + QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException { for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) { List threads = new ArrayList<>(); for (int i = 0; i < cr; i++) { @@ -225,7 +225,7 @@ protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet QueryResult queryResult = new QueryResult(query); querySetResult.getQueryResults().add(queryResult); - Runnable + Callable thread = executeRunner((i + 1) + "," + cr, dataModelResult, queryResult, querySetResult, scenario); @@ -233,11 +233,7 @@ protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet } for (Future thread : threads) { - try { - thread.get(); - } catch (ExecutionException e) { - logger.error("", e); - } + thread.get(); } } } @@ -253,14 +249,14 @@ protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet * @param scenario * @return */ - protected Runnable executeRunner(String name, DataModelResult dataModelResult, + protected Callable executeRunner(String name, DataModelResult dataModelResult, QueryResult queryResult, QuerySet querySet, Scenario scenario) { ThreadTime threadTime = new ThreadTime(); queryResult.getThreadTimes().add(threadTime); threadTime.setThreadName(name); queryResult.setHint(this.queryHint); logger.info("\nExecuting query " + queryResult.getStatement()); - Runnable thread; + Callable thread; if (workloadExecutor.isPerformance()) { thread = new MultiThreadedRunner(threadTime.getThreadName(), queryResult, diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java index 882fa50af61..0532201dc2a 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java @@ -17,8 +17,10 @@ */ package org.apache.phoenix.pherf.workload; +import java.util.concurrent.Callable; + public interface Workload { - public Runnable execute() throws Exception; + public Callable execute() throws Exception; /** * Use this method to perform any cleanup or forced shutdown of the thread. diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java index 3cde7ae172a..4abb574d725 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java @@ -18,6 +18,8 @@ package org.apache.phoenix.pherf.workload; +import com.google.common.annotations.VisibleForTesting; +import jline.internal.TestAccessible; import org.apache.phoenix.pherf.PherfConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +36,8 @@ public class WorkloadExecutor { private final boolean isPerformance; // Jobs can be accessed by multiple threads - private final Map jobs = new ConcurrentHashMap<>(); + @VisibleForTesting + public final Map jobs = new ConcurrentHashMap<>(); private final ExecutorService pool; diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index b340a2bcf5e..cae223c3664 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -151,9 +151,9 @@ public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigPa pool.shutdownNow(); } - public Runnable execute() throws Exception { - return new Runnable() { - @Override public void run() { + public Callable execute() throws Exception { + return new Callable() { + @Override public Void call() throws Exception { try { DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary(); DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime(); @@ -169,8 +169,10 @@ public Runnable execute() throws Exception { resultUtil.write(dataLoadThreadTime); } catch (Exception e) { - logger.warn("", e); + logger.error("WriteWorkLoad failed", e); + throw e; } + return null; } }; } @@ -292,21 +294,17 @@ public Future upsertData(final Scenario scenario, final List colum rowsCreated += result; } } - try { - connection.commit(); - duration = System.currentTimeMillis() - last; - logger.info("Writer (" + Thread.currentThread().getName() - + ") committed Batch. Total " + getBatchSize() - + " rows for this thread (" + this.hashCode() + ") in (" - + duration + ") Ms"); - - if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) { - dataLoadThreadTime.add(tableName, - Thread.currentThread().getName(), i, - System.currentTimeMillis() - logStartTime); - } - } catch (SQLException e) { - logger.warn("SQLException in commit operation", e); + connection.commit(); + duration = System.currentTimeMillis() - last; + logger.info("Writer (" + Thread.currentThread().getName() + + ") committed Batch. Total " + getBatchSize() + + " rows for this thread (" + this.hashCode() + ") in (" + + duration + ") Ms"); + + if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) { + dataLoadThreadTime.add(tableName, + Thread.currentThread().getName(), i, + System.currentTimeMillis() - logStartTime); } logStartTime = System.currentTimeMillis(); @@ -317,6 +315,7 @@ public Future upsertData(final Scenario scenario, final List colum } } } catch (SQLException e) { + logger.error("Scenario " + scenario.getName() + " failed with exception ", e); throw e; } finally { // Need to keep the statement open to send the remaining batch of updates @@ -396,11 +395,25 @@ private PreparedStatement buildStatement(Scenario scenario, List columns break; case UNSIGNED_LONG: if (dataValue.getValue().equals("")) { - statement.setNull(count, Types.LONGVARCHAR); + statement.setNull(count, Types.OTHER); + } else { + statement.setLong(count, Long.parseLong(dataValue.getValue())); + } + break; + case BIGINT: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.BIGINT); } else { statement.setLong(count, Long.parseLong(dataValue.getValue())); } break; + case TINYINT: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.TINYINT); + } else { + statement.setLong(count, Integer.parseInt(dataValue.getValue())); + } + break; case DATE: if (dataValue.getValue().equals("")) { statement.setNull(count, Types.DATE); diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml index 1c32b75f90a..fb89ef34721 100644 --- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml +++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml @@ -316,12 +316,12 @@ - CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION) + - CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING) - CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID) + +