Skip to content

Commit

Permalink
PHOENIX-6198 Add option to IndexTool to specify the source table for …
Browse files Browse the repository at this point in the history
…scan (#937)

* PHOENIX-6198 Add option to IndexTool to specify the source table for scan

* Addressed feedback for PHOENIX-6198

Extended the `-from-index` option to support `-v BOTH`, `-v AFTER` and `-v NONE`.
Added a disclaimer for `-v AFTER`. Also reused the source table enum from
IndexScrutinyTool.
  • Loading branch information
tkhurana committed Nov 9, 2020
1 parent 605656c commit 0d33ce9
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,14 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
IndexTool.IndexVerifyType verifyType, Long startTime,
Long endTime, Long incrementalVerify) {
return getArgList(directApi, useSnapshot, schemaName, dataTable, indxTable, tenantId,
verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify);
verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify, false);
}

private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime,
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify) {
Long incrementalVerify, boolean useIndexTableAsSource) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
Expand Down Expand Up @@ -692,6 +692,11 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
args.add("-rv");
args.add(String.valueOf(incrementalVerify));
}

if (useIndexTableAsSource) {
args.add("-fi");
}

args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args;
Expand All @@ -708,7 +713,7 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
tenantId, verifyType, null, null, disableLoggingType, null);
tenantId, verifyType, null, null, disableLoggingType, null, false);
return args.toArray(new String[0]);
}

Expand All @@ -727,7 +732,19 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify ) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify);
tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, false);
return args.toArray(new String[0]);
}

/**
 * Builds the IndexTool command-line arguments as an array, with explicit control over
 * whether the {@code -fi} flag is emitted.
 *
 * @param useIndexTableAsSource forwarded to {@code getArgList}; when {@code true} the
 *                              {@code -fi} flag is included in the arguments
 * @return the argument list produced by {@code getArgList}, copied into a new array
 */
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTable, String indexTable, String tenantId,
        IndexTool.IndexVerifyType verifyType, Long startTime,
        Long endTime,
        IndexTool.IndexDisableLoggingType disableLoggingType,
        Long incrementalVerify,
        boolean useIndexTableAsSource) {
    return getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId,
            verifyType, startTime, endTime, disableLoggingType, incrementalVerify,
            useIndexTableAsSource)
            .toArray(new String[0]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.*;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServices;
Expand All @@ -47,6 +54,7 @@
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
Expand All @@ -68,6 +76,46 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
/**
 * No-argument constructor; performs no initialization of its own.
 */
public PhoenixServerBuildIndexInputFormat() {
}

/**
 * Strategy for producing the {@link QueryPlan} used by this input format.
 * The plan's scan is retrieved from its statement context by the caller.
 */
private interface QueryPlanBuilder {
/**
 * @param phoenixConnection connection used to resolve tables and compile the plan
 * @param dataTableFullName full name of the data table
 * @param indexTableFullName full name of the index table
 * @return the compiled query plan
 * @throws SQLException if table lookup or compilation fails
 */
QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
String indexTableFullName) throws SQLException;
}

private class DataTableQueryPlanBuilder implements QueryPlanBuilder {
@Override
public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
String indexTableFullName) throws SQLException {
PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
MutationPlan plan = compiler.compile(indexTable);
return plan.getQueryPlan();
}
}

private class IndexTableQueryPlanBuilder implements QueryPlanBuilder {
@Override
public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
String indexTableFullName) throws SQLException {
QueryPlan plan;
try (final PhoenixStatement statement = new PhoenixStatement(phoenixConnection)) {
String query = "SELECT count(*) FROM " + indexTableFullName;
plan = statement.compileQuery(query);
TableRef tableRef = plan.getTableRef();
Scan scan = plan.getContext().getScan();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
PTable pIndexTable = tableRef.getTable();
PTable pDataTable = PhoenixRuntime.getTable(phoenixConnection, dataTableFullName);
IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection);
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
}
return plan;
}
}

private QueryPlanBuilder queryPlanBuilder;

@Override
protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
throws IOException {
Expand All @@ -90,18 +138,20 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c
}
String dataTableFullName = getIndexToolDataTableName(configuration);
String indexTableFullName = getIndexToolIndexTableName(configuration);
SourceTable sourceTable = getIndexToolSourceTable(configuration);
queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();

try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
setCurrentScnValue(configuration, scn);

Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue);
PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
ServerBuildIndexCompiler compiler =
new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
MutationPlan plan = compiler.compile(indexTable);
Scan scan = plan.getContext().getScan();

queryPlan = queryPlanBuilder.getQueryPlan(phoenixConnection, dataTableFullName, indexTableFullName);
Scan scan = queryPlan.getContext().getScan();

Long lastVerifyTimeValue = lastVerifyTime == null ? 0L : Long.valueOf(lastVerifyTime);
try {
scan.setTimeRange(startTime, scn);
Expand All @@ -126,7 +176,6 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c
} catch (IOException e) {
throw new SQLException(e);
}
queryPlan = plan.getQueryPlan();
// since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
if (txnScnValue != null) {
scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
Expand Down Expand Up @@ -85,6 +84,7 @@
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
Expand Down Expand Up @@ -200,6 +200,7 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
private boolean isPartialBuild, isForeground;
private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE;
private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
Expand Down Expand Up @@ -285,6 +286,12 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
, "Disable logging of failed verification rows for BEFORE, " +
"AFTER, or BOTH verify jobs");

/**
 * Command-line flag {@code -fi} / {@code --from-index}. It takes no argument
 * (the third constructor parameter is {@code false}).
 */
