Skip to content

Commit

Permalink
HBASE-26993 Make the new framework for region replication could work …
Browse files Browse the repository at this point in the history
…for SKIP_WAL (#4392)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Jun 3, 2022
1 parent c0e8243 commit d57159f
Show file tree
Hide file tree
Showing 6 changed files with 502 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2891,7 +2891,7 @@ private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WA
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry,see HBASE-26960 for more details.
*/
this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
this.attachRegionReplicationToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
return false;
}

Expand All @@ -2912,7 +2912,7 @@ private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WA
* Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add}
* to the flushOpSeqIdMVCCEntry.
*/
private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
FlushDescriptor desc, RegionReplicationSink sink) {
assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
Expand Down Expand Up @@ -3372,8 +3372,8 @@ public abstract void prepareMiniBatchOperations(
* Write mini-batch operations to MemStore
*/
public abstract WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
throws IOException;
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
long now) throws IOException;

protected void writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
Expand Down Expand Up @@ -3592,6 +3592,10 @@ protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastI
walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
}

protected WALEdit createWALEdit(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
return new WALEdit(miniBatchOp.getCellCount(), isInReplay());
}

/**
* Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are
* present, they are merged to result WALEdit.
Expand All @@ -3609,6 +3613,12 @@ public boolean visit(int index) throws IOException {
// we use durability of the original mutation for the mutation passed by CP.
if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
region.recordMutationWithoutWal(m.getFamilyCellMap());
/**
* Here is for HBASE-26993,in order to make the new framework for region replication
* could work for SKIP_WAL, we save the {@link Mutation} which
* {@link Mutation#getDurability} is {@link Durability#SKIP_WAL} in miniBatchOp.
*/
cacheSkipWALMutationForRegionReplication(miniBatchOp, walEdits, familyCellMaps[index]);
return true;
}

Expand All @@ -3622,27 +3632,38 @@ public boolean visit(int index) throws IOException {
|| curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup
|| curWALEditForNonce.getFirst().getNonce() != nonce
) {
curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
curWALEditForNonce =
new Pair<>(new NonceKey(nonceGroup, nonce), createWALEdit(miniBatchOp));
walEdits.add(curWALEditForNonce);
}
WALEdit walEdit = curWALEditForNonce.getSecond();

// Add WAL edits from CPs.
WALEdit fromCP = walEditsFromCoprocessors[index];
if (fromCP != null) {
for (Cell cell : fromCP.getCells()) {
walEdit.add(cell);
}
}
walEdit.add(familyCellMaps[index]);

List<Cell> cellsFromCP = fromCP == null ? Collections.emptyList() : fromCP.getCells();
addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMaps[index]);
return true;
}
});
return walEdits;
}

protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
doAddCellsToWALEdit(walEdit, cellsFromCP, familyCellMap);
}

protected static void doAddCellsToWALEdit(WALEdit walEdit, List<Cell> cellsFromCP,
Map<byte[], List<Cell>> familyCellMap) {
walEdit.add(cellsFromCP);
walEdit.add(familyCellMap);
}

protected abstract void cacheSkipWALMutationForRegionReplication(
final MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> walEdits, Map<byte[], List<Cell>> familyCellMap);

