Skip to content

Commit

Permalink
PHOENIX-6200 Add counters for extra index rows, log results to PIT an…
Browse files Browse the repository at this point in the history
…d PIT_RESULT table (#995)

* PHOENIX-6200 Add counters for extra index rows, log results to PIT and PIT_RESULT table

* Address feedback
  • Loading branch information
tkhurana committed Dec 9, 2020
1 parent 92c8579 commit 8af9cd8
Show file tree
Hide file tree
Showing 10 changed files with 858 additions and 82 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
Expand All @@ -42,21 +39,21 @@
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
Expand Down Expand Up @@ -86,15 +83,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
Expand Down Expand Up @@ -636,6 +625,10 @@ public static void assertExplainPlan(boolean localIndex, String actualExplainPla
actualExplainPlan.contains(expectedExplainPlan));
}

public static CounterGroup getMRJobCounters(IndexTool indexTool) throws IOException {
return indexTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName());
}

private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_ROW;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
Expand All @@ -111,6 +112,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row";
public static final String ERROR_MESSAGE_EXTRA_INDEX_ROW = "Extra index row";
public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
"phoenix.index.mr.log.beyond.max.lookback.errors";
public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
Expand Down Expand Up @@ -323,6 +325,7 @@ protected boolean isColumnIncluded(Cell cell) {
byte[] qualifier = CellUtil.cloneQualifier(cell);
return set.contains(qualifier);
}

@VisibleForTesting
public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
Expand Down Expand Up @@ -374,6 +377,7 @@ private void closeTables() throws IOException {
dataHTable.close();
}
}

@Override
public void close() throws IOException {
innerScanner.close();
Expand Down Expand Up @@ -429,14 +433,19 @@ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, lon
null, isBeforeRebuild, errorType);
}

protected byte[] getDataTableName() {
return region.getRegionInfo().getTable().getName();
}

@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild,
IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
ungroupedAggregateRegionObserver.checkForRegionClosing();
byte[] dataTableName = getDataTableName();
verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
dataTableName, isBeforeRebuild, errorType);
}

private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
Expand Down Expand Up @@ -623,6 +632,37 @@ private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs, List<
}
}

/**
* actualIndexMutationList is the list of all the mutations of a single extra index row (i.e. not referenced by data row)
* ordered by decreasing order of timestamps with Deletes before Puts
*/
private void logExtraIndexRowAndUpdateCounters(List<Mutation> actualIndexMutationList,
IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException {
for (Mutation m : actualIndexMutationList) {
// this extra row in the index table has already been deleted
if ((m instanceof Delete)) {
return;
}

// check the empty column status of latest (most recent) put mutation
if (isVerified((Put) m)) {
verificationPhaseResult.setExtraVerifiedIndexRowCount(
verificationPhaseResult.getExtraVerifiedIndexRowCount() + 1);
} else {
verificationPhaseResult.setExtraUnverifiedIndexRowCount(
verificationPhaseResult.getExtraUnverifiedIndexRowCount() + 1);
}

byte[] indexKey = m.getRow();
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants);
String errorMsg = ERROR_MESSAGE_EXTRA_INDEX_ROW;
IndexVerificationOutputRepository.IndexVerificationErrorType errorType = EXTRA_ROW;
logToIndexToolOutputTable(dataKey, indexKey, 0, getTimestamp(m), errorMsg,
isBeforeRebuild, errorType);
break;
}
}

