Skip to content

Commit

Permalink
PHOENIX-1267 Set scan.setSmall(true) when appropriate (Abhishek Singh…
Browse files Browse the repository at this point in the history
… Chouhan)
  • Loading branch information
jtaylor-sfdc committed Mar 9, 2018
1 parent 494b17e commit c745088
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
Expand Up @@ -237,12 +237,14 @@ public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> ca
scan = context.getScan();
}

ScanRanges scanRanges = context.getScanRanges();

/*
* For aggregate queries, we still need to let the AggregationPlan to
* proceed so that we can give proper aggregates even if there are no
* row to be scanned.
*/
if (context.getScanRanges() == ScanRanges.NOTHING && !getStatement().isAggregate()) {
if (scanRanges == ScanRanges.NOTHING && !getStatement().isAggregate()) {
return getWrappedIterator(caches, ResultIterator.EMPTY_ITERATOR);
}

Expand All @@ -269,11 +271,15 @@ public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> ca
}
}

if (statement.getHint().hasHint(Hint.SMALL)) {

PhoenixConnection connection = context.getConnection();
final int smallScanThreshold = connection.getQueryServices().getProps().getInt(QueryServices.SMALL_SCAN_THRESHOLD_ATTRIB,
QueryServicesOptions.DEFAULT_SMALL_SCAN_THRESHOLD);

if (statement.getHint().hasHint(Hint.SMALL) || (scanRanges.isPointLookup() && scanRanges.getPointLookupCount() < smallScanThreshold)) {
scan.setSmall(true);
}

PhoenixConnection connection = context.getConnection();

// set read consistency
if (table.getType() != PTableType.SYSTEM) {
Expand All @@ -282,7 +288,7 @@ public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> ca
// TODO fix this in PHOENIX-2415 Support ROW_TIMESTAMP with transactional tables
if (!table.isTransactional()) {
// Get the time range of row_timestamp column
TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange();
TimeRange rowTimestampRange = scanRanges.getRowTimestampRange();
// Get the already existing time range on the scan.
TimeRange scanTimeRange = scan.getTimeRange();
Long scn = connection.getSCN();
Expand Down
Expand Up @@ -300,6 +300,7 @@ public interface QueryServices extends SQLCloseable {

// Whether to enable cost-based-decision in the query optimizer
public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold";

/**
* Get executor service used for parallel scans
Expand Down
Expand Up @@ -330,6 +330,7 @@ public class QueryServicesOptions {

//default update cache frequency
public static final int DEFAULT_UPDATE_CACHE_FREQUENCY = 0;
public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100;

@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
Expand Down
Expand Up @@ -4482,6 +4482,32 @@ public void testNoLocalIndexPruning() throws SQLException {
}
}

@Test
public void testSmallScanForPointLookups() throws SQLException {
Properties props = PropertiesUtil.deepCopy(new Properties());
createTestTable(getUrl(), "CREATE TABLE FOO(\n" +
" a VARCHAR NOT NULL,\n" +
" b VARCHAR NOT NULL,\n" +
" c VARCHAR,\n" +
" CONSTRAINT pk PRIMARY KEY (a, b DESC, c)\n" +
" )");

props.put(QueryServices.SMALL_SCAN_THRESHOLD_ATTRIB, "2");
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String query = "select * from foo where a = 'a' and b = 'b' and c in ('x','y','z')";
PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
QueryPlan plan = stmt.optimizeQuery(query);
plan.iterator();
//Fail since we have 3 rows in pointLookup
assertFalse(plan.getContext().getScan().isSmall());
query = "select * from foo where a = 'a' and b = 'b' and c = 'c'";
plan = stmt.compileQuery(query);
plan.iterator();
//Should be small scan, query is for single row pointLookup
assertTrue(plan.getContext().getScan().isSmall());
}
}

@Test
public void testLocalIndexPruningInSortMergeJoin() throws SQLException {
verifyLocalIndexPruningWithMultipleTables("SELECT /*+ USE_SORT_MERGE_JOIN*/ *\n" +
Expand Down

0 comments on commit c745088

Please sign in to comment.