/**
* This method completes mini-batch operations by calling postBatchMutate() CP hook (if
* required) and completing mvcc.
Expand Down Expand Up @@ -3717,13 +3738,15 @@ private static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup;
private long nonce;
protected boolean canProceed;
private boolean regionReplicateEnable;

public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
super(region, operations);
this.atomic = atomic;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
this.regionReplicateEnable = region.regionReplicationSink.isPresent();
}

@Override
Expand Down Expand Up @@ -4140,17 +4163,115 @@ private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
return walEdits;
}

/**
* Here is for HBASE-26993,in order to make the new framework for region replication could work
* for SKIP_WAL, we save the {@link Mutation} which {@link Mutation#getDurability} is
* {@link Durability#SKIP_WAL} in miniBatchOp.
*/
@Override
protected void cacheSkipWALMutationForRegionReplication(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits, Map<byte[], List<Cell>> familyCellMap) {
if (!this.regionReplicateEnable) {
return;
}

WALEdit walEditForReplicateIfExistsSkipWAL =
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
/**
* When there is a SKIP_WAL {@link Mutation},we create a new {@link WALEdit} for replicating
* to region replica,first we fill the existing {@link WALEdit} to it and then add the
* {@link Mutation} which is SKIP_WAL to it.
*/
if (walEditForReplicateIfExistsSkipWAL == null) {
walEditForReplicateIfExistsSkipWAL =
this.createWALEditForReplicateSkipWAL(miniBatchOp, nonceKeyAndWALEdits);
miniBatchOp.setWalEditForReplicateIfExistsSkipWAL(walEditForReplicateIfExistsSkipWAL);
}
walEditForReplicateIfExistsSkipWAL.add(familyCellMap);

}

private WALEdit createWALEditForReplicateSkipWAL(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits) {
if (nonceKeyAndWALEdits.isEmpty()) {
return this.createWALEdit(miniBatchOp);
}
// for MutationBatchOperation, more than one nonce is not allowed
assert nonceKeyAndWALEdits.size() == 1;
WALEdit currentWALEdit = nonceKeyAndWALEdits.get(0).getSecond();
return new WALEdit(currentWALEdit);
}

@Override
protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {

super.addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMap);
WALEdit walEditForReplicateIfExistsSkipWAL =
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
if (walEditForReplicateIfExistsSkipWAL == null) {
return;
}
/**
* When walEditForReplicateIfExistsSkipWAL is not null,it means there exists SKIP_WAL
* {@link Mutation} and we create a new {@link WALEdit} in
* {@link MutationBatchOperation#cacheSkipWALMutationForReplicateRegionReplica} for
* replicating to region replica, so here we also add non SKIP_WAL{@link Mutation}s to
* walEditForReplicateIfExistsSkipWAL.
*/
doAddCellsToWALEdit(walEditForReplicateIfExistsSkipWAL, cellsFromCP, familyCellMap);
}

@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
throws IOException {
final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry,
long now) throws IOException {
boolean newWriteEntry = false;
if (writeEntry == null) {
writeEntry = region.mvcc.begin();
newWriteEntry = true;
}
super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
if (newWriteEntry) {
/**
* Here is for HBASE-26993 case 2,all {@link Mutation}s are {@link Durability#SKIP_WAL}. In
* order to make the new framework for region replication could work for SKIP_WAL,because
* there is no {@link RegionReplicationSink#add} attached in {@link HRegion#doWALAppend},so
* here we get {@link WALEdit} from
* {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} and attach
* {@link RegionReplicationSink#add} to the new mvcc writeEntry.
*/
attachRegionReplicationToMVCCEntry(miniBatchOp, writeEntry, now);
}
return writeEntry;
}

private WALKeyImpl createWALKey(long now) {
// for MutationBatchOperation,isReplay is false.
return this.region.createWALKeyForWALAppend(false, this, now, this.nonceGroup, this.nonce);
}

/**
* Create {@link WALKeyImpl} and get {@link WALEdit} from miniBatchOp and attach
* {@link RegionReplicationSink#add} to the mvccWriteEntry.
*/
private void attachRegionReplicationToMVCCEntry(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WriteEntry mvccWriteEntry, long now)
throws IOException {
if (!this.regionReplicateEnable) {
return;
}
assert !mvccWriteEntry.getCompletionAction().isPresent();

final WALKeyImpl walKey = this.createWALKey(now);
walKey.setWriteEntry(mvccWriteEntry);
region.doAttachReplicateRegionReplicaAction(walKey,
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL(), mvccWriteEntry);
}