private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION =
new Option("fi", "from-index", false,
"To verify every row in the index table has a corresponding row in the data table. "
+ "Only supported for global indexes. If this option is used with -v AFTER, these "
+ "extra rows will be identified but not repaired.");

public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";

public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
Expand All @@ -294,7 +301,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only "
+ "applicable for local or non-transactional global indexes";


public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts "
+ "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table";

Expand Down Expand Up @@ -323,6 +329,7 @@ private Options getOptions() {
options.addOption(END_TIME_OPTION);
options.addOption(RETRY_VERIFY_OPTION);
options.addOption(DISABLE_LOGGING_OPTION);
options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION);
return options;
}

Expand Down Expand Up @@ -445,6 +452,8 @@ public IndexTool.IndexDisableLoggingType getDisableLoggingType() {
return disableLoggingType;
}

/**
 * @return the current value of the {@code sourceTable} field
 */
public IndexScrutinyTool.SourceTable getSourceTable() {
    return this.sourceTable;
}

class JobFactory {
Connection connection;
Configuration configuration;
Expand Down Expand Up @@ -692,6 +701,7 @@ private Job configureJobForServerBuildIndex() throws Exception {

PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
if (startTime != null) {
PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
}
Expand Down Expand Up @@ -849,6 +859,7 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt());

if (useTenantId) {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
Expand All @@ -875,6 +886,11 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()));
}
}

if (useIndexTableAsSource) {
sourceTable = SourceTable.INDEX_TABLE_SOURCE;
}

schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.IndexTool;
Expand Down Expand Up @@ -125,6 +126,8 @@ public final class PhoenixConfigurationUtil {

public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";

public static final String INDEX_TOOL_SOURCE_TABLE = "phoenix.mr.index_tool.source.table";

public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";

public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
Expand Down Expand Up @@ -660,6 +663,19 @@ public static String getIndexToolIndexTableName(Configuration configuration) {
return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
}

/**
 * Stores the given source table in the configuration under
 * {@link #INDEX_TOOL_SOURCE_TABLE}, using the enum constant's name.
 *
 * @param configuration configuration to write to; must not be null
 * @param sourceTable source table to record; must not be null
 */
public static void setIndexToolSourceTable(Configuration configuration,
        IndexScrutinyTool.SourceTable sourceTable) {
    Preconditions.checkNotNull(configuration);
    Preconditions.checkNotNull(sourceTable);
    final String sourceTableName = sourceTable.name();
    configuration.set(INDEX_TOOL_SOURCE_TABLE, sourceTableName);
}

/**
 * Reads the source table stored under {@link #INDEX_TOOL_SOURCE_TABLE}, falling back to
 * {@code DATA_TABLE_SOURCE} when the key is not set.
 *
 * @param configuration configuration to read from; must not be null
 * @return the enum constant whose name matches the configured (or default) value
 */
public static IndexScrutinyTool.SourceTable getIndexToolSourceTable(Configuration configuration) {
    Preconditions.checkNotNull(configuration);
    final String defaultName = IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE.name();
    final String configuredName = configuration.get(INDEX_TOOL_SOURCE_TABLE, defaultName);
    return IndexScrutinyTool.SourceTable.valueOf(configuredName);
}

public static void setScrutinySourceTable(Configuration configuration,
SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import org.apache.commons.cli.CommandLine;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -34,9 +36,8 @@

import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
import static org.junit.Assert.assertEquals;
import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

public class IndexToolTest extends BaseTest {
Expand Down Expand Up @@ -335,4 +336,35 @@ public void verifyDisableLoggingException(IndexTool.IndexDisableLoggingType disa
CommandLine cmdLine = it.parseOptions(args);
}

/**
 * Checks that the tool reports {@code DATA_TABLE_SOURCE} after parsing arguments built by
 * the nine-argument {@code getArgValues} overload.
 */
@Test
public void testIndexToolDefaultSource() throws Exception {
    final Long rangeStart = 1L;
    final Long rangeEnd = 10L;
    final String[] toolArgs = IndexToolIT.getArgValues(true, true, schema, dataTable,
            indexTable, tenantId, IndexTool.IndexVerifyType.NONE, rangeStart, rangeEnd);
    final CommandLine parsed = it.parseOptions(toolArgs);
    it.populateIndexToolAttributes(parsed);
    assertEquals(IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE, it.getSourceTable());
}

/**
 * Runs the from-index check for the ONLY and BEFORE verify types, in that order.
 */
@Test
public void testIndexToolFromIndexSource() throws Exception {
    final IndexTool.IndexVerifyType[] verifyTypes = {
        IndexTool.IndexVerifyType.ONLY,
        IndexTool.IndexVerifyType.BEFORE
    };
    for (IndexTool.IndexVerifyType verifyType : verifyTypes) {
        verifyFromIndexOption(verifyType);
    }
}

/**
 * Builds arguments with the use-index-table-as-source flag set to {@code true} for the given
 * verify type, feeds them to the tool, and asserts it reports {@code INDEX_TABLE_SOURCE}.
 *
 * @param verifyType verify type passed through to the argument builder
 */
private void verifyFromIndexOption(IndexTool.IndexVerifyType verifyType) throws Exception {
    final Long rangeStart = 1L;
    final Long rangeEnd = 10L;
    final String[] toolArgs = IndexToolIT.getArgValues(true, true, schema, dataTable,
            indexTable, tenantId, verifyType, rangeStart, rangeEnd,
            IndexTool.IndexDisableLoggingType.BEFORE, null, true);
    final CommandLine parsed = it.parseOptions(toolArgs);
    it.populateIndexToolAttributes(parsed);
    assertEquals(IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE, it.getSourceTable());
}

}
Loading

0 comments on commit 0d33ce9

Please sign in to comment.