Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
Expand Down Expand Up @@ -149,16 +152,16 @@ private static class BatchMutateContext {
private final int clientVersion;
// The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
// the verified column) will have the value false ("unverified") on these mutations
private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't have to be this patch, but we should consider extracting this ListMultimap<HTableInterfaceReference, Mutation> into a class of some kind with named accessor methods so that code dealing with it can be at a higher level of abstraction / more self-documenting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a JIRA for this? @gjacoby126 If not, I'll file one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no JIRA that I know of, @swaroopak. Thanks!

Copy link
Contributor Author

@kadirozde kadirozde Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
// the verified column) will have the value true ("verified") on the put mutations
private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
// The collection of candidate index mutations that will be applied after the data table mutations
private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
long dataWriteStartTime;

private long dataWriteStartTime;
private boolean rebuild;
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
}
Expand Down Expand Up @@ -512,86 +515,84 @@ public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ)
}
}

private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
                                     MiniBatchOperationInProgress<Mutation> miniBatchOp,
                                     ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
    // Local index rows are stored in the data table itself, so they are keyed in the
    // multimap under the data table's name rather than a separate index table.
    byte[] dataTableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
    HTableInterfaceReference dataTableRef =
            new HTableInterfaceReference(new ImmutableBytesPtr(dataTableName));
    // removeAll both fetches and drops the local-index entries, so the remaining
    // multimap contents are only the remote (global) index updates.
    List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(dataTableRef);
    if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
        return;
    }
    List<Mutation> localUpdates = new ArrayList<Mutation>(localIndexUpdates.size());
    for (Pair<Mutation, byte[]> update : localIndexUpdates) {
        localUpdates.add(update.getFirst());
    }
    if (!localUpdates.isEmpty()) {
        // Apply the local index mutations as part of this region's own batch.
        miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
    }
}

private void prepareIndexMutations(
ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context,
Collection<? extends Mutation> mutations,
long now,
PhoenixIndexMetaData indexMetaData) throws Throwable {

List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}

// get the index updates for all elements in this batch
Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);