@Override
public void completeMiniBatchOperations(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
Expand Down Expand Up @@ -4466,8 +4587,8 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi

@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
throws IOException {
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
long now) throws IOException {
super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
return writeEntry;
}
Expand All @@ -4479,6 +4600,14 @@ public void completeMiniBatchOperations(
super.completeMiniBatchOperations(miniBatchOp, writeEntry);
region.mvcc.advanceTo(getOrigLogSeqNum());
}

@Override
protected void cacheSkipWALMutationForRegionReplication(
MiniBatchOperationInProgress<Mutation> miniBatchOp, List<Pair<NonceKey, WALEdit>> walEdits,
Map<byte[], List<Cell>> familyCellMap) {
// There is no action to do if current region is secondary replica
}

}

public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
Expand Down Expand Up @@ -4647,8 +4776,7 @@ private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

if (walEdit != null && !walEdit.isEmpty()) {
writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
writeEntry = doWALAppend(walEdit, batchOp, miniBatchOp, now, nonceKey);
}

// Complete mvcc for all but last writeEntry (for replay case)
Expand All @@ -4660,7 +4788,7 @@ private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {

// STEP 5. Write back to memStore
// NOTE: writeEntry can be null here
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry, now);

// STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
// complete mvcc for last writeEntry
Expand Down Expand Up @@ -7903,42 +8031,46 @@ public Result increment(Increment increment, long nonceGroup, long nonce) throws
}, () -> createRegionSpan("Region.increment"));
}

private WALKeyImpl createWALKeyForWALAppend(boolean isReplay, BatchOperation<?> batchOp, long now,
long nonceGroup, long nonce) {
WALKeyImpl walKey = isReplay
? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
batchOp.getClusterIds(), nonceGroup, nonce, mvcc)
: new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
batchOp.getClusterIds(), nonceGroup, nonce, mvcc, this.getReplicationScope());
if (isReplay) {
walKey.setOrigLogSeqNum(batchOp.getOrigLogSeqNum());
}
return walKey;
}

/**
* @return writeEntry associated with this append
*/
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, long now, NonceKey nonceKey)
throws IOException {
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), "WALEdit is null or empty!");
Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID,
Preconditions.checkArgument(
!walEdit.isReplay() || batchOp.getOrigLogSeqNum() != SequenceId.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKeyImpl directly to support legacy coprocessors.
WALKeyImpl walKey = walEdit.isReplay()
? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc)
: new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc, this.getReplicationScope());
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);
}

WALKeyImpl walKey = createWALKeyForWALAppend(walEdit.isReplay(), batchOp, now,
nonceKey.getNonceGroup(), nonceKey.getNonce());
// don't call the coproc hook for writes to the WAL caused by
// system lifecycle events like flushes or compactions
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
WriteEntry writeEntry = walKey.getWriteEntry();
regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
sink.add(walKey, walEdit, rpcCall);
}));
this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
sync(txid, batchOp.durability);
}
return writeEntry;
} catch (IOException ioe) {
Expand All @@ -7947,7 +8079,51 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
}
throw ioe;
}
}

/**
* Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to region
* replica.
*/
private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
if (!regionReplicationSink.isPresent()) {
return;
}
/**
* If {@link HRegion#regionReplicationSink} is present,only {@link MutationBatchOperation} is
* used and {@link NonceKey} is all the same for {@link Mutation}s in
* {@link MutationBatchOperation},so for HBASE-26993 case 1,if
* {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} is not null and we could
* enter {@link HRegion#doWALAppend},that means partial {@link Mutation}s are
* {@link Durability#SKIP_WAL}, we use
* {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} to replicate to region
* replica,but if {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} is
* null,that means there is no {@link Mutation} is {@link Durability#SKIP_WAL},so we just use
* walEdit to replicate.
*/
assert batchOp instanceof MutationBatchOperation;
WALEdit walEditToUse = miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
if (walEditToUse == null) {
walEditToUse = walEdit;
}
doAttachReplicateRegionReplicaAction(walKey, walEditToUse, writeEntry);
}

/**
* Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to region
* replica.
*/
private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
if (walEdit == null || walEdit.isEmpty()) {
return;
}
final ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
sink.add(walKey, walEdit, rpcCall);
}));
}

public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
Expand Down
Loading

0 comments on commit d57159f

Please sign in to comment.