Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-6198 Add option to IndexTool to specify the source table for scan #937

Merged
merged 2 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,14 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
IndexTool.IndexVerifyType verifyType, Long startTime,
Long endTime, Long incrementalVerify) {
return getArgList(directApi, useSnapshot, schemaName, dataTable, indxTable, tenantId,
verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify);
verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify, false);
}

private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime,
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify) {
Long incrementalVerify, boolean useIndexTableAsSource) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
Expand Down Expand Up @@ -692,6 +692,11 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
args.add("-rv");
args.add(String.valueOf(incrementalVerify));
}

if (useIndexTableAsSource) {
args.add("-fi");
}

args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args;
Expand All @@ -708,7 +713,7 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
tenantId, verifyType, null, null, disableLoggingType, null);
tenantId, verifyType, null, null, disableLoggingType, null, false);
return args.toArray(new String[0]);
}

Expand All @@ -727,7 +732,19 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify ) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify);
tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, false);
return args.toArray(new String[0]);
}

public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indexTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime,
Long endTime,
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify,
boolean useIndexTableAsSource) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, useIndexTableAsSource);
return args.toArray(new String[0]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
Expand Down Expand Up @@ -192,6 +191,14 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
}
}

/**
* Which table to use as the source table
*/
public enum SourceTable {
tkhurana marked this conversation as resolved.
Show resolved Hide resolved
DATA_TABLE_SOURCE,
INDEX_TABLE_SOURCE;
}

private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);

private String schemaName;
Expand All @@ -200,6 +207,7 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
private boolean isPartialBuild, isForeground;
private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE;
private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
Expand Down Expand Up @@ -285,6 +293,11 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
, "Disable logging of failed verification rows for BEFORE, " +
"AFTER, or BOTH verify jobs");

private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION =
new Option("fi", "from-index", false,
"To verify every row in the index table has a corresponding row in the data table. " +
"Only supported for global indexes. This option is applicable to BEFORE and ONLY type verification jobs");
tkhurana marked this conversation as resolved.
Show resolved Hide resolved

public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";

public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
Expand All @@ -294,7 +307,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only "
+ "applicable for local or non-transactional global indexes";


public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts "
+ "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table";

Expand Down Expand Up @@ -323,6 +335,7 @@ private Options getOptions() {
options.addOption(END_TIME_OPTION);
options.addOption(RETRY_VERIFY_OPTION);
options.addOption(DISABLE_LOGGING_OPTION);
options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION);
return options;
}

Expand Down Expand Up @@ -377,6 +390,12 @@ public CommandLine parseOptions(String[] args) {
if (splitIndex && cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())) {
throw new IllegalStateException("Cannot split index for a partial rebuild, as the index table is dropped");
}
if (!isFromIndexCompatibleWithVerifyOption(cmdLine)) {
throw new IllegalStateException("Can't use index table as source when no index " +
"verification or the wrong kind of index verification has been requested. " +
"VerifyType: [" + cmdLine.getOptionValue(VERIFY_OPTION.getOpt()) + "] " +
"Supported types ONLY,BEFORE");
}
if (loggingDisabledMismatchesVerifyOption(cmdLine)){
throw new IllegalStateException("Can't disable index verification logging when no " +
"index verification or the wrong kind of index verification has been requested. " +
Expand Down Expand Up @@ -422,6 +441,26 @@ private boolean loggingDisabledMismatchesVerifyOption(CommandLine cmdLine) {
return false;
}

private boolean isFromIndexCompatibleWithVerifyOption(CommandLine cmdLine) {
if (!cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt())) {
return true;
}

// -fi only works with -v ONLY and -v BEFORE

// default -v option is NONE which is not supported
if (!cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
return false;
}
String verifyValue = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
IndexVerifyType verifyType = IndexVerifyType.fromValue(verifyValue);

if (!verifyType.equals(IndexVerifyType.BEFORE) && !verifyType.equals(IndexVerifyType.ONLY)) {
return false;
}
return true;
}

private void printHelpAndExit(String errorMessage, Options options) {
System.err.println(errorMessage);
printHelpAndExit(options, 1);
Expand All @@ -445,6 +484,8 @@ public IndexTool.IndexDisableLoggingType getDisableLoggingType() {
return disableLoggingType;
}

public IndexTool.SourceTable getSourceTable() { return sourceTable; }

class JobFactory {
Connection connection;
Configuration configuration;
Expand Down Expand Up @@ -692,6 +733,7 @@ private Job configureJobForServerBuildIndex() throws Exception {

PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
if (startTime != null) {
PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
}
Expand Down Expand Up @@ -849,6 +891,7 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt());

if (useTenantId) {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
Expand All @@ -875,6 +918,11 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()));
}
}