context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
while(indexUpdatesItr.hasNext()) {
Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
localUpdates.add(next.getFirst().getFirst());
indexUpdatesItr.remove();
}
else {
// get index maintainer for this index table
IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
if (indexMaintainer == null) {
throw new DoNotRetryIOException(
"preBatchMutateWithExceptions: indexMaintainer is null " +
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
}
byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
int updateCount = 0;
for (IndexMaintainer indexMaintainer : maintainers) {
updateCount++;
byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
HTableInterfaceReference hTableInterfaceReference =
new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
while (indexUpdatesItr.hasNext()) {
Pair<Mutation, byte[]> next = indexUpdatesItr.next();
// add the VERIFIED cell, which is the empty cell
Mutation m = next.getFirst().getFirst();
boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
if (rebuild) {
Mutation m = next.getFirst();
if (context.rebuild) {
indexUpdatesItr.remove();
if (m instanceof Put) {
long ts = getMaxTimestamp(m);
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(m, emptyCF, emptyCQ);
((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
}
context.preIndexUpdates.put(hTableInterfaceReference, m);
} else {
indexUpdatesItr.remove();
// For this mutation whether it is put or delete, set the status of the index row "unverified"
// This will be done before the data table row is updated (i.e., in the first write phase)
Put unverifiedPut = new Put(m.getRow());
unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
context.preIndexUpdates.add(new Pair <>(unverifiedPut, next.getFirst().getSecond()));
context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
if (m instanceof Put) {
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(m, emptyCF, emptyCQ);
((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
}
context.intermediatePostIndexUpdates.add(next);
}
}
}
if (!localUpdates.isEmpty()) {
miniBatchOp.addOperationsFromCP(0,
localUpdates.toArray(new Mutation[localUpdates.size()]));
}
if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
}
for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
context.preIndexUpdates.add(update.getFirst());
}
TracingUtils.addAnnotation(current, "index update count", updateCount);
}
}

Expand All @@ -616,20 +617,23 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
setBatchMutateContext(c, context);
Mutation firstMutation = miniBatchOp.getOperation(0);
ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
context.rebuild = replayWrite != null;
/*
* Exclusively lock all rows so we get a consistent read
* while determining the index updates
*/
if (replayWrite == null) {
long now;
if (!context.rebuild) {
populateRowsToLock(miniBatchOp, context);
lockRows(context);
}
long now = EnvironmentEdgeManager.currentTimeMillis();
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
// concurrent updates
if (replayWrite == null) {
now = EnvironmentEdgeManager.currentTimeMillis();
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
// concurrent updates
populatePendingRows(context);
}
else {
now = EnvironmentEdgeManager.currentTimeMillis();
}
// First group all the updates for a single row into a single update to be processed
Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
// early exit if it turns out we don't have any edits
Expand All @@ -652,9 +656,11 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
for (RowLock rowLock : context.rowLocks) {
rowLock.release();
}
// Do the index updates
// Do the first phase index updates
doPre(c, context, miniBatchOp);
if (replayWrite == null) {
context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
if (!context.rebuild) {
List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
// Acquire the locks again before letting the region proceed with data table updates
List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
for (RowLock rowLock : context.rowLocks) {
Expand All @@ -664,29 +670,26 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
context.rowLocks.clear();
context.rowLocks = rowLocks;
// Check if we need to skip post index update for any of the row
Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
while (iterator.hasNext()) {
// Check if this row is going through another mutation which has a newer timestamp. If so,
// ignore the pending updates for this row
Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
PendingRow pendingRow = pendingRows.get(rowKey);
// Are there concurrent updates on the data table row? if so, skip post index updates
// and let read repair resolve conflicts
if (pendingRow.isConcurrent()) {
iterator.remove();
for (IndexMaintainer indexMaintainer : maintainers) {
HTableInterfaceReference hTableInterfaceReference =
new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
Iterator<Pair<Mutation, byte[]>> iterator =
context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
while (iterator.hasNext()) {
// Are there concurrent updates on the data table row? if so, skip post index updates
// and let read repair resolve conflicts
Pair<Mutation, byte[]> update = iterator.next();
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
PendingRow pendingRow = pendingRows.get(rowKey);
if (!pendingRow.isConcurrent()) {
context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
}
}
}
// We are done with handling concurrent mutations. So we can remove the rows of this batch from
// the collection of pending rows
removePendingRows(context);
}
if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
}
for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
context.postIndexUpdates.add(update.getFirst());
}
if (failDataTableUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the data table write failure");
}
Expand Down Expand Up @@ -764,7 +767,7 @@ private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutate

private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
throws IOException {
Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
//short circuit, if we don't need to do any work

if (context == null || indexUpdates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;

import com.google.common.collect.ListMultimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Stoppable;
Expand All @@ -34,6 +35,8 @@
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,28 +82,20 @@ public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> min
return this.delegate.getIndexMetaData(miniBatchOp);
}

public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations,
IndexMetaData indexMetaData) throws Throwable {
// notify the delegate that we have started processing a batch
this.delegate.batchStarted(miniBatchOp, indexMetaData);

// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
for (Pair<Mutation, byte[]> update : updates) {
update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
}
}
for (Pair<Mutation, byte[]> update : updates) {
results.add(new Pair<>(update, m.getRow()));
indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wouldn't it be more efficient to reuse HTableInterfaceReference rather than create N of them when we're likely mutating only a small number of tables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not easy to optimize this as for each data table mutation we get updates for all index tables. If we do not create a separate reference for each update, then we need to maintain a hash or list of references and then need to look up on or search them. This may not be more efficient. Let me know if you had something else to suggest here.

}
}
return results;
}

public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
Expand Down