Skip to content

Commit

Permalink
HBASE-27871 Meta replication stuck forever if wal it's still reading …
Browse files Browse the repository at this point in the history
…gets rolled and deleted (#5271)

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
  • Loading branch information
wchevreuil committed Jun 15, 2023
1 parent 7d7fbf3 commit 91627ce
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -79,6 +80,8 @@ class ReplicationSourceWALReader extends Thread {
private long totalBufferQuota;
private final String walGroupId;

// Package-private flag maintained by the reader loop in run(): set to true right before the loop
// sleeps because source.isPeerEnabled() returned false, and set back to false on an iteration
// where the peer is seen as enabled. It lets code outside this thread (for example a test) find
// out that the reader has actually observed the disabled peer state.
AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);

/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
Expand Down Expand Up @@ -130,8 +133,11 @@ public void run() {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
waitingPeerEnabled.set(true);
Threads.sleep(sleepForRetries);
continue;
} else {
waitingPeerEnabled.set(false);
}
if (!checkQuota()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -386,7 +387,10 @@ private void resetReader() throws IOException {
if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
// For now, this could happen only when reading meta wal for meta replicas.
// In this case, raising UncheckedIOException will let the endpoint deal with resetting
// the replication source. See HBASE-27871.
throw new UncheckedIOException(fnfe);
}
} catch (NullPointerException npe) {
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -53,6 +54,7 @@
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
Expand Down Expand Up @@ -225,6 +227,50 @@ public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Excepti
}
}

/**
 * Exercises catalog (meta) replica replication around a disabled/re-enabled peer: the peer is
 * disabled, rows 0-4999 are written with flushes and compactions, the master's log cleaner is
 * run, the peer is re-enabled, rows 5000-9999 are written, and finally replication of rows
 * 0-9999 to the meta replicas is verified. The intent, per the test name and HBASE-27871, is to
 * cover the case where the WAL the reader was positioned on has been rolled and deleted.
 */
@Test
public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
  TableName metaTableName = TableName.valueOf("hbase:meta");
  try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table metaTable = conn.getTable(metaTableName)) {
    MiniHBaseCluster cluster = HTU.getHBaseCluster();
    cluster.getMaster().balanceSwitch(false);
    HRegionServer metaServer = cluster.getRegionServer(cluster.getServerHoldingMeta());
    ReplicationSource source = (ReplicationSource) metaServer.getReplicationSourceService()
      .getReplicationManager().catalogReplicationSource.get();
    ReplicationPeerImpl peer = (ReplicationPeerImpl) source.replicationPeer;
    peer.setPeerState(false);

    // The reader may already be past its peer-state check without having read the wal yet; in
    // that window it could still pick up some of the entries added below before the wal gets
    // deleted. So only go on once every reader has flagged that it saw the peer as disabled.
    HTU.waitFor(2000, 100, true, () -> {
      MutableObject<Boolean> allReadersWaiting = new MutableObject<>(true);
      source.logQueue.getQueues().keySet().forEach(walGroup -> {
        // Once one reader is found not waiting the answer stays false and, as with a
        // short-circuiting &&, the remaining readers are not consulted.
        if (allReadersWaiting.getValue()) {
          allReadersWaiting
            .setValue(source.workerThreads.get(walGroup).entryReader.waitingPeerEnabled.get());
        }
      });
      return allReadersWaiting.getValue();
    });

    // First half of the data: write, flush and (except after the last batch) compact.
    for (int batch = 0; batch < 5; batch++) {
      int startRow = batch * 1000;
      int endRow = startRow + 1000;
      LOG.info("Writing data from " + startRow + " to " + endRow);
      HTU.loadNumericRows(metaTable, HConstants.CATALOG_FAMILY, startRow, endRow);
      LOG.info("flushing table");
      HTU.flush(metaTableName);
      LOG.info("compacting table");
      boolean lastBatch = batch >= 4;
      if (!lastBatch) {
        HTU.compact(metaTableName, false);
      }
    }

    HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner();
    peer.setPeerState(true);

    // Second half of the data: plain writes, no flush and no compaction.
    for (int batch = 5; batch < 10; batch++) {
      int startRow = batch * 1000;
      int endRow = startRow + 1000;
      LOG.info("Writing data from " + startRow + " to " + endRow);
      HTU.loadNumericRows(metaTable, HConstants.CATALOG_FAMILY, startRow, endRow);
    }

    verifyReplication(metaTableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
  }
}

@Test
public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
Expand Down

0 comments on commit 91627ce

Please sign in to comment.