if (useIndexTableAsSource) {
sourceTable = SourceTable.INDEX_TABLE_SOURCE;
}

schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public final class PhoenixConfigurationUtil {

public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";

public static final String INDEX_TOOL_SOURCE_TABLE = "phoenix.mr.index_tool.source.table";

public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";

public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
Expand Down Expand Up @@ -660,6 +662,19 @@ public static String getIndexToolIndexTableName(Configuration configuration) {
return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
}

public static void setIndexToolSourceTable(Configuration configuration,
IndexTool.SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(sourceTable);
configuration.set(INDEX_TOOL_SOURCE_TABLE, sourceTable.name());
}

public static IndexTool.SourceTable getIndexToolSourceTable(Configuration configuration) {
Preconditions.checkNotNull(configuration);
return IndexTool.SourceTable.valueOf(configuration.get(INDEX_TOOL_SOURCE_TABLE,
IndexTool.SourceTable.DATA_TABLE_SOURCE.name()));
}

public static void setScrutinySourceTable(Configuration configuration,
SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@

import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
import static org.junit.Assert.assertEquals;
import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

public class IndexToolTest extends BaseTest {
Expand Down Expand Up @@ -335,4 +334,53 @@ public void verifyDisableLoggingException(IndexTool.IndexDisableLoggingType disa
CommandLine cmdLine = it.parseOptions(args);
}

@Test
public void testIndexToolDefaultSource() throws Exception {
Long startTime = 1L;
Long endTime = 10L;
String [] args =
IndexToolIT.getArgValues(true, true, schema,
dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
startTime , endTime);
CommandLine cmdLine = it.parseOptions(args);
it.populateIndexToolAttributes(cmdLine);
assertEquals(IndexTool.SourceTable.DATA_TABLE_SOURCE, it.getSourceTable());
}

@Test
public void testIndexToolFromIndexSource() throws Exception {
verifyFromIndexOption(IndexTool.IndexVerifyType.ONLY);
verifyFromIndexOption(IndexTool.IndexVerifyType.BEFORE);
}

@Test
public void testIndexToolFromIndexSourceWith_Wrong_VerifyOption() throws Exception {
verifyFromIndexException(IndexTool.IndexVerifyType.NONE);
verifyFromIndexException(IndexTool.IndexVerifyType.AFTER);
verifyFromIndexException(IndexTool.IndexVerifyType.BOTH);
}

private void verifyFromIndexException(IndexTool.IndexVerifyType verifyType) {
Long startTime = 1L;
Long endTime = 10L;
String[] args =
IndexToolIT.getArgValues(true, true, schema,
dataTable, indexTable, tenantId, verifyType,
startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, null, true);
exceptionRule.expect(IllegalStateException.class);
CommandLine cmdLine = it.parseOptions(args);
}

private void verifyFromIndexOption(IndexTool.IndexVerifyType verifyType) throws Exception {
Long startTime = 1L;
Long endTime = 10L;
String[] args =
IndexToolIT.getArgValues(true, true, schema,
dataTable, indexTable, tenantId, verifyType,
startTime, endTime, IndexTool.IndexDisableLoggingType.BEFORE, null, true);
CommandLine cmdLine = it.parseOptions(args);
it.populateIndexToolAttributes(cmdLine);
assertEquals(IndexTool.SourceTable.INDEX_TABLE_SOURCE, it.getSourceTable());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.index.IndexTool.SourceTable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
Expand Down Expand Up @@ -312,4 +313,21 @@ public void testLastVerifyTimeConfig() {
Long.parseLong(PhoenixConfigurationUtil.getIndexToolLastVerifyTime(configuration)));

}

@Test
public void testIndexToolSourceConfig() {
final Configuration conf = new Configuration();

// by default source is data table
SourceTable sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE);

PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.INDEX_TABLE_SOURCE);
sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
Assert.assertEquals(sourceTable, SourceTable.INDEX_TABLE_SOURCE);

PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.DATA_TABLE_SOURCE);
sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE);
}
}