Skip to content

Commit

Permalink
PHOENIX-2524 Fixes for pherf to run with queryserver (elserj)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mujtaba committed Dec 18, 2015
1 parent b38989d commit 135b978
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 27 deletions.
8 changes: 8 additions & 0 deletions phoenix-assembly/src/build/components/all-common-jars.xml
Expand Up @@ -104,5 +104,13 @@
</excludes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../phoenix-pherf/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>phoenix-*.jar</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
</component>
19 changes: 18 additions & 1 deletion phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
Expand Up @@ -89,6 +89,8 @@ public class Pherf {
options.addOption("label", true, "Label a run. Result file name will be suffixed with specified label");
options.addOption("compare", true, "Specify labeled run(s) to compare");
options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time.");
options.addOption("t", "thin", false, "Use the Phoenix Thin Driver");
options.addOption("s", "server", true, "The URL for the Phoenix QueryServer");
}

private final String zookeeper;
Expand All @@ -109,6 +111,8 @@ public class Pherf {
private final String label;
private final String compareResults;
private final CompareType compareType;
private final boolean thinDriver;
private final String queryServerUrl;

public Pherf(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
Expand Down Expand Up @@ -155,14 +159,27 @@ public Pherf(String[] args) throws Exception {
label = command.getOptionValue("label", null);
compareResults = command.getOptionValue("compare", null);
compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM;
thinDriver = command.hasOption("thin");
if (thinDriver) {
queryServerUrl = command.getOptionValue("server", "http://localhost:8765");
} else {
queryServerUrl = null;
}

if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
.hasOption("listFiles")) {
hf.printHelp("Pherf", options);
System.exit(1);
}
PhoenixUtil.setZookeeper(zookeeper);
PhoenixUtil.setRowCountOverride(rowCountOverride);
if (!thinDriver) {
logger.info("Using thick driver with ZooKeepers '{}'", zookeeper);
PhoenixUtil.setZookeeper(zookeeper);
} else {
logger.info("Using thin driver with PQS '{}'", queryServerUrl);
// Enables the thin-driver and sets the PQS URL
PhoenixUtil.useThinDriver(queryServerUrl);
}
ResultUtil.setFileSuffix(label);
}

Expand Down
Expand Up @@ -84,15 +84,24 @@ public synchronized void write(DataLoadTimeSummary dataLoadTime) throws IOExcept
ensureBaseResultDirExists();

CSVResultHandler writer = null;
ResultFileDetails resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
ResultFileDetails resultFileDetails;
if (PhoenixUtil.isThinDriver()) {
resultFileDetails = ResultFileDetails.CSV_THIN_AGGREGATE_DATA_LOAD;
} else {
resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
}
try {
writer = new CSVFileResultHandler();
writer.setResultFileDetails(resultFileDetails);
writer.setResultFileName("Data_Load_Summary");

for (TableLoadTime loadTime : dataLoadTime.getTableLoadTime()) {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
if (PhoenixUtil.isThinDriver()) {
rowValues.add(new ResultValue(PhoenixUtil.getQueryServerUrl()));
} else {
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
}
rowValues.addAll(loadTime.getCsvRepresentation(this));
Result
result =
Expand Down
Expand Up @@ -28,6 +28,7 @@ public enum Header {
DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
MONITOR("STAT_NAME,STAT_VALUE,TIME_STAMP");

private String header;
Expand Down
Expand Up @@ -24,6 +24,7 @@ public enum ResultFileDetails {
CSV_DETAILED_PERFORMANCE(Header.DETAILED_PERFORMANCE, Extension.DETAILED_CSV),
CSV_DETAILED_FUNCTIONAL(Header.DETAILED_FUNCTIONAL, Extension.DETAILED_CSV),
CSV_AGGREGATE_DATA_LOAD(Header.AGGREGATE_DATA_LOAD, Extension.CSV),
CSV_THIN_AGGREGATE_DATA_LOAD(Header.THIN_AGGREGATE_DATA_LOAD, Extension.CSV),
CSV_MONITOR(Header.MONITOR, Extension.CSV),
XML(Header.EMPTY, Extension.XML),
IMAGE(Header.EMPTY, Extension.VISUALIZATION);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
Expand All @@ -39,6 +40,8 @@ public class PhoenixUtil {
private static int rowCountOverride = 0;
private boolean testEnabled;
private static PhoenixUtil instance;
private static boolean useThinDriver;
private static String queryServerUrl;

private PhoenixUtil() {
this(false);
Expand All @@ -57,6 +60,19 @@ public static PhoenixUtil create(final boolean testEnabled) {
return instance;
}

public static void useThinDriver(String queryServerUrl) {
PhoenixUtil.useThinDriver = true;
PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl);
}

public static String getQueryServerUrl() {
return PhoenixUtil.queryServerUrl;
}

public static boolean isThinDriver() {
return PhoenixUtil.useThinDriver;
}

public Connection getConnection() throws Exception {
return getConnection(null);
}
Expand All @@ -66,17 +82,31 @@ public Connection getConnection(String tenantId) throws Exception {
}

private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
if (null == zookeeper) {
throw new IllegalArgumentException(
"Zookeeper must be set before initializing connection!");
}
Properties props = new Properties();
if (null != tenantId) {
props.setProperty("TenantId", tenantId);
logger.debug("\nSetting tenantId to " + tenantId);
if (useThinDriver) {
if (null == queryServerUrl) {
throw new IllegalArgumentException("QueryServer URL must be set before" +
" initializing connection");
}
Properties props = new Properties();
if (null != tenantId) {
props.setProperty("TenantId", tenantId);
logger.debug("\nSetting tenantId to " + tenantId);
}
String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF";
return DriverManager.getConnection(url, props);
} else {
if (null == zookeeper) {
throw new IllegalArgumentException(
"Zookeeper must be set before initializing connection!");
}
Properties props = new Properties();
if (null != tenantId) {
props.setProperty("TenantId", tenantId);
logger.debug("\nSetting tenantId to " + tenantId);
}
String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
return DriverManager.getConnection(url, props);
}
String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
return DriverManager.getConnection(url, props);
}

public boolean executeStatement(String sql, Scenario scenario) throws Exception {
Expand Down Expand Up @@ -278,7 +308,12 @@ public static String getZookeeper() {

public static void setZookeeper(String zookeeper) {
logger.info("Setting zookeeper: " + zookeeper);
PhoenixUtil.zookeeper = zookeeper;
useThickDriver(zookeeper);
}

public static void useThickDriver(String zookeeper) {
PhoenixUtil.useThinDriver = false;
PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper);
}

public static int getRowCountOverride() {
Expand Down
Expand Up @@ -168,7 +168,7 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
// Execute any Scenario DDL before running workload
pUtil.executeScenarioDdl(scenario);

List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario);

waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);

Expand All @@ -182,12 +182,12 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
}
}

private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
throws Exception {
RowCalculator
rowCalculator =
new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
List<Future> writeBatches = new ArrayList<>();
List<Future<Info>> writeBatches = new ArrayList<>();

for (int i = 0; i < getThreadPoolSize(); i++) {
List<Column>
Expand All @@ -212,7 +212,7 @@ private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario
}

private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
long start, List<Future> writeBatches)
long start, List<Future<Info>> writeBatches)
throws InterruptedException, java.util.concurrent.ExecutionException {
int sumRows = 0, sumDuration = 0;
// Wait for all the batch threads to complete
Expand All @@ -223,10 +223,12 @@ private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario sc
logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+ writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
}
logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
+ sumDuration + ") Ms");
long testDuration = System.currentTimeMillis() - start;
logger.info("Writes completed with total row count (" + sumRows
+ ") with total elapsed time of (" + testDuration
+ ") ms and total CPU execution time of (" + sumDuration + ") ms");
dataLoadTimeSummary
.add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
.add(scenario.getTableName(), sumRows, (int) testDuration);
}

public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
Expand All @@ -235,9 +237,10 @@ public Future<Info> upsertData(final Scenario scenario, final List<Column> colum
Future<Info> future = pool.submit(new Callable<Info>() {
@Override public Info call() throws Exception {
int rowsCreated = 0;
long start = 0, duration, totalDuration;
long start = 0, last = 0, duration, totalDuration;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Connection connection = null;
PreparedStatement stmt = null;
try {
connection = pUtil.getConnection(scenario.getTenantId());
long logStartTime = System.currentTimeMillis();
Expand All @@ -247,17 +250,16 @@ public Future<Info> upsertData(final Scenario scenario, final List<Column> colum
Long.MAX_VALUE :
WriteWorkload.this.writeParams.getExecutionDurationInMs();

last = start = System.currentTimeMillis();
String sql = buildSql(columns, tableName);
stmt = connection.prepareStatement(sql);
for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
< maxDuration); i--) {
String sql = buildSql(columns, tableName);
PreparedStatement stmt = connection.prepareStatement(sql);
stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
start = System.currentTimeMillis();
rowsCreated += stmt.executeUpdate();
stmt.close();
if ((i % getBatchSize()) == 0) {
connection.commit();
duration = System.currentTimeMillis() - start;
duration = System.currentTimeMillis() - last;
logger.info("Writer (" + Thread.currentThread().getName()
+ ") committed Batch. Total " + getBatchSize()
+ " rows for this thread (" + this.hashCode() + ") in ("
Expand All @@ -272,9 +274,14 @@ public Future<Info> upsertData(final Scenario scenario, final List<Column> colum

// Pause for throttling if configured to do so
Thread.sleep(threadSleepDuration);
// Re-compute the start time for the next batch
last = System.currentTimeMillis();
}
}
} finally {
if (stmt != null) {
stmt.close();
}
if (connection != null) {
try {
connection.commit();
Expand Down

0 comments on commit 135b978

Please sign in to comment.