diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 256f9a78624f..e6718c068296 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,6 +80,8 @@ class ReplicationSourceWALReader extends Thread {
   private long totalBufferQuota;
   private final String walGroupId;
 
+  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -130,8 +133,11 @@ public void run() {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
         batch = null;
         if (!source.isPeerEnabled()) {
+          waitingPeerEnabled.set(true);
           Threads.sleep(sleepForRetries);
           continue;
+        } else {
+          waitingPeerEnabled.set(false);
         }
         if (!checkQuota()) {
           continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index aa059aa30a27..cbaf3c6f6e96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -386,7 +387,10 @@ private void resetReader() throws IOException {
       if (archivedLog != null) {
        openReader(archivedLog);
       } else {
-        throw fnfe;
+        // For now, this could happen only when reading meta wal for meta replicas.
+        // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+        // the replication source. See HBASE-27871.
+        throw new UncheckedIOException(fnfe);
       }
     } catch (NullPointerException npe) {
       throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe770..7196c2827ca4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +54,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +227,50 @@ public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Excepti
     }
   }
 
+  @Test
+  public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+    TableName tableName = TableName.valueOf("hbase:meta");
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName)) {
+      MiniHBaseCluster cluster = HTU.getHBaseCluster();
+      cluster.getMaster().balanceSwitch(false);
+      HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+      ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+        .getReplicationManager().catalogReplicationSource.get();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
+      // There's a small chance the source reader has passed the peer state check but has not
+      // yet read the wal, which could allow it to read some added entries before the wal gets
+      // deleted, so make sure here that we only proceed once the reader loop has detected
+      // that the peer is disabled.
+      HTU.waitFor(2000, 100, true, () -> {
+        MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
+        source.logQueue.getQueues().keySet()
+          .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+            && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+        return readerWaiting.getValue();
+      });
+      // load the data to the table
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+      HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+      // now load more data without flushing or compacting
+      for (int i = 5; i < 10; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+      }
+      verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+    }
+  }
+
   @Test
   public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
     MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
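
Note on the WALEntryStream change: the patch wraps the FileNotFoundException in an UncheckedIOException so that it can escape callers that do not declare IOException, and the meta-replica endpoint then resets the replication source (HBASE-27871). The following is a minimal, self-contained sketch of that propagation pattern only; WalSource and the class names here are hypothetical stand-ins, not HBase API, and the real recovery logic is not shown.

import java.io.FileNotFoundException;
import java.io.UncheckedIOException;

public class UncheckedIoeSketch {

  /** Hypothetical stand-in for WALEntryStream: a read path that cannot widen its throws clause. */
  static class WalSource {
    String next() {
      // The current wal was rolled and removed by the log cleaner and no archived copy
      // exists; wrap the checked exception so it escapes anyway, mirroring the change
      // to resetReader() in this patch.
      throw new UncheckedIOException(new FileNotFoundException("wal rolled and deleted"));
    }
  }

  public static void main(String[] args) {
    try {
      new WalSource().next();
    } catch (UncheckedIOException e) {
      // The consumer unwraps the cause and recovers; for meta replicas the endpoint
      // reacts by resetting the replication source instead of retrying the missing
      // file forever.
      System.out.println("resetting source, cause: " + e.getCause());
    }
  }
}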
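Note on the test's synchronization step: disabling the peer does not stop the reader instantly, which is why the test waits for waitingPeerEnabled before rolling and deleting WALs. Below is a self-contained sketch of that handshake under illustrative names (none of this is HBase API); the flag is only set once the reader has actually observed the disabled peer and parked.

import java.util.concurrent.atomic.AtomicBoolean;

public class PeerHandshakeSketch {
  static final AtomicBoolean peerEnabled = new AtomicBoolean(true);
  static final AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);

  public static void main(String[] args) throws InterruptedException {
    Thread reader = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        if (!peerEnabled.get()) {
          // Reader observed the disabled peer and parks; only now is it safe for
          // the test to roll and delete WALs.
          waitingPeerEnabled.set(true);
          try {
            Thread.sleep(10); // stands in for Threads.sleep(sleepForRetries)
          } catch (InterruptedException e) {
            return;
          }
          continue;
        }
        waitingPeerEnabled.set(false);
        // ... read and ship wal entries while the peer is enabled ...
      }
    });
    reader.start();

    peerEnabled.set(false); // the test disables the peer ...
    while (!waitingPeerEnabled.get()) { // ... then polls, as HTU.waitFor does in the test
      Thread.sleep(10);
    }
    System.out.println("reader parked; WALs can now be rolled and deleted");
    reader.interrupt();
    reader.join();
  }
}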