From 135b978aea1bb5cf0053698583a2002e4452e5b0 Mon Sep 17 00:00:00 2001 From: Mujtaba Date: Fri, 18 Dec 2015 14:29:00 -0800 Subject: [PATCH] PHOENIX-2524 Fixes for pherf to run with queryserver (elserj) --- .../src/build/components/all-common-jars.xml | 8 +++ .../java/org/apache/phoenix/pherf/Pherf.java | 19 ++++++- .../phoenix/pherf/result/ResultUtil.java | 13 ++++- .../phoenix/pherf/result/file/Header.java | 1 + .../pherf/result/file/ResultFileDetails.java | 1 + .../phoenix/pherf/util/PhoenixUtil.java | 57 +++++++++++++++---- .../phoenix/pherf/workload/WriteWorkload.java | 33 ++++++----- 7 files changed, 105 insertions(+), 27 deletions(-) diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml index bed9f254a9a..960c3c982dc 100644 --- a/phoenix-assembly/src/build/components/all-common-jars.xml +++ b/phoenix-assembly/src/build/components/all-common-jars.xml @@ -104,5 +104,13 @@ 0644 + + ${project.basedir}/../phoenix-pherf/target/ + lib + + phoenix-*.jar + + 0644 + diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java index eaf199aec8d..b84183b5ccb 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java @@ -89,6 +89,8 @@ public class Pherf { options.addOption("label", true, "Label a run. Result file name will be suffixed with specified label"); options.addOption("compare", true, "Specify labeled run(s) to compare"); options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time."); + options.addOption("t", "thin", false, "Use the Phoenix Thin Driver"); + options.addOption("s", "server", true, "The URL for the Phoenix QueryServer"); } private final String zookeeper; @@ -109,6 +111,8 @@ public class Pherf { private final String label; private final String compareResults; private final CompareType compareType; + private final boolean thinDriver; + private final String queryServerUrl; public Pherf(String[] args) throws Exception { CommandLineParser parser = new PosixParser(); @@ -155,14 +159,27 @@ public Pherf(String[] args) throws Exception { label = command.getOptionValue("label", null); compareResults = command.getOptionValue("compare", null); compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM; + thinDriver = command.hasOption("thin"); + if (thinDriver) { + queryServerUrl = command.getOptionValue("server", "http://localhost:8765"); + } else { + queryServerUrl = null; + } if ((command.hasOption("h") || (args == null || args.length == 0)) && !command .hasOption("listFiles")) { hf.printHelp("Pherf", options); System.exit(1); } - PhoenixUtil.setZookeeper(zookeeper); PhoenixUtil.setRowCountOverride(rowCountOverride); + if (!thinDriver) { + logger.info("Using thick driver with ZooKeepers '{}'", zookeeper); + PhoenixUtil.setZookeeper(zookeeper); + } else { + logger.info("Using thin driver with PQS '{}'", queryServerUrl); + // Enables the thin-driver and sets the PQS URL + PhoenixUtil.useThinDriver(queryServerUrl); + } ResultUtil.setFileSuffix(label); } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java index 92eb80a9b31..0c2a7b8f44b 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java @@ -84,7 +84,12 @@ public synchronized void write(DataLoadTimeSummary dataLoadTime) throws IOExcept ensureBaseResultDirExists(); CSVResultHandler writer = null; - ResultFileDetails resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD; + ResultFileDetails resultFileDetails; + if (PhoenixUtil.isThinDriver()) { + resultFileDetails = ResultFileDetails.CSV_THIN_AGGREGATE_DATA_LOAD; + } else { + resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD; + } try { writer = new CSVFileResultHandler(); writer.setResultFileDetails(resultFileDetails); @@ -92,7 +97,11 @@ public synchronized void write(DataLoadTimeSummary dataLoadTime) throws IOExcept for (TableLoadTime loadTime : dataLoadTime.getTableLoadTime()) { List rowValues = new ArrayList<>(); - rowValues.add(new ResultValue(PhoenixUtil.getZookeeper())); + if (PhoenixUtil.isThinDriver()) { + rowValues.add(new ResultValue(PhoenixUtil.getQueryServerUrl())); + } else { + rowValues.add(new ResultValue(PhoenixUtil.getZookeeper())); + } rowValues.addAll(loadTime.getCsvRepresentation(this)); Result result = diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java index 066fa7acc23..7d09f683d8e 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java @@ -28,6 +28,7 @@ public enum Header { DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"), DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"), AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), + THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"), MONITOR("STAT_NAME,STAT_VALUE,TIME_STAMP"); private String header; diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java index a85f830d469..51aa407df8a 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java @@ -24,6 +24,7 @@ public enum ResultFileDetails { CSV_DETAILED_PERFORMANCE(Header.DETAILED_PERFORMANCE, Extension.DETAILED_CSV), CSV_DETAILED_FUNCTIONAL(Header.DETAILED_FUNCTIONAL, Extension.DETAILED_CSV), CSV_AGGREGATE_DATA_LOAD(Header.AGGREGATE_DATA_LOAD, Extension.CSV), + CSV_THIN_AGGREGATE_DATA_LOAD(Header.THIN_AGGREGATE_DATA_LOAD, Extension.CSV), CSV_MONITOR(Header.MONITOR, Extension.CSV), XML(Header.EMPTY, Extension.XML), IMAGE(Header.EMPTY, Extension.VISUALIZATION); diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java index 064c60451c7..b7788338630 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Properties; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; @@ -39,6 +40,8 @@ public class PhoenixUtil { private static int rowCountOverride = 0; private boolean testEnabled; private static PhoenixUtil instance; + private static boolean useThinDriver; + private static String queryServerUrl; private PhoenixUtil() { this(false); @@ -57,6 +60,19 @@ public static PhoenixUtil create(final boolean testEnabled) { return instance; } + public static void useThinDriver(String queryServerUrl) { + PhoenixUtil.useThinDriver = true; + PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl); + } + + public static String getQueryServerUrl() { + return PhoenixUtil.queryServerUrl; + } + + public static boolean isThinDriver() { + return PhoenixUtil.useThinDriver; + } + public Connection getConnection() throws Exception { return getConnection(null); } @@ -66,17 +82,31 @@ public Connection getConnection(String tenantId) throws Exception { } private Connection getConnection(String tenantId, boolean testEnabled) throws Exception { - if (null == zookeeper) { - throw new IllegalArgumentException( - "Zookeeper must be set before initializing connection!"); - } - Properties props = new Properties(); - if (null != tenantId) { - props.setProperty("TenantId", tenantId); - logger.debug("\nSetting tenantId to " + tenantId); + if (useThinDriver) { + if (null == queryServerUrl) { + throw new IllegalArgumentException("QueryServer URL must be set before" + + " initializing connection"); + } + Properties props = new Properties(); + if (null != tenantId) { + props.setProperty("TenantId", tenantId); + logger.debug("\nSetting tenantId to " + tenantId); + } + String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF"; + return DriverManager.getConnection(url, props); + } else { + if (null == zookeeper) { + throw new IllegalArgumentException( + "Zookeeper must be set before initializing connection!"); + } + Properties props = new Properties(); + if (null != tenantId) { + props.setProperty("TenantId", tenantId); + logger.debug("\nSetting tenantId to " + tenantId); + } + String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : ""); + return DriverManager.getConnection(url, props); } - String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : ""); - return DriverManager.getConnection(url, props); } public boolean executeStatement(String sql, Scenario scenario) throws Exception { @@ -278,7 +308,12 @@ public static String getZookeeper() { public static void setZookeeper(String zookeeper) { logger.info("Setting zookeeper: " + zookeeper); - PhoenixUtil.zookeeper = zookeeper; + useThickDriver(zookeeper); + } + + public static void useThickDriver(String zookeeper) { + PhoenixUtil.useThinDriver = false; + PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper); } public static int getRowCountOverride() { diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index 7c73e2234a5..a35e6e88bf5 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -168,7 +168,7 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, // Execute any Scenario DDL before running workload pUtil.executeScenarioDdl(scenario); - List writeBatches = getBatches(dataLoadThreadTime, scenario); + List> writeBatches = getBatches(dataLoadThreadTime, scenario); waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches); @@ -182,12 +182,12 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, } } - private List getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) + private List> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception { RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount()); - List writeBatches = new ArrayList<>(); + List> writeBatches = new ArrayList<>(); for (int i = 0; i < getThreadPoolSize(); i++) { List @@ -212,7 +212,7 @@ private List getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario } private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario, - long start, List writeBatches) + long start, List> writeBatches) throws InterruptedException, java.util.concurrent.ExecutionException { int sumRows = 0, sumDuration = 0; // Wait for all the batch threads to complete @@ -223,10 +223,12 @@ private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario sc logger.info("Executor (" + this.hashCode() + ") writes complete with row count (" + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")"); } - logger.info("Writes completed with total row count (" + sumRows + ") with total time of(" - + sumDuration + ") Ms"); + long testDuration = System.currentTimeMillis() - start; + logger.info("Writes completed with total row count (" + sumRows + + ") with total elapsed time of (" + testDuration + + ") ms and total CPU execution time of (" + sumDuration + ") ms"); dataLoadTimeSummary - .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start)); + .add(scenario.getTableName(), sumRows, (int) testDuration); } public Future upsertData(final Scenario scenario, final List columns, @@ -235,9 +237,10 @@ public Future upsertData(final Scenario scenario, final List colum Future future = pool.submit(new Callable() { @Override public Info call() throws Exception { int rowsCreated = 0; - long start = 0, duration, totalDuration; + long start = 0, last = 0, duration, totalDuration; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Connection connection = null; + PreparedStatement stmt = null; try { connection = pUtil.getConnection(scenario.getTenantId()); long logStartTime = System.currentTimeMillis(); @@ -247,17 +250,16 @@ public Future upsertData(final Scenario scenario, final List colum Long.MAX_VALUE : WriteWorkload.this.writeParams.getExecutionDurationInMs(); + last = start = System.currentTimeMillis(); + String sql = buildSql(columns, tableName); + stmt = connection.prepareStatement(sql); for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime) < maxDuration); i--) { - String sql = buildSql(columns, tableName); - PreparedStatement stmt = connection.prepareStatement(sql); stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); - start = System.currentTimeMillis(); rowsCreated += stmt.executeUpdate(); - stmt.close(); if ((i % getBatchSize()) == 0) { connection.commit(); - duration = System.currentTimeMillis() - start; + duration = System.currentTimeMillis() - last; logger.info("Writer (" + Thread.currentThread().getName() + ") committed Batch. Total " + getBatchSize() + " rows for this thread (" + this.hashCode() + ") in (" @@ -272,9 +274,14 @@ public Future upsertData(final Scenario scenario, final List colum // Pause for throttling if configured to do so Thread.sleep(threadSleepDuration); + // Re-compute the start time for the next batch + last = System.currentTimeMillis(); } } } finally { + if (stmt != null) { + stmt.close(); + } if (connection != null) { try { connection.commit();