diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java index aa929e78157..d7ceeeba318 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java @@ -823,12 +823,189 @@ private void testSelectQueriesWithFilters(boolean useStatsForParallelization) th assertEquals(100 + i, rs.getInt(1)); i++; } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 3L, info.getEstimatedRows()); + assertEquals((Long) 160L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key is between and end key is after data. + sql = "SELECT a FROM " + tableName + " WHERE K <= 120 AND K >= 100"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 10L, info.getEstimatedRows()); + assertEquals((Long) 720L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key and end key are both between data. + sql = "SELECT a FROM " + tableName + " WHERE K <= 109 AND K >= 100"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 10L, info.getEstimatedRows()); + assertEquals((Long) 720L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + } + } + + @Test + public void testSelectQueriesWithGuidePostMovingWindows() throws Exception { + testSelectQueriesWithGuidePostMovingWindows(0); + testSelectQueriesWithGuidePostMovingWindows(1); + testSelectQueriesWithGuidePostMovingWindows(2); + testSelectQueriesWithGuidePostMovingWindows(10); + testSelectQueriesWithGuidePostMovingWindows(256); + } + + private void testSelectQueriesWithGuidePostMovingWindows(int movingWindowSize) throws Exception { + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + int guidePostWidth = 20; + String ddl = + "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)" + + " GUIDE_POSTS_WIDTH=" + guidePostWidth + + ", USE_STATS_FOR_PARALLELIZATION=true" + " SPLIT ON (102, 105, 108)"; + conn.createStatement().execute(ddl); + conn.createStatement().execute("upsert into " + tableName + " values (100,100,3)"); + conn.createStatement().execute("upsert into " + tableName + " values (101,101,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (102,102,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (103,103,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (104,104,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (105,105,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (106,106,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (107,107,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (108,108,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (109,109,4)"); + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + tableName + ""); + } + List binds = Lists.newArrayList(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + int defaultGuidePostsMovingWindowSize = + PhoenixConfigurationUtil.getGuidePostsMovingWindowSize(conn.unwrap(PhoenixConnection.class)); + PhoenixConfigurationUtil.setGuidePostsMovingWindowSize( + conn.unwrap(PhoenixConnection.class), movingWindowSize); + + // Query whose start key is before any data + String sql = "SELECT a FROM " + tableName + " WHERE K >= 99"; + ResultSet rs = conn.createStatement().executeQuery(sql); + int i = 0; + int numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10L, info.getEstimatedRows()); + assertEquals((Long) 720L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key is after any data + sql = "SELECT a FROM " + tableName + " WHERE K >= 110"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0L, info.getEstimatedRows()); + assertEquals((Long) 0L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Point lookup query. + sql = "SELECT a FROM " + tableName + " WHERE K = 101"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 1; + while (rs.next()) { + assertEquals(101, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 1L, info.getEstimatedRows()); + assertEquals((Long) 97L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() == 0); + + // Query whose end key is before any data + sql = "SELECT a FROM " + tableName + " WHERE K <= 98"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0L, info.getEstimatedRows()); + assertEquals((Long) 0L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose end key is after any data. + // In this case, we return the estimate as scanning all the guide posts. + sql = "SELECT a FROM " + tableName + " WHERE K <= 110"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10L, info.getEstimatedRows()); + assertEquals((Long) 720L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key and end key is before any data. + // In this case, we return the estimate as scanning the first guide post + sql = "SELECT a FROM " + tableName + " WHERE K <= 90 AND K >= 80"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0L, info.getEstimatedRows()); + assertEquals((Long) 0L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key and end key is after any data. + // In this case, we return the estimate as scanning no guide post + sql = "SELECT a FROM " + tableName + " WHERE K <= 130 AND K >= 120"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0L, info.getEstimatedRows()); + assertEquals((Long) 0L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key is before and end key is between data. + // In this case, we return the estimate as scanning the guide posts + // between the first key and end ky + sql = "SELECT a FROM " + tableName + " WHERE K <= 102 AND K >= 90"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 3; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); info = getByteRowEstimates(conn, sql, binds); // Depending on the guidepost boundary, this estimate // can be slightly off. It's called estimate for a reason. assertEquals((Long) 3L, info.getEstimatedRows()); assertEquals((Long) 160L, info.getEstimatedBytes()); assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key is between and end key is after data. sql = "SELECT a FROM " + tableName + " WHERE K <= 120 AND K >= 100"; rs = conn.createStatement().executeQuery(sql); @@ -838,12 +1015,14 @@ private void testSelectQueriesWithFilters(boolean useStatsForParallelization) th assertEquals(100 + i, rs.getInt(1)); i++; } + assertEquals(numRows, i); info = getByteRowEstimates(conn, sql, binds); // Depending on the guidepost boundary, this estimate // can be slightly off. It's called estimate for a reason. assertEquals((Long) 10L, info.getEstimatedRows()); assertEquals((Long) 720L, info.getEstimatedBytes()); assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key and end key are both between data. sql = "SELECT a FROM " + tableName + " WHERE K <= 109 AND K >= 100"; rs = conn.createStatement().executeQuery(sql); @@ -853,12 +1032,52 @@ private void testSelectQueriesWithFilters(boolean useStatsForParallelization) th assertEquals(100 + i, rs.getInt(1)); i++; } + assertEquals(numRows, i); info = getByteRowEstimates(conn, sql, binds); // Depending on the guidepost boundary, this estimate // can be slightly off. It's called estimate for a reason. assertEquals((Long) 10L, info.getEstimatedRows()); assertEquals((Long) 720L, info.getEstimatedBytes()); assertTrue(info.getEstimateInfoTs() > 0); + + // Query with multiple scan ranges, and each range's start key and end key are both between data + sql = "SELECT a FROM " + tableName + " WHERE K <= 103 AND K >= 101 OR K <= 108 AND K >= 106"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 6; + int[] result = new int[] { 101, 102, 103, 106, 107, 108 }; + while (rs.next()) { + assertEquals(result[i++], rs.getInt(1)); + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 6L, info.getEstimatedRows()); + assertEquals((Long) 460L, info.getEstimatedBytes()); + // TODO: the original code before this change will hit the following assertion. Need to investigate it. + // assertTrue(info.getEstimateInfoTs() > 0); + + // Query with multiple scan ranges which start with the min key and end with the max key + sql = "SELECT a FROM " + tableName + " WHERE K <= 102 AND K >= 100 OR K <= 109 AND K >= 107"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 6; + result = new int[] { 100, 101, 102, 107, 108, 109 }; + while (rs.next()) { + assertEquals(result[i++], rs.getInt(1)); + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 6L, info.getEstimatedRows()); + assertEquals((Long) 390L, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Restore the default moving window size + PhoenixConfigurationUtil.setGuidePostsMovingWindowSize( + conn.unwrap(PhoenixConnection.class), defaultGuidePostsMovingWindowSize); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 4a51c87bcef..e141642c45d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -158,6 +158,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private boolean hasGuidePosts; private Scan scan; private final boolean useStatsForParallelization; + private final int gpsMovingWindowSize; protected Map caches; private final QueryPlan dataPlan; @@ -502,6 +503,7 @@ public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, initializeScan(plan, perScanLimit, offset, scan); this.useStatsForParallelization = PhoenixConfigurationUtil.getStatsForParallelizationProp(context.getConnection(), table); + this.gpsMovingWindowSize = PhoenixConfigurationUtil.getGuidePostsMovingWindowSize(context.getConnection()); this.scans = getParallelScans(); List splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); for (List scanList : scans) { @@ -885,20 +887,20 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw List regionLocations = getRegionBoundaries(scanGrouper); List regionBoundaries = toBoundaries(regionLocations); ScanRanges scanRanges = context.getScanRanges(); - PTable table = getTable(); - boolean isSalted = table.getBucketNum() != null; - boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; GuidePostsInfo gps = getGuidePosts(); - // case when stats wasn't collected + // Case when stats wasn't collected hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST; // Case when stats collection did run but there possibly wasn't enough data. In such a - // case we generate an empty guide post with the byte estimate being set as guide post - // width. + // case we generate an empty guide post with the byte estimate being set as guide post width. boolean emptyGuidePost = gps.isEmptyGuidePost(); byte[] startRegionBoundaryKey = startKey; byte[] stopRegionBoundaryKey = stopKey; int columnsInCommon = 0; ScanRanges prefixScanRanges = ScanRanges.EVERYTHING; + + PTable table = getTable(); + boolean isSalted = table.getBucketNum() != null; + boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; boolean traverseAllRegions = isSalted || isLocalIndex; if (isLocalIndex) { // TODO: when implementing PHOENIX-4585, we should change this to an assert @@ -951,6 +953,9 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw DataInput input = null; PrefixByteDecoder decoder = null; int guideIndex = 0; + int guideIndexInMovingWindow = 0; + int gpsMovingWindowSize = 0; + List gpsMovingWindow = null; GuidePostEstimate estimates = new GuidePostEstimate(); boolean gpsForFirstRegion = false; boolean intersectWithGuidePosts = true; @@ -958,7 +963,7 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw // gps that are in the scan range. We'll use this if we find // no gps in range. long fallbackTs = Long.MAX_VALUE; - // Determination of whether of not we found a guidepost in + // Determination of whether or not we found a guidepost in // every region between the start and stop key. If not, then // we cannot definitively say at what time the guideposts // were collected. @@ -972,32 +977,84 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw decoder = new PrefixByteDecoder(gps.getMaxLength()); firstRegionStartKey = new ImmutableBytesWritable(regionLocations.get(regionIndex).getRegionInfo().getStartKey()); try { - int c; - // Continue walking guideposts until we get past the currentKey - while ((c=currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input))) >= 0) { - // Detect if we found a guidepost that might be in the first region. This - // is for the case where the start key may be past the only guidepost in - // the first region. - if (!gpsForFirstRegion && firstRegionStartKey.compareTo(currentGuidePost) <= 0) { - gpsForFirstRegion = true; + if (firstRegionStartKey.getLength() > 0 && this.gpsMovingWindowSize > 0) { + // Continuously decode and load guide posts in batches (moving window). For each moving window, + // firstly compare the searching key with the last element to see whether the searching key is + // in the current window. If it isn't, perform binary search in the window; otherwise, move to + // the next window and repeat the above steps until it finds the start key of the first region + // in the scan ranges or its insertion position. + int gpsScanned = 0; + while (gpsScanned < gpsSize) { + gpsMovingWindowSize = Math.min(gpsSize - gpsScanned, this.gpsMovingWindowSize); + gpsMovingWindow = Lists.newArrayListWithExpectedSize(gpsMovingWindowSize); + for (int i = 0; i < gpsMovingWindowSize; i++) { + gpsMovingWindow.add(new ImmutableBytesWritable( + PrefixByteCodec.decode(decoder, input).copyBytes())); + } + + int ret = firstRegionStartKey.compareTo(gpsMovingWindow.get(gpsMovingWindowSize - 1)); + if (ret <= 0) { + // The start key of the first region must be in the current moving window + if (ret < 0) { + // Found the start key of the first region in the scan ranges or its insertion position + guideIndexInMovingWindow = + Collections.binarySearch(gpsMovingWindow, firstRegionStartKey); + if (guideIndexInMovingWindow < 0) { + // The guide post at the insertion position is the first guide post of the region + guideIndexInMovingWindow = -1 - guideIndexInMovingWindow; + } + } + else { + guideIndexInMovingWindow += (gpsMovingWindowSize - 1); + } + + guideIndex += guideIndexInMovingWindow; + break; + } + + // The start key exceeds the current moving window, continue the searching. + guideIndex += guideIndexInMovingWindow; + gpsScanned += gpsMovingWindowSize; } - // While we have gps in the region (but outside of start/stop key), track - // the min ts as a fallback for the time at which stas were calculated. - if (gpsForFirstRegion) { - fallbackTs = - Math.min(fallbackTs, - gps.getGuidePostTimestamps()[guideIndex]); + } + + if (guideIndex >= gpsSize) { + intersectWithGuidePosts = false; + } + else { + int c = 0; + currentGuidePost = guideIndexInMovingWindow < gpsMovingWindowSize ? + gpsMovingWindow.get(guideIndexInMovingWindow++) : PrefixByteCodec.decode(decoder, input); + // Continue walking guideposts until we get past the currentKey + while ((c = currentKey.compareTo(currentGuidePost)) >= 0) { + // Detect if we found a guidepost that might be in the first region. This + // is for the case where the start key may be past the only guidepost in + // the first region. + if (!gpsForFirstRegion + && firstRegionStartKey.compareTo(currentGuidePost) <= 0) { + gpsForFirstRegion = true; + } + // While we have gps in the region (but outside of start/stop key), track + // the min ts as a fallback for the time at which stas were calculated. + if (gpsForFirstRegion) { + fallbackTs = + Math.min(fallbackTs, gps.getGuidePostTimestamps()[guideIndex]); + } + // Special case for gp == startKey in which case we want to + // count this gp (if it's in range) though we go past it. + delayAddingEst = (c == 0); + + guideIndex++; + currentGuidePost = guideIndexInMovingWindow < gpsMovingWindowSize ? + gpsMovingWindow.get(guideIndexInMovingWindow++) : PrefixByteCodec.decode(decoder, input); } - // Special case for gp == startKey in which case we want to - // count this gp (if it's in range) though we go past it. - delayAddingEst = (c == 0); - guideIndex++; } } catch (EOFException e) { // expected. Thrown when we have decoded all guide posts. intersectWithGuidePosts = false; } } + byte[] endRegionKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); byte[] currentKeyBytes = currentKey.copyBytes(); intersectWithGuidePosts &= guideIndex < gpsSize; @@ -1012,6 +1069,7 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw } else { endKey = regionBoundaries.get(regionIndex); } + if (isLocalIndex) { // Only attempt further pruning if the prefix range is using // a skip scan since we've already pruned the range of regions @@ -1030,12 +1088,12 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw } keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey()); } + byte[] initialKeyBytes = currentKeyBytes; int gpsComparedToEndKey = -1; boolean everNotDelayed = false; while (intersectWithGuidePosts && (endKey.length == 0 || (gpsComparedToEndKey=currentGuidePost.compareTo(endKey)) <= 0)) { - Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, - false); + Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, false); if (newScan != null) { ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), regionInfo.getEndKey(), @@ -1057,7 +1115,8 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation); currentKeyBytes = currentGuidePostBytes; try { - currentGuidePost = PrefixByteCodec.decode(decoder, input); + currentGuidePost = guideIndexInMovingWindow < gpsMovingWindowSize ? + gpsMovingWindow.get(guideIndexInMovingWindow++) : PrefixByteCodec.decode(decoder, input); currentGuidePostBytes = currentGuidePost.copyBytes(); guideIndex++; } catch (EOFException e) { @@ -1065,7 +1124,9 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw intersectWithGuidePosts = false; } } + boolean gpsInThisRegion = initialKeyBytes != currentKeyBytes; + if (!useStatsForParallelization) { /* * If we are not using stats for generating parallel scans, we need to reset the @@ -1073,8 +1134,9 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw */ currentKeyBytes = initialKeyBytes; } + Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true); - if(newScan != null) { + if (newScan != null) { ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); // Boundary case of no GP in region after delaying adding of estimates @@ -1087,29 +1149,33 @@ private List> getParallelScans(byte[] startKey, byte[] stopKey) throw delayAddingEst = false; } scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); + currentKeyBytes = endKey; + // We have a guide post in the region if the above loop was entered // or if the current key is less than the region end key (since the loop // may not have been entered if our scan end key is smaller than the // first guide post in that region). boolean gpsAfterStopKey = false; gpsAvailableForAllRegions &= - ( gpsInThisRegion && everNotDelayed) || // GP in this region + ( gpsInThisRegion && everNotDelayed ) || // GP in this region ( regionIndex == startRegionIndex && gpsForFirstRegion ) || // GP in first region (before start key) ( gpsAfterStopKey = ( regionIndex == stopIndex && intersectWithGuidePosts && // GP in last region (after stop key) - ( endRegionKey.length == 0 || // then check if gp is in the region - currentGuidePost.compareTo(endRegionKey) < 0) ) ); + ( endRegionKey.length == 0 || currentGuidePost.compareTo(endRegionKey) < 0 ) ) ); if (gpsAfterStopKey) { // If gp after stop key, but still in last region, track min ts as fallback fallbackTs = Math.min(fallbackTs, gps.getGuidePostTimestamps()[guideIndex]); } + regionIndex++; } + if (!scans.isEmpty()) { // Add any remaining scans parallelScans.add(scans); } + Long pageLimit = getUnfilteredPageLimit(scan); if (scanRanges.isPointLookup() || pageLimit != null) { // If run in parallel, the limit is pushed to each parallel scan so must be accounted for in all of them diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 3b63f668c0a..a35646cd586 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -20,6 +20,8 @@ import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION; +import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_MOVING_WINDOW_SIZE; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_MOVING_WINDOW_SIZE; import java.io.IOException; import java.sql.Connection; @@ -668,6 +670,16 @@ public static boolean getStatsForParallelizationProp(PhoenixConnection conn, PTa .getBoolean(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION); } + public static int getGuidePostsMovingWindowSize(PhoenixConnection conn) { + return conn.getQueryServices().getConfiguration() + .getInt(STATS_GUIDEPOST_MOVING_WINDOW_SIZE, DEFAULT_STATS_GUIDEPOST_MOVING_WINDOW_SIZE); + } + + public static void setGuidePostsMovingWindowSize(PhoenixConnection conn, int movingWindowSize) { + conn.getQueryServices().getConfiguration() + .setInt(STATS_GUIDEPOST_MOVING_WINDOW_SIZE, movingWindowSize); + } + public static void setTenantId(Configuration configuration, String tenantId){ Preconditions.checkNotNull(configuration); configuration.set(MAPREDUCE_TENANT_ID, tenantId); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 73548532198..52666e94a62 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -189,7 +189,9 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region"; public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; - + public static final String STATS_COLLECTION_ENABLED = "phoenix.stats.collection.enabled"; + public static final String USE_STATS_FOR_PARALLELIZATION = "phoenix.use.stats.parallelization"; + @Deprecated // use STATS_COLLECTION_ENABLED config instead public static final String STATS_ENABLED_ATTRIB = "phoenix.stats.enabled"; @@ -198,8 +200,9 @@ public interface QueryServices extends SQLCloseable { public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async"; // Maximum size in bytes taken up by cached table stats in the client public static final String STATS_MAX_CACHE_SIZE = "phoenix.stats.cache.maxSize"; - public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets"; + public static final String STATS_GUIDEPOST_MOVING_WINDOW_SIZE = "phoenix.stats.guidepost.movingWindowSize"; + public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; public static final String EXPLAIN_CHUNK_COUNT_ATTRIB = "phoenix.explain.displayChunkCount"; @@ -302,8 +305,6 @@ public interface QueryServices extends SQLCloseable { public static final String PHOENIX_QUERY_SERVER_SERVICE_NAME = "phoenix.queryserver.service.name"; public static final String PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME = "phoenix.queryserver.zookeeper.acl.username"; public static final String PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD = "phoenix.queryserver.zookeeper.acl.password"; - public static final String STATS_COLLECTION_ENABLED = "phoenix.stats.collection.enabled"; - public static final String USE_STATS_FOR_PARALLELIZATION = "phoenix.use.stats.parallelization"; // whether to enable server side RS -> RS calls for upsert select statements public static final String ENABLE_SERVER_UPSERT_SELECT ="phoenix.client.enable.server.upsert.select"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 92e68275495..6913a1ee0f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -99,6 +99,7 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION; +import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_MOVING_WINDOW_SIZE; import java.util.HashSet; import java.util.Map.Entry; @@ -246,6 +247,7 @@ public class QueryServicesOptions { public static final long DEFAULT_STATS_MAX_CACHE_SIZE = 256 * 1024 * 1024; // Allow stats collection to be initiated by client multiple times immediately public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0; + public static final int DEFAULT_STATS_GUIDEPOST_MOVING_WINDOW_SIZE = 256; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -333,7 +335,7 @@ public class QueryServicesOptions { public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true; public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true; - + //Security defaults public static final boolean DEFAULT_PHOENIX_ACLS_ENABLED = false; @@ -443,7 +445,7 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE) .setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED) .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) - .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) + .setIfUnset(STATS_GUIDEPOST_MOVING_WINDOW_SIZE, DEFAULT_STATS_GUIDEPOST_MOVING_WINDOW_SIZE) .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING) .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED) .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED) @@ -647,6 +649,10 @@ public boolean isTracingEnabled() { return config.getBoolean(TRACING_ENABLED, DEFAULT_TRACING_ENABLED); } + public int getStatsGuidePostMovingWindowSize() { + return config.getInt(STATS_GUIDEPOST_MOVING_WINDOW_SIZE, DEFAULT_STATS_GUIDEPOST_MOVING_WINDOW_SIZE); + } + public QueryServicesOptions setTracingEnabled(boolean enable) { config.setBoolean(TRACING_ENABLED, enable); return this; @@ -721,6 +727,10 @@ public QueryServicesOptions setStatsUpdateFrequencyMs(int frequencyMs) { return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); } + public QueryServicesOptions setStatsGuidePostMovingWindowSize(int movingWindowSize) { + return set(STATS_GUIDEPOST_MOVING_WINDOW_SIZE, movingWindowSize); + } + public QueryServicesOptions setMinStatsUpdateFrequencyMs(int frequencyMs) { return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); } @@ -758,7 +768,6 @@ public QueryServicesOptions setNumRetriesForSchemaChangeCheck(int numRetries) { public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) { config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis); return this; - } public QueryServicesOptions setUseByteBasedRegex(boolean flag) { @@ -831,7 +840,6 @@ public QueryServicesOptions setIndexRebuildTaskInitialDelay(long waitTime) { return this; } - public QueryServicesOptions setSequenceCacheSize(long sequenceCacheSize) { config.setLong(SEQUENCE_CACHE_SIZE_ATTRIB, sequenceCacheSize); return this;