diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 982714090a8..0e2cc3358ae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -82,7 +82,6 @@ @RunWith(Parameterized.class) public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { - private static final Logger LOGGER = LoggerFactory.getLogger(PartialIndexRebuilderIT.class); private final boolean localIndex; private final boolean mutable; private final boolean transactional; @@ -120,6 +119,7 @@ public static void setup() throws Exception { serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5)); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); Map clientProps = Maps.newHashMapWithExpectedSize(2); clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index cfeaad475a1..4a13c748796 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -67,6 +67,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; + // The number of index rows to be rebuild in one RPC call + public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging"; /* * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. * Needed for backward compatibility purposes. TODO: get rid of this in next major release. diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d45f047aec0..12cd3f94cc1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; @@ -130,7 +131,6 @@ import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; -import org.apache.phoenix.schema.stats.StatisticsScanner; import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -1087,116 +1087,136 @@ private static PTable deserializeTable(byte[] b) { throw new RuntimeException(e); } } - - private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, - Configuration config) throws IOException { - byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); - boolean useProto = true; - // for backward compatibility fall back to look up by the old attribute - if (indexMetaData == null) { - useProto = false; - indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + private class IndexRebuildRegionScanner extends BaseRegionScanner { + private long pageSizeInRows = Long.MAX_VALUE; + private boolean hasMore; + private final int maxBatchSize; + private MutationList mutations; + private final long maxBatchSizeBytes; + private final long blockingMemstoreSize; + private final byte[] clientVersionBytes; + private List results = new ArrayList(); + private byte[] indexMetaData; + private boolean useProto = true; + private Scan scan; + private RegionScanner innerScanner; + final Region region; + + IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan, + final Configuration config) { + super(innerScanner); + if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) { + pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS); + } + + maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + mutations = new MutationList(maxBatchSize); + maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + blockingMemstoreSize = getBlockingMemstoreSize(region, config); + clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); + if (indexMetaData == null) { + useProto = false; + indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + } + this.scan = scan; + this.innerScanner = innerScanner; + this.region = region; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); - boolean hasMore; - int rowCount = 0; - try { - int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); - final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); - MutationList mutations = new MutationList(maxBatchSize); - region.startRegionOperation(); - byte[] uuidValue = ServerCacheClient.generateId(); - synchronized (innerScanner) { - do { - List results = new ArrayList(); - hasMore = innerScanner.nextRaw(results); - if (!results.isEmpty()) { - Put put = null; - Delete del = null; - for (Cell cell : results) { - - if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { - if (put == null) { - put = new Put(CellUtil.cloneRow(cell)); - put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); - put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); - put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); - mutations.add(put); - // Since we're replaying existing mutations, it makes no sense to write them to the wal - put.setDurability(Durability.SKIP_WAL); - } - put.add(cell); - } else { - if (del == null) { - del = new Delete(CellUtil.cloneRow(cell)); - del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); - del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); - del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); - mutations.add(del); - // Since we're replaying existing mutations, it makes no sense to write them to the wal - del.setDurability(Durability.SKIP_WAL); + @Override + public RegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { return hasMore; } + + @Override + public void close() throws IOException { innerScanner.close(); } + + private void setMutationAttributes(Mutation m, byte[] uuidValue) { + m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); + m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, + BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); + m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); + // Since we're replaying existing mutations, it makes no sense to write them to the wal + m.setDurability(Durability.SKIP_WAL); + } + + @Override + public boolean next(List results) throws IOException { + int rowCount = 0; + try { + byte[] uuidValue = ServerCacheClient.generateId(); + synchronized (innerScanner) { + do { + List row = new ArrayList(); + hasMore = innerScanner.nextRaw(row); + if (!row.isEmpty()) { + Put put = null; + Delete del = null; + for (Cell cell : row) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + setMutationAttributes(put, uuidValue); + mutations.add(put); + } + put.add(cell); + } else { + if (del == null) { + del = new Delete(CellUtil.cloneRow(cell)); + setMutationAttributes(del, uuidValue); + mutations.add(del); + } + del.addDeleteMarker(cell); } - del.addDeleteMarker(cell); } + if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { + checkForRegionClosing(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); + uuidValue = ServerCacheClient.generateId(); + mutations.clear(); + } + rowCount++; } - if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - checkForRegionClosing(); - commitBatchWithRetries(region, mutations, blockingMemstoreSize); - uuidValue = ServerCacheClient.generateId(); - mutations.clear(); - } - rowCount++; + + } while (hasMore && rowCount < pageSizeInRows); + if (!mutations.isEmpty()) { + checkForRegionClosing(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); } - - } while (hasMore); - if (!mutations.isEmpty()) { - checkForRegionClosing(); - commitBatchWithRetries(region, mutations, blockingMemstoreSize); + } + } catch (IOException e) { + hasMore = false; + LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e)); + throw e; + } finally { + if (!hasMore) { + region.closeRegionOperation(); } } - } catch (IOException e) { - LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e)); - throw e; - } finally { - region.closeRegionOperation(); + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + results.add(aggKeyValue); + return hasMore; } - byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); - final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, - SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); - - RegionScanner scanner = new BaseRegionScanner(innerScanner) { - @Override - public RegionInfo getRegionInfo() { - return region.getRegionInfo(); - } - - @Override - public boolean isFilterDone() { - return true; - } - @Override - public void close() throws IOException { - innerScanner.close(); - } - - @Override - public boolean next(List results) throws IOException { - results.add(aggKeyValue); - return false; - } + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } + } + + private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, + final Configuration config) throws IOException { - @Override - public long getMaxResultSize() { - return scan.getMaxResultSize(); - } - }; + region.startRegionOperation(); + RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config); return scanner; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index f5aaf4200bf..fc30cb2a571 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -252,10 +252,6 @@ public byte[] getRowKey() { return result.getRow(); } }; - for (Cell cell : result.rawCells()) { - String cellString = cell.toString(); - LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell))); - } byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp); if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length, indexRowKey, 0, indexRowKey.length) != 0) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java index 5128c26a4f0..30c1cec97a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -42,6 +42,7 @@ import com.google.common.base.Preconditions; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; /** * {@link InputFormat} implementation from Phoenix for building index @@ -89,6 +90,7 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration Scan scan = queryPlan.getContext().getScan(); try { scan.setTimeRange(0, scn); + scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES); } catch (IOException e) { throw new SQLException(e); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index e01ed16e5f1..6f5913aae02 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -501,17 +501,6 @@ private Job configureJobForAysncIndex() private Job configureJobForServerBuildIndex() throws Exception { - long indexRebuildQueryTimeoutMs = - configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, - QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); - // Set various phoenix and hbase level timeouts and rpc retries - configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, - Long.toString(indexRebuildQueryTimeoutMs)); - configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - Long.toString(indexRebuildQueryTimeoutMs)); - configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, - Long.toString(indexRebuildQueryTimeoutMs)); - PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable); PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); @@ -527,7 +516,6 @@ private Job configureJobForServerBuildIndex() fs = outputPath.getFileSystem(configuration); fs.delete(outputPath, true); } - configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs)); final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index ffeec517205..93e218ea67d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -352,6 +352,8 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled"; // Enable support for long view index(default is false) public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled"; + // The number of index rows to be rebuild in one RPC call + public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows"; // Before 4.15 when we created a view we included the parent table column metadata in the view diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 84ff3f710ee..981afe01f73 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -208,7 +208,7 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins // 30 min rpc timeout * 5 tries, with 2100ms total pause time between retries - public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = 60000 * 60 * 24; // 24 hrs + public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100; public static final long DEFAULT_INDEX_REBUILD_RPC_TIMEOUT = 30000 * 60; // 30 mins public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level @@ -357,6 +357,7 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; + public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 1024*1024; public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;