Skip to content

Commit

Permalink
PHOENIX-2285 Changes to store the query timeout in milliseconds, to a…
Browse files Browse the repository at this point in the history
…llow users to specify timeouts with millisecond granularity via phoenix.query.timeout
  • Loading branch information
jfernandosf committed Oct 2, 2015
1 parent a9bd640 commit ff35d95
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 14 deletions.
@@ -0,0 +1,97 @@
package org.apache.phoenix.iterate;

import static org.junit.Assert.*;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.junit.Test;

/**
* Tests to validate that user specified property phoenix.query.timeoutMs
* works as expected.
*/
public class PhoenixQueryTimeoutIT extends BaseHBaseManagedTimeIT {

@Test
/**
* This test validates that we timeout as expected. It does do by
* setting the timeout value to 1 ms.
*/
public void testCustomQueryTimeoutWithVeryLowTimeout() throws Exception {
// Arrange
PreparedStatement ps = loadDataAndPrepareQuery(1, 1);

// Act + Assert
try {
ResultSet rs = ps.executeQuery();
// Trigger HBase scans by calling rs.next
while (rs.next()) {};
fail("Expected query to timeout with a 1 ms timeout");
} catch (Exception e) {
// Expected
}
}

@Test
public void testCustomQueryTimeoutWithNormalTimeout() throws Exception {
// Arrange
PreparedStatement ps = loadDataAndPrepareQuery(30000, 30);

// Act + Assert
try {
ResultSet rs = ps.executeQuery();
// Trigger HBase scans by calling rs.next
int count = 0;
while (rs.next()) {
count++;
}
assertEquals("Unexpected number of records returned", 1000, count);
} catch (Exception e) {
fail("Expected query to suceed");
}
}


//-----------------------------------------------------------------
// Private Helper Methods
//-----------------------------------------------------------------

private PreparedStatement loadDataAndPrepareQuery(int timeoutMs, int timeoutSecs) throws Exception, SQLException {
createTableAndInsertRows(1000);
Properties props = new Properties();
props.setProperty("phoenix.query.timeoutMs", String.valueOf(timeoutMs));
Connection conn = DriverManager.getConnection(getUrl(), props);
PreparedStatement ps = conn.prepareStatement("SELECT * FROM QUERY_TIMEOUT_TEST");
PhoenixStatement phoenixStmt = ps.unwrap(PhoenixStatement.class);
assertEquals(timeoutMs, phoenixStmt.getQueryTimeoutInMillis());
assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout());
return ps;
}

