Revert "PHOENIX-6198 Add option to IndexTool to specify the source table for scan" #968

Closed
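This reverts PHOENIX-6198 ("Add option to IndexTool to specify the source table for scan"). Across the five touched files, the revert removes:

- the `-fi` / `--from-index` option from IndexTool, along with the `sourceTable` field, its getter, and the option handling in `populateIndexToolAttributes`;
- the `phoenix.mr.index_tool.source.table` key and its `setIndexToolSourceTable` / `getIndexToolSourceTable` helpers from PhoenixConfigurationUtil;
- the `QueryPlanBuilder` abstraction from PhoenixServerBuildIndexInputFormat, which let the rebuild scan be compiled from either the data table or the index table;
- the IndexToolIT test-helper overloads and the IndexToolTest cases that exercised the option.

After the revert, PhoenixServerBuildIndexInputFormat always compiles the rebuild scan from the data table via ServerBuildIndexCompiler.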
File: org/apache/phoenix/end2end/IndexToolIT.java

@@ -641,14 +641,14 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
                                             IndexTool.IndexVerifyType verifyType, Long startTime,
                                             Long endTime, Long incrementalVerify) {
         return getArgList(directApi, useSnapshot, schemaName, dataTable, indxTable, tenantId,
-                verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify, false);
+                verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify);
     }

     private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
                                             String dataTable, String indxTable, String tenantId,
                                             IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime,
                                             IndexTool.IndexDisableLoggingType disableLoggingType,
-                                            Long incrementalVerify, boolean useIndexTableAsSource) {
+                                            Long incrementalVerify) {
         List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -692,11 +692,6 @@ private static List<String> getArgList (boolean directApi, boolean useSnapshot,
             args.add("-rv");
             args.add(String.valueOf(incrementalVerify));
         }
-
-        if (useIndexTableAsSource) {
-            args.add("-fi");
-        }
-
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
         return args;
@@ -713,7 +708,7 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
             String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType,
             IndexTool.IndexDisableLoggingType disableLoggingType) {
         List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
-                tenantId, verifyType, null, null, disableLoggingType, null, false);
+                tenantId, verifyType, null, null, disableLoggingType, null);
         return args.toArray(new String[0]);
     }

@@ -732,19 +727,7 @@ public static String[] getArgValues(boolean directApi, boolean useSnapshot, Stri
             IndexTool.IndexDisableLoggingType disableLoggingType,
             Long incrementalVerify ) {
         List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
-                tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, false);
-        return args.toArray(new String[0]);
-    }
-
-    public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
-            String dataTable, String indexTable, String tenantId,
-            IndexTool.IndexVerifyType verifyType, Long startTime,
-            Long endTime,
-            IndexTool.IndexDisableLoggingType disableLoggingType,
-            Long incrementalVerify,
-            boolean useIndexTableAsSource) {
-        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
-                tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, useIndexTableAsSource);
+                tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify);
         return args.toArray(new String[0]);
     }
File: org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java

@@ -20,25 +20,18 @@
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.Collections;
 import java.util.Properties;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.*;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryServices;
@@ -54,7 +47,6 @@
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime;
-import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
@@ -76,46 +68,6 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
     public PhoenixServerBuildIndexInputFormat() {
     }

-    private interface QueryPlanBuilder {
-        QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
-                String indexTableFullName) throws SQLException;
-    }
-
-    private class DataTableQueryPlanBuilder implements QueryPlanBuilder {
-        @Override
-        public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
-                String indexTableFullName) throws SQLException {
-            PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
-            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
-            MutationPlan plan = compiler.compile(indexTable);
-            return plan.getQueryPlan();
-        }
-    }
-
-    private class IndexTableQueryPlanBuilder implements QueryPlanBuilder {
-        @Override
-        public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
-                String indexTableFullName) throws SQLException {
-            QueryPlan plan;
-            try (final PhoenixStatement statement = new PhoenixStatement(phoenixConnection)) {
-                String query = "SELECT count(*) FROM " + indexTableFullName;
-                plan = statement.compileQuery(query);
-                TableRef tableRef = plan.getTableRef();
-                Scan scan = plan.getContext().getScan();
-                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                PTable pIndexTable = tableRef.getTable();
-                PTable pDataTable = PhoenixRuntime.getTable(phoenixConnection, dataTableFullName);
-                IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection);
-                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
-                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
-            }
-            return plan;
-        }
-    }
-
-    private QueryPlanBuilder queryPlanBuilder;
-
     @Override
     protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
             throws IOException {
@@ -138,20 +90,18 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c
         }
         String dataTableFullName = getIndexToolDataTableName(configuration);
         String indexTableFullName = getIndexToolIndexTableName(configuration);
-        SourceTable sourceTable = getIndexToolSourceTable(configuration);
-        queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
-                new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
-
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
             Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
             setCurrentScnValue(configuration, scn);

             Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue);

-            queryPlan = queryPlanBuilder.getQueryPlan(phoenixConnection, dataTableFullName, indexTableFullName);
-            Scan scan = queryPlan.getContext().getScan();
-
+            PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
+            ServerBuildIndexCompiler compiler =
+                    new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
+            MutationPlan plan = compiler.compile(indexTable);
+            Scan scan = plan.getContext().getScan();
             Long lastVerifyTimeValue = lastVerifyTime == null ? 0L : Long.valueOf(lastVerifyTime);
             try {
                 scan.setTimeRange(startTime, scn);
@@ -176,6 +126,7 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration c
             } catch (IOException e) {
                 throw new SQLException(e);
             }
+            queryPlan = plan.getQueryPlan();
             // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
             if (txnScnValue != null) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
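For reviewers: the deleted index-as-source path above reduces to the following condensed sketch (variable setup elided; the comments are added here for explanation and are not in the original):

    // Compile a plan whose scan ranges over the index table rather than the data table.
    String query = "SELECT count(*) FROM " + indexTableFullName;
    QueryPlan plan = statement.compileQuery(query);
    Scan scan = plan.getContext().getScan();

    // Serialize the IndexMaintainer into the scan so the server-side coprocessor
    // can map each index row back to its data-table row during verification.
    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection);
    scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));

    // Flag the scan as a rebuild/verification scan for BaseScannerRegionObserver,
    // and record the client version on it.
    scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
    ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);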
File: org/apache/phoenix/mapreduce/index/IndexTool.java

@@ -51,6 +51,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -84,7 +85,6 @@
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
 import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -200,7 +200,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
     private boolean isPartialBuild, isForeground;
     private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
     private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE;
-    private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE;
     private String qDataTable;
     private String qIndexTable;
     private boolean useSnapshot;
@@ -286,12 +285,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
             , "Disable logging of failed verification rows for BEFORE, " +
                 "AFTER, or BOTH verify jobs");

-    private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION =
-            new Option("fi", "from-index", false,
-                    "To verify every row in the index table has a corresponding row in the data table. "
-                            + "Only supported for global indexes. If this option is used with -v AFTER, these "
-                            + "extra rows will be identified but not repaired.");
-
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";

     public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
@@ -301,6 +294,7 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
     public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only "
             + "applicable for local or non-transactional global indexes";

+
     public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts "
             + "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table";

@@ -329,7 +323,6 @@ private Options getOptions() {
         options.addOption(END_TIME_OPTION);
         options.addOption(RETRY_VERIFY_OPTION);
         options.addOption(DISABLE_LOGGING_OPTION);
-        options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION);
         return options;
     }

@@ -452,8 +445,6 @@ public IndexTool.IndexDisableLoggingType getDisableLoggingType() {
         return disableLoggingType;
     }

-    public IndexScrutinyTool.SourceTable getSourceTable() { return sourceTable; }
-
     class JobFactory {
         Connection connection;
         Configuration configuration;
@@ -701,7 +692,6 @@ private Job configureJobForServerBuildIndex() throws Exception {

         PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
         PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
-        PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
         if (startTime != null) {
             PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
         }
@@ -859,7 +849,6 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
         boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
         boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
         boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
-        boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt());

         if (useTenantId) {
             tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
@@ -886,11 +875,6 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
                     cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()));
             }
         }
-
-        if (useIndexTableAsSource) {
-            sourceTable = SourceTable.INDEX_TABLE_SOURCE;
-        }
-
         schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
         dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
         indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
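Note that after this revert IndexTool no longer registers the `fi` option, so command lines that pass `-fi` (or `--from-index`) should fail to parse rather than being silently ignored.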
File: org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java

@@ -41,7 +41,6 @@
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
 import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -126,8 +125,6 @@ public final class PhoenixConfigurationUtil {

     public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";

-    public static final String INDEX_TOOL_SOURCE_TABLE = "phoenix.mr.index_tool.source.table";
-
     public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";

     public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
@@ -663,19 +660,6 @@ public static String getIndexToolIndexTableName(Configuration configuration) {
         return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
     }

-    public static void setIndexToolSourceTable(Configuration configuration,
-            IndexScrutinyTool.SourceTable sourceTable) {
-        Preconditions.checkNotNull(configuration);
-        Preconditions.checkNotNull(sourceTable);
-        configuration.set(INDEX_TOOL_SOURCE_TABLE, sourceTable.name());
-    }
-
-    public static IndexScrutinyTool.SourceTable getIndexToolSourceTable(Configuration configuration) {
-        Preconditions.checkNotNull(configuration);
-        return IndexScrutinyTool.SourceTable.valueOf(configuration.get(INDEX_TOOL_SOURCE_TABLE,
-                IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE.name()));
-    }
-
     public static void setScrutinySourceTable(Configuration configuration,
             SourceTable sourceTable) {
         Preconditions.checkNotNull(configuration);
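The scrutiny-side source-table switch (`SCRUTINY_SOURCE_TABLE`, kept just below the deleted block) is untouched by this revert. A minimal sketch of configuring it, assuming the caller builds the job configuration itself (the wrapper class and method here are illustrative, not part of Phoenix):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class ScrutinySourceExample {
        public static void main(String[] args) {
            // Index-as-source scans remain available to scrutiny jobs through the
            // surviving "phoenix.mr.scrutiny.source.table" key.
            Configuration conf = HBaseConfiguration.create();
            PhoenixConfigurationUtil.setScrutinySourceTable(conf, SourceTable.INDEX_TABLE_SOURCE);
        }
    }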
File: IndexToolTest.java

@@ -19,12 +19,10 @@

 import org.apache.commons.cli.CommandLine;
 import org.apache.phoenix.end2end.IndexToolIT;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexScrutiny;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -36,8 +34,9 @@

 import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
 import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
-import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
 import static org.junit.Assert.assertEquals;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 public class IndexToolTest extends BaseTest {
@@ -336,35 +335,4 @@ public void verifyDisableLoggingException(IndexTool.IndexDisableLoggingType disa
         CommandLine cmdLine = it.parseOptions(args);
     }

-    @Test
-    public void testIndexToolDefaultSource() throws Exception {
-        Long startTime = 1L;
-        Long endTime = 10L;
-        String [] args =
-                IndexToolIT.getArgValues(true, true, schema,
-                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
-                        startTime , endTime);
-        CommandLine cmdLine = it.parseOptions(args);
-        it.populateIndexToolAttributes(cmdLine);
-        assertEquals(IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE, it.getSourceTable());
-    }
-
-    @Test
-    public void testIndexToolFromIndexSource() throws Exception {
-        verifyFromIndexOption(IndexTool.IndexVerifyType.ONLY);
-        verifyFromIndexOption(IndexTool.IndexVerifyType.BEFORE);
-    }
-
-    private void verifyFromIndexOption(IndexTool.IndexVerifyType verifyType) throws Exception {
-        Long startTime = 1L;
-        Long endTime = 10L;
-        String[] args =
-                IndexToolIT.getArgValues(true, true, schema,
-                        dataTable, indexTable, tenantId, verifyType,
-                        startTime, endTime, IndexTool.IndexDisableLoggingType.BEFORE, null, true);
-        CommandLine cmdLine = it.parseOptions(args);
-        it.populateIndexToolAttributes(cmdLine);
-        assertEquals(IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE, it.getSourceTable());
-    }
-
 }