Skip to content
Permalink
Browse files
HBASE-26960 Another case for unnecessary replication suspending in Re… (
  • Loading branch information
comnetwork committed Apr 30, 2022
1 parent b0c2832 commit f5a566ea1b390e886fa7c85044791e5b89a69300
Showing 4 changed files with 357 additions and 9 deletions.
@@ -2794,7 +2794,8 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
long flushOpSeqId = writeEntry.getWriteNumber();
FlushResultImpl flushResult =
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId,
"Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
"Nothing to flush",
writeCanNotFlushMarkerToWAL(writeEntry, wal, writeFlushWalMarker));
mvcc.completeAndWait(writeEntry);
// Set to null so we don't complete it again down in finally block.
writeEntry = null;
@@ -2975,17 +2976,33 @@ private boolean isAllFamilies(Collection<HStore> families) {
}

/**
* Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
* This method is only used when we flush but the memstore is empty,if writeFlushWalMarker is
* true,we write the {@link FlushAction#CANNOT_FLUSH} flush marker to WAL when the memstore is
* empty. Ignores exceptions from WAL. Returns whether the write succeeded.
* @return whether WAL write was successful
*/
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WAL wal,
boolean writeFlushWalMarker) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(),
-1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
RegionReplicationSink sink = regionReplicationSink.orElse(null);

if (sink != null && !writeFlushWalMarker) {
/**
* Here for replication to secondary region replica could use {@link FlushAction#CANNOT_FLUSH}
* to recover writeFlushWalMarker is false, we create {@link WALEdit} for
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry,see HBASE-26960 for more details.
*/
this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry,
desc, sink);
return false;
}

if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
regionReplicationSink.orElse(null));
sink);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : " +
@@ -2995,6 +3012,24 @@ private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarke
return false;
}

/**
* Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add}
* to the flushOpSeqIdMVCCEntry.
*/
private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
FlushDescriptor desc, RegionReplicationSink sink) {
assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
WALKeyImpl walKey =
WALUtil.createWALKey(getRegionInfo(), mvcc, this.getReplicationScope(), null);
walKey.setWriteEntry(flushOpSeqIdMVCCEntry);
/**
* Here the {@link ServerCall} is null for {@link RegionReplicationSink#add} because the
* flushMarkerWALEdit is created by ourselves, not from rpc.
*/
flushOpSeqIdMVCCEntry.attachCompletionAction(() -> sink.add(walKey, flushMarkerWALEdit, null));
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; notify is about completed flush")
FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
@@ -303,6 +303,10 @@ private void runCompletionAction() {
completionAction.ifPresent(Runnable::run);
}

public Optional<Runnable> getCompletionAction() {
return completionAction;
}

public long getWriteNumber() {
return this.writeNumber;
}
@@ -158,8 +158,7 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
final boolean sync, final RegionReplicationSink sink) throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.appendMarker(hri, walKey, edit);
@@ -182,6 +181,13 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
return walKey;
}

public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope,
final Map<String, byte[]> extendedAttributes) {
return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
}

/**
* Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
* Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in

0 comments on commit f5a566e

Please sign in to comment.