private Set<String> createTableAndInsertRows(int numRows) throws Exception {
String ddl = "CREATE TABLE QUERY_TIMEOUT_TEST (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute(ddl);
String dml = "UPSERT INTO QUERY_TIMEOUT_TEST VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(dml);
final Set<String> expectedKeys = new HashSet<>(numRows);
for (int i = 1; i <= numRows; i++) {
String key = "key" + i;
expectedKeys.add(key);
stmt.setString(1, key);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
conn.commit();
return expectedKeys;
}
}
Expand Up @@ -531,8 +531,7 @@ public List<PeekingResultIterator> getIterators() throws SQLException {
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans); final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
allFutures.add(futures); allFutures.add(futures);
SQLException toThrow = null; SQLException toThrow = null;
// Get query time out from Statement and convert from seconds back to milliseconds int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
int queryTimeOut = context.getStatement().getQueryTimeout() * 1000;
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
final long maxQueryEndTime = startTime + queryTimeOut; final long maxQueryEndTime = startTime + queryTimeOut;
try { try {
Expand Down
Expand Up @@ -160,6 +160,9 @@
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ListMultimap; import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.math.IntMath;

import sun.jvmstat.monitor.IntegerMonitor;
/** /**
* *
* JDBC Statement implementation of Phoenix. * JDBC Statement implementation of Phoenix.
Expand Down Expand Up @@ -214,17 +217,20 @@ public String toString() {
private boolean isClosed = false; private boolean isClosed = false;
private int maxRows; private int maxRows;
private int fetchSize = -1; private int fetchSize = -1;
private int queryTimeout; private int queryTimeoutMillis;


public PhoenixStatement(PhoenixConnection connection) { public PhoenixStatement(PhoenixConnection connection) {
this.connection = connection; this.connection = connection;
this.queryTimeout = getDefaultQueryTimeout(); this.queryTimeoutMillis = getDefaultQueryTimeoutMillis();
} }


private int getDefaultQueryTimeout() { /**
// Convert milliseconds to seconds by taking the CEIL up to the next second * Internally to Phoenix we allow callers to set the query timeout in millis
return (connection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, * via the phoenix.query.timeoutMs. Therefore we store the time in millis.
QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS) + 999) / 1000; */
private int getDefaultQueryTimeoutMillis() {
return connection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
} }


protected List<PhoenixResultSet> getResultSets() { protected List<PhoenixResultSet> getResultSets() {
Expand Down Expand Up @@ -1608,21 +1614,45 @@ public void setPoolable(boolean poolable) throws SQLException {
} }


@Override @Override
/**
* When setting the query timeout via JDBC timeouts must be expressed in seconds. Therefore
* we need to convert the default timeout to milliseconds for internal use.
*/
public void setQueryTimeout(int seconds) throws SQLException { public void setQueryTimeout(int seconds) throws SQLException {
if (seconds < 0) { if (seconds < 0) {
this.queryTimeout = getDefaultQueryTimeout(); this.queryTimeoutMillis = getDefaultQueryTimeoutMillis();
} else if (seconds == 0) { } else if (seconds == 0) {
this.queryTimeout = Integer.MAX_VALUE; this.queryTimeoutMillis = Integer.MAX_VALUE;
} else { } else {
this.queryTimeout = seconds; this.queryTimeoutMillis = seconds * 1000;
} }
} }


@Override @Override
/**
* When getting the query timeout via JDBC timeouts must be expressed in seconds. Therefore
* we need to convert the default millisecond timeout to seconds.
*/
public int getQueryTimeout() throws SQLException { public int getQueryTimeout() throws SQLException {
return queryTimeout; // Convert milliseconds to seconds by taking the CEIL up to the next second
int scaledValue;
try {
scaledValue = IntMath.checkedAdd(queryTimeoutMillis, 999);
} catch (ArithmeticException e) {
scaledValue = Integer.MAX_VALUE;
}
return scaledValue / 1000;
} }


/**
* Returns the configured timeout in milliseconds. This
* internally enables the of use millisecond timeout granularity
* and honors the exact value configured by phoenix.query.timeoutMs.
*/
public int getQueryTimeoutInMillis() {
return queryTimeoutMillis;
}

@Override @Override
public boolean isWrapperFor(Class<?> iface) throws SQLException { public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isInstance(this); return iface.isInstance(this);
Expand Down
Expand Up @@ -28,6 +28,8 @@


import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.junit.Test; import org.junit.Test;


public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest { public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
Expand Down Expand Up @@ -85,5 +87,97 @@ public void testQueriesUsingExecuteUpdateShouldFail() throws Exception {
assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode()); assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
} }
} }

@Test
/**
* Validates that if a user sets the query timeout via the
* stmt.setQueryTimeout() JDBC method, we correctly store the timeout
* in both milliseconds and seconds.
*/
public void testSettingQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);

// Act
stmt.setQueryTimeout(3);

// Assert
assertEquals(3, stmt.getQueryTimeout());
assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
}

@Test
/**
* Validates if a user sets the timeout to zero that we store the timeout
* in millis as the Integer.MAX_VALUE.
*/
public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);

// Act
stmt.setQueryTimeout(0);

// Assert
assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
}

@Test
/**
* Validates that is negative value is supplied we set the timeout to the default.
*/
public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);

// Act
stmt.setQueryTimeout(-1);

// Assert
assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
}

@Test
/**
* Validates that setting custom phoenix query timeout using
* the phoenix.query.timeoutMs config property is honored.
*/
public void testCustomQueryTimeout() throws Exception {
// Arrange
Properties connectionProperties = new Properties();
connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);

// Assert
assertEquals(3, stmt.getQueryTimeout());
assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
}

@Test
public void testZeroCustomQueryTimeout() throws Exception {
// Arrange
Properties connectionProperties = new Properties();
connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);

// Assert
assertEquals(0, stmt.getQueryTimeout());
assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
}


} }

0 comments on commit ff35d95

Please sign in to comment.