From 0d33ce934d4405a1e534264875abaec7e5ee82f1 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Mon, 9 Nov 2020 09:54:18 -0800 Subject: [PATCH] PHOENIX-6198 Add option to IndexTool to specify the source table for scan (#937) * PHOENIX-6198 Add option to IndexTool to specify the source table for scan * Addressed feedback for PHOENIX-6198 Extended the `-from-index` option to support -vBOTH, -vAFTER and -vNONE. Added the disclaimer for -vAFTER. Also, using the source table enum from IndexScrutinyTool. --- .../apache/phoenix/end2end/IndexToolIT.java | 25 ++++++-- .../PhoenixServerBuildIndexInputFormat.java | 61 +++++++++++++++++-- .../phoenix/mapreduce/index/IndexTool.java | 20 +++++- .../util/PhoenixConfigurationUtil.java | 16 +++++ .../apache/phoenix/index/IndexToolTest.java | 36 ++++++++++- .../util/PhoenixConfigurationUtilTest.java | 18 ++++++ 6 files changed, 162 insertions(+), 14 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index c56e01fdb51..41ae086e4ce 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -641,14 +641,14 @@ private static List getArgList (boolean directApi, boolean useSnapshot, IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime, Long incrementalVerify) { return getArgList(directApi, useSnapshot, schemaName, dataTable, indxTable, tenantId, - verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify); + verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify, false); } private static List getArgList (boolean directApi, boolean useSnapshot, String schemaName, String dataTable, String indxTable, String tenantId, IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime, IndexTool.IndexDisableLoggingType disableLoggingType, - Long incrementalVerify) { + Long incrementalVerify, boolean useIndexTableAsSource) { List args = Lists.newArrayList(); if (schemaName != null) { args.add("-s"); @@ -692,6 +692,11 @@ private static List getArgList (boolean directApi, boolean useSnapshot, args.add("-rv"); args.add(String.valueOf(incrementalVerify)); } + + if (useIndexTableAsSource) { + args.add("-fi"); + } + args.add("-op"); args.add("/tmp/" + UUID.randomUUID().toString()); return args; @@ -708,7 +713,7 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType, IndexTool.IndexDisableLoggingType disableLoggingType) { List args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, - tenantId, verifyType, null, null, disableLoggingType, null); + tenantId, verifyType, null, null, disableLoggingType, null, false); return args.toArray(new String[0]); } @@ -727,7 +732,19 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri IndexTool.IndexDisableLoggingType disableLoggingType, Long incrementalVerify ) { List args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, - tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify); + tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, false); + return args.toArray(new String[0]); + } + + public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName, + String dataTable, String indexTable, String tenantId, + IndexTool.IndexVerifyType verifyType, Long startTime, + Long endTime, + IndexTool.IndexDisableLoggingType disableLoggingType, + Long incrementalVerify, + boolean useIndexTableAsSource) { + List args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, + tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, useIndexTableAsSource); return args.toArray(new String[0]); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java index 056daf6b179..6f9c6399091 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; +import java.util.Collections; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.JobContext; @@ -31,7 +33,12 @@ import org.apache.phoenix.compile.*; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.QueryServices; @@ -47,6 +54,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue; @@ -68,6 +76,46 @@ public class PhoenixServerBuildIndexInputFormat extends Ph public PhoenixServerBuildIndexInputFormat() { } + private interface QueryPlanBuilder { + QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName, + String indexTableFullName) throws SQLException; + } + + private class DataTableQueryPlanBuilder implements QueryPlanBuilder { + @Override + public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName, + String indexTableFullName) throws SQLException { + PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName); + ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName); + MutationPlan plan = compiler.compile(indexTable); + return plan.getQueryPlan(); + } + } + + private class IndexTableQueryPlanBuilder implements QueryPlanBuilder { + @Override + public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName, + String indexTableFullName) throws SQLException { + QueryPlan plan; + try (final PhoenixStatement statement = new PhoenixStatement(phoenixConnection)) { + String query = "SELECT count(*) FROM " + indexTableFullName; + plan = statement.compileQuery(query); + TableRef tableRef = plan.getTableRef(); + Scan scan = plan.getContext().getScan(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + PTable pIndexTable = tableRef.getTable(); + PTable pDataTable = PhoenixRuntime.getTable(phoenixConnection, dataTableFullName); + IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + } + return plan; + } + } + + private QueryPlanBuilder queryPlanBuilder; + @Override protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException { @@ -90,6 +138,9 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c } String dataTableFullName = getIndexToolDataTableName(configuration); String indexTableFullName = getIndexToolIndexTableName(configuration); + SourceTable sourceTable = getIndexToolSourceTable(configuration); + queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ? + new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder(); try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) { PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); @@ -97,11 +148,10 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c setCurrentScnValue(configuration, scn); Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue); - PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName); - ServerBuildIndexCompiler compiler = - new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName); - MutationPlan plan = compiler.compile(indexTable); - Scan scan = plan.getContext().getScan(); + + queryPlan = queryPlanBuilder.getQueryPlan(phoenixConnection, dataTableFullName, indexTableFullName); + Scan scan = queryPlan.getContext().getScan(); + Long lastVerifyTimeValue = lastVerifyTime == null ? 0L : Long.valueOf(lastVerifyTime); try { scan.setTimeRange(startTime, scn); @@ -126,7 +176,6 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c } catch (IOException e) { throw new SQLException(e); } - queryPlan = plan.getQueryPlan(); // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver if (txnScnValue != null) { scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue))); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index a20bb73250c..c574ea5ac3a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -85,6 +84,7 @@ import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames; import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; import org.apache.phoenix.mapreduce.util.ConnectionUtil; @@ -200,6 +200,7 @@ public static IndexDisableLoggingType fromValue(byte[] value) { private boolean isPartialBuild, isForeground; private IndexVerifyType indexVerifyType = IndexVerifyType.NONE; private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE; + private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE; private String qDataTable; private String qIndexTable; private boolean useSnapshot; @@ -285,6 +286,12 @@ public static IndexDisableLoggingType fromValue(byte[] value) { , "Disable logging of failed verification rows for BEFORE, " + "AFTER, or BOTH verify jobs"); + private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION = + new Option("fi", "from-index", false, + "To verify every row in the index table has a corresponding row in the data table. " + + "Only supported for global indexes. If this option is used with -v AFTER, these " + + "extra rows will be identified but not repaired."); + public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s"; public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than " @@ -294,7 +301,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) { public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only " + "applicable for local or non-transactional global indexes"; - public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts " + "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table"; @@ -323,6 +329,7 @@ private Options getOptions() { options.addOption(END_TIME_OPTION); options.addOption(RETRY_VERIFY_OPTION); options.addOption(DISABLE_LOGGING_OPTION); + options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION); return options; } @@ -445,6 +452,8 @@ public IndexTool.IndexDisableLoggingType getDisableLoggingType() { return disableLoggingType; } + public IndexScrutinyTool.SourceTable getSourceTable() { return sourceTable; } + class JobFactory { Connection connection; Configuration configuration; @@ -692,6 +701,7 @@ private Job configureJobForServerBuildIndex() throws Exception { PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable); PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); + PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable); if (startTime != null) { PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime); } @@ -849,6 +859,7 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception { boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt()); boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt()); boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt()); + boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt()); if (useTenantId) { tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); @@ -875,6 +886,11 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception { cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt())); } } + + if (useIndexTableAsSource) { + sourceTable = SourceTable.INDEX_TABLE_SOURCE; + } + schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 82c1f2f8597..fc9c5baedbd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -41,6 +41,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.mapreduce.index.IndexTool; @@ -125,6 +126,8 @@ public final class PhoenixConfigurationUtil { public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name"; + public static final String INDEX_TOOL_SOURCE_TABLE = "phoenix.mr.index_tool.source.table"; + public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table"; public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size"; @@ -660,6 +663,19 @@ public static String getIndexToolIndexTableName(Configuration configuration) { return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME); } + public static void setIndexToolSourceTable(Configuration configuration, + IndexScrutinyTool.SourceTable sourceTable) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(sourceTable); + configuration.set(INDEX_TOOL_SOURCE_TABLE, sourceTable.name()); + } + + public static IndexScrutinyTool.SourceTable getIndexToolSourceTable(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return IndexScrutinyTool.SourceTable.valueOf(configuration.get(INDEX_TOOL_SOURCE_TABLE, + IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE.name())); + } + public static void setScrutinySourceTable(Configuration configuration, SourceTable sourceTable) { Preconditions.checkNotNull(configuration); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java index 87215d0fa24..916552b3ac2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java @@ -19,10 +19,12 @@ import org.apache.commons.cli.CommandLine; import org.apache.phoenix.end2end.IndexToolIT; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexScrutiny; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -34,9 +36,8 @@ import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE; import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE; -import static org.junit.Assert.assertEquals; import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE; -import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; public class IndexToolTest extends BaseTest { @@ -335,4 +336,35 @@ public void verifyDisableLoggingException(IndexTool.IndexDisableLoggingType disa CommandLine cmdLine = it.parseOptions(args); } + @Test + public void testIndexToolDefaultSource() throws Exception { + Long startTime = 1L; + Long endTime = 10L; + String [] args = + IndexToolIT.getArgValues(true, true, schema, + dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE, + startTime , endTime); + CommandLine cmdLine = it.parseOptions(args); + it.populateIndexToolAttributes(cmdLine); + assertEquals(IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE, it.getSourceTable()); + } + + @Test + public void testIndexToolFromIndexSource() throws Exception { + verifyFromIndexOption(IndexTool.IndexVerifyType.ONLY); + verifyFromIndexOption(IndexTool.IndexVerifyType.BEFORE); + } + + private void verifyFromIndexOption(IndexTool.IndexVerifyType verifyType) throws Exception { + Long startTime = 1L; + Long endTime = 10L; + String[] args = + IndexToolIT.getArgValues(true, true, schema, + dataTable, indexTable, tenantId, verifyType, + startTime, endTime, IndexTool.IndexDisableLoggingType.BEFORE, null, true); + CommandLine cmdLine = it.parseOptions(args); + it.populateIndexToolAttributes(cmdLine); + assertEquals(IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE, it.getSourceTable()); + } + } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java index 840e9d56b7d..c316fa45c08 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.query.BaseConnectionlessQueryTest; @@ -312,4 +313,21 @@ public void testLastVerifyTimeConfig() { Long.parseLong(PhoenixConfigurationUtil.getIndexToolLastVerifyTime(configuration))); } + + @Test + public void testIndexToolSourceConfig() { + final Configuration conf = new Configuration(); + + // by default source is data table + SourceTable sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf); + Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE); + + PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.INDEX_TABLE_SOURCE); + sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf); + Assert.assertEquals(sourceTable, SourceTable.INDEX_TABLE_SOURCE); + + PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.DATA_TABLE_SOURCE); + sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf); + Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE); + } }