/**
* In this method, the actual list is repaired in memory using the expected list which is actually the output of
* rebuilding the index table row. The result of this repair is used only for verification.
Expand Down Expand Up @@ -767,6 +807,7 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List<Mutation> actualMut
if (actualMutationList == null || actualMutationList.isEmpty()) {
throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
}

if (isBeforeRebuild) {
Mutation m = actualMutationList.get(0);
if (m instanceof Put && (mostRecentIndexRowKeys.isEmpty() || mostRecentIndexRowKeys.contains(m.getRow()))) {
Expand All @@ -778,7 +819,9 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List<Mutation> actualMut
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
repairActualMutationList(actualMutationList, expectedMutationList);
}
// actualMutationList can be empty after returning from this function
cleanUpActualMutationList(actualMutationList);

long currentTime = EnvironmentEdgeManager.currentTimeMillis();
int actualIndex = 0;
int expectedIndex = 0;
Expand Down Expand Up @@ -883,6 +926,11 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List<Mutation> actualMut
logMismatch(expected, actual, expectedIndex, verificationPhaseResult, isBeforeRebuild);
}
else {
if (expected == null) {
// Happens when the actualMutationList becomes empty after returning from
// the cleanUpActualMutationList function.
expected = expectedMutationList.get(0);
}
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
String errorMsg = String.format("Not matching index row. expectedIndex=%d. expectedMutationSize=%d. actualIndex=%d. actualMutationSize=%d. expectedType=%s. actualType=%s",
expectedIndex, expectedSize, actualIndex, actualSize, expected.getClass().getName(), (actualIndex < actualSize ? actual.getClass().getName() : "null"));
Expand All @@ -909,10 +957,11 @@ protected void verifyIndexRows(Map<byte[], List<Mutation>> actualIndexMutationMa
if (expectedMutationList != null) {
if (!verifySingleIndexRow(entry.getKey(), entry.getValue(), expectedMutationList, mostRecentIndexRowKeys,
indexRowsToBeDeleted, verificationPhaseResult, isBeforeRebuild)) {
invalidIndexRows.put(indexRowKey, actualIndexMutationMap.get(indexRowKey));
invalidIndexRows.put(indexRowKey, expectedMutationList);
}
expectedIndexMutationMap.remove(indexRowKey);
} else {
logExtraIndexRowAndUpdateCounters(entry.getValue(), verificationPhaseResult, isBeforeRebuild);
indexRowsToBeDeleted.add(indexMaintainer.buildRowDeleteMutation(indexRowKey,
IndexMaintainer.DeleteType.ALL_VERSIONS, getTimestamp(entry.getValue().get(0))));
}
Expand Down Expand Up @@ -965,6 +1014,7 @@ protected void verifyIndexRows(Map<byte[], List<Mutation>> actualIndexMutationMa
logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg,
isBeforeRebuild, errorType);
}

// Leave the invalid and missing rows in indexMutationMap
expectedIndexMutationMap.putAll(invalidIndexRows);
}
Expand Down Expand Up @@ -1010,7 +1060,11 @@ protected void updateIndexRows(Map<byte[], List<Mutation>> indexMutationMap,
verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexMutationMap.size());
}
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
if (indexHTable != null) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
} else {
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public IndexRepairRegionScanner(final RegionScanner innerScanner,
}
}

@Override
public byte[] getDataTableName() {
return dataHTable.getName().toBytes();
}

public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
Put put = null;
Delete del = null;
Expand Down Expand Up @@ -178,6 +183,28 @@ private Map<byte[], List<Mutation>> populateActualIndexMutationMap(Map<byte[], L
return actualIndexMutationMap;
}

private Map<byte[], List<Mutation>> populateActualIndexMutationMap() throws IOException {
Map<byte[], List<Mutation>> actualIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
Scan indexScan = new Scan();
indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
indexScan.setRaw(true);
indexScan.setMaxVersions();
indexScan.setCacheBlocks(false);
try (RegionScanner regionScanner = region.getScanner(indexScan)) {
do {
ungroupedAggregateRegionObserver.checkForRegionClosing();
List<Cell> row = new ArrayList<Cell>();
hasMore = regionScanner.nextRaw(row);
if (!row.isEmpty()) {
populateIndexMutationFromIndexRow(row, actualIndexMutationMap);
}
} while (hasMore);
} catch (Throwable t) {
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
}
return actualIndexMutationMap;
}

private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
Map<byte[], List<Mutation>> actualIndexMutationMap,
IndexToolVerificationResult verificationResult) throws IOException {
Expand All @@ -188,7 +215,7 @@ private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
return;
}
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getBefore(), true);
verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
return;
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE) {
Expand All @@ -200,11 +227,12 @@ private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
}
if (verifyType == IndexTool.IndexVerifyType.AFTER) {
repairIndexRows(expectedIndexMutationMap, Collections.EMPTY_LIST, verificationResult);
verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getAfter(), false);
actualIndexMutationMap = populateActualIndexMutationMap();
verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getAfter(), false);
return;
}
if (verifyType == IndexTool.IndexVerifyType.BOTH) {
verifyIndexRows(actualIndexMutationMap,expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
if (!expectedIndexMutationMap.isEmpty() || !indexRowsToBeDeleted.isEmpty()) {
repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
}
Expand All @@ -215,6 +243,7 @@ private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
}
}


private void addRepairAndOrVerifyTask(TaskBatch<Boolean> tasks,
final Set<byte[]> dataRowKeys,
final Map<byte[], List<Mutation>> actualIndexMutationMap,
Expand Down
Loading

0 comments on commit 8af9cd8

Please sign in to comment.