diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 44ac952abc73..cc0c6ba3472d 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -398,15 +398,16 @@ enum PeerSyncReplicationStateTransitionState {
   PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
   SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
   REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
-  REPLAY_REMOTE_WAL_IN_PEER = 4;
-  REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
-  REOPEN_ALL_REGIONS_IN_PEER = 6;
-  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
-  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
-  SYNC_REPLICATION_SET_PEER_ENABLED = 9;
-  SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
-  CREATE_DIR_FOR_REMOTE_WAL = 11;
-  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
+  REOPEN_ALL_REGIONS_IN_PEER = 4;
+  SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER = 5;
+  REPLAY_REMOTE_WAL_IN_PEER = 6;
+  REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 7;
+  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 8;
+  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 9;
+  SYNC_REPLICATION_SET_PEER_ENABLED = 10;
+  SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 11;
+  CREATE_DIR_FOR_REMOTE_WAL = 12;
+  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 13;
 }

 message PeerModificationStateData {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 882a050dffb3..755e0a3dc580 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -17,11 +17,27 @@
  */
 package org.apache.hadoop.hbase.master.replication;

+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -29,8 +45,15 @@
  * The base class for all replication peer related procedure.
  */
 @InterfaceAudience.Private
-public abstract class AbstractPeerProcedure<TState>
-    extends AbstractPeerNoLockProcedure<TState> implements PeerProcedureInterface {
+public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
+    implements PeerProcedureInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
+
+  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
+  // The sleep interval when waiting table to be enabled or disabled.
+  protected static final int SLEEP_INTERVAL_MS = 1000;

   // used to keep compatible with old client where we can only returns after updateStorage.
   protected ProcedurePrepareLatch latch;
@@ -75,4 +98,74 @@ protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type)
   protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().enablePeer(peerId);
   }
+
+  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    if (barrier >= 0) {
+      lastSeqIds.put(encodedRegionName, barrier);
+      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+        lastSeqIds.clear();
+      }
+    }
+  }
+
+  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
+  // If the table is currently disabling, then we need to wait until it is disabled. We will write
+  // replication barrier for a disabled table. And return whether we need to update the last pushed
+  // sequence id: if the table has been deleted already, i.e. we hit TableStateNotFoundException,
+  // then we do not need to update the last pushed sequence id for this table.
+  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
+      throws IOException {
+    for (;;) {
+      try {
+        if (!tsm.getTableState(tn).isDisabling()) {
+          return true;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
+  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in. If the map is
+  // large enough we will call queueStorage.setLastSequenceIds and clear the map, so the caller
+  // should not forget to check whether the map is empty at the end; if it is not, call
+  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
+  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
+    TableStateManager tsm = env.getMasterServices().getTableStateManager();
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    if (!needSetLastPushedSequenceId(tsm, tableName)) {
+      LOG.debug("Skip setting last pushed sequence id for {}", tableName);
+      return;
+    }
+    for (Pair<String, Long> name2Barrier : MetaTableAccessor
+      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
+      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+        queueStorage);
+    }
+  }
 }
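The helpers above batch the last pushed sequence id updates: setLastPushedSequenceIdForTable fills a shared map through addToMap, addToMap flushes to ReplicationQueueStorage.setLastSequenceIds once UPDATE_LAST_SEQ_ID_BATCH_SIZE entries accumulate, and setLastPushedSequenceId writes out whatever is left at the end. The sketch below is illustrative only and not part of the patch; it assumes nothing beyond the setLastSequenceIds(String, Map<String, Long>) storage API already used above, and the class, method and variable names are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

// Illustrative only: mirrors how setLastPushedSequenceId drives addToMap and then flushes
// whatever is left in the map once all regions have been processed.
public final class LastPushedSeqIdBatcher {

  private static final int BATCH_SIZE = 1000; // same role as UPDATE_LAST_SEQ_ID_BATCH_SIZE

  private LastPushedSeqIdBatcher() {
  }

  public static void updateLastPushedSeqIds(ReplicationQueueStorage queueStorage, String peerId,
      Map<String, Long> encodedRegionName2Barrier) throws ReplicationException {
    Map<String, Long> batch = new HashMap<>();
    for (Map.Entry<String, Long> entry : encodedRegionName2Barrier.entrySet()) {
      // The last pushed sequence id is one below the last barrier, as in the patch above.
      long lastPushedSeqId = entry.getValue().longValue() - 1;
      if (lastPushedSeqId < 0) {
        continue;
      }
      batch.put(entry.getKey(), lastPushedSeqId);
      if (batch.size() >= BATCH_SIZE) {
        queueStorage.setLastSequenceIds(peerId, batch);
        batch.clear();
      }
    }
    // The caller-side obligation called out in the comment above: write out the remainder.
    if (!batch.isEmpty()) {
      queueStorage.setLastSequenceIds(peerId, batch);
    }
  }
}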
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 9550fb0bc138..d5d27796942a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -19,11 +19,7 @@

 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.TableStateManager;
@@ -35,9 +31,7 @@
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,11 +49,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
-  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
-      ReplicationQueueStorage queueStorage) throws ReplicationException {
-    if (barrier >= 0) {
-      lastSeqIds.put(encodedRegionName, barrier);
-      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
-        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
-        lastSeqIds.clear();
-      }
-    }
-  }
-
-  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
-      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
-    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
-    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
-      if (!td.hasGlobalReplicationScope()) {
-        continue;
-      }
-      TableName tn = td.getTableName();
-      if (!ReplicationUtils.contains(peerConfig, tn)) {
-        continue;
-      }
-      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
-    }
-    if (!lastSeqIds.isEmpty()) {
-      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
-    }
-  }
-
-  // If the table is currently disabling, then we need to wait until it is disabled.We will write
-  // replication barrier for a disabled table. And return whether we need to update the last pushed
-  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
-  // then we do not need to update last pushed sequence id for this table.
-  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
-      throws IOException {
-    for (;;) {
-      try {
-        if (!tsm.getTableState(tn).isDisabling()) {
-          return true;
-        }
-        Thread.sleep(SLEEP_INTERVAL_MS);
-      } catch (TableStateNotFoundException e) {
-        return false;
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
-      }
-    }
-  }
-
-  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
-  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
-  // should not forget to check whether the map is empty at last, if not you should call
-  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
-  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
-      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
-    TableStateManager tsm = env.getMasterServices().getTableStateManager();
-    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
-    Connection conn = env.getMasterServices().getConnection();
-    if (!needSetLastPushedSequenceId(tsm, tableName)) {
-      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
-      return;
-    }
-    for (Pair<String, Long> name2Barrier : MetaTableAccessor
-      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
-      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
-      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-        queueStorage);
-    }
-  }
-
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
       throws ProcedureSuspendedException, InterruptedException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 8c6232f48c94..fcf41bee72f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -50,7 +50,7 @@
 public class TransitPeerSyncReplicationStateProcedure
     extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

   private static final Logger LOG =
-      LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
+    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

   protected SyncReplicationState fromState;
@@ -58,6 +58,8 @@ public class TransitPeerSyncReplicationStateProcedure

   private boolean enabled;

+  private boolean serial;
+
   public TransitPeerSyncReplicationStateProcedure() {
   }

@@ -75,8 +77,8 @@ public PeerOperationType getPeerOperationType() {
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData.Builder builder =
-        TransitPeerSyncReplicationStateStateData.newBuilder()
-            .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+      TransitPeerSyncReplicationStateStateData.newBuilder()
+        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
     if (fromState != null) {
       builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
     }
@@ -87,7 +89,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
   protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.deserializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData data =
-        serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
     toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
     if (data.hasFromState()) {
       fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
@@ -129,6 +131,7 @@ protected void preTransit(MasterProcedureEnv env) throws IOException {
     }
     fromState = desc.getSyncReplicationState();
     enabled = desc.isEnabled();
+    serial = desc.getPeerConfig().isSerial();
   }

   private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -174,7 +177,11 @@ private void setNextStateAfterRefreshBegin() {
         : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
     } else {
       assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
-      setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+      // For a serial peer, we need to reopen all the regions and then update the last pushed
+      // sequence ids before replaying any remote wals, so that the serial replication will not
+      // be stuck, and also to guarantee the order when replicating the remote wal back.
+      setNextState(serial ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
+        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
     }
   }

@@ -183,6 +190,11 @@ private void setNextStateAfterRefreshEnd() {
       setNextState(
         enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
           : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+    } else if (fromState == SyncReplicationState.STANDBY) {
+      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+      setNextState(serial && enabled
+        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
     } else {
       setNextState(
         PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
@@ -196,14 +208,20 @@ private void replayRemoteWAL(boolean serial) {
   @VisibleForTesting
   protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
       throws ReplicationException {
-    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
-    if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
-      // disable the peer if we are going to transit to STANDBY state, as we need to remove
+    if (toState.equals(SyncReplicationState.STANDBY) ||
+      (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled) {
+      // Disable the peer if we are going to transit to STANDBY state, as we need to remove
       // all the pending replication files. If we do not disable the peer and delete the wal
       // queues on zk directly, RS will get NoNode exception when updating the wal position
       // and crash.
+      // Disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
+      // replication is serial, as we need to update the last pushed sequence ids after we reopen
+      // all the regions, and for performance reasons we will update them in batch, without using
+      // CAS. If we are still replicating at the RS side, we may accidentally update the last
+      // pushed sequence id to a smaller value and cause the replication to be stuck.
       env.getReplicationPeerManager().disablePeer(peerId);
     }
+    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
   }

   @VisibleForTesting
@@ -240,7 +258,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
          LOG.warn(
            "Failed to update peer storage for peer {} when starting transiting sync " +
-              "replication peer state from {} to {}, sleep {} secs and retry",
+            "replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e);
          throw suspend(backoff);
        }
@@ -254,6 +272,30 @@ protected Flow executeFromState(MasterProcedureEnv env,
            .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
+      case REOPEN_ALL_REGIONS_IN_PEER:
+        reopenRegions(env);
+        if (fromState.equals(SyncReplicationState.STANDBY)) {
+          assert serial;
+          setNextState(
+            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
+        } else {
+          setNextState(
+            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
+        }
+        return Flow.HAS_MORE_STATE;
+      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
+        try {
+          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
+        } catch (Exception e) {
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update last pushed sequence id for peer {} when transiting sync " +
+              "replication peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
+        }
+        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
@@ -266,7 +308,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
          LOG.warn(
            "Failed to remove all replication queues peer {} when starting transiting" +
-              " sync replication peer state from {} to {}, sleep {} secs and retry",
+            " sync replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e);
          throw suspend(backoff);
        }
@@ -275,11 +317,6 @@ protected Flow executeFromState(MasterProcedureEnv env,
            ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
            : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
-      case REOPEN_ALL_REGIONS_IN_PEER:
-        reopenRegions(env);
-        setNextState(
-          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
-        return Flow.HAS_MORE_STATE;
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
@@ -287,7 +324,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
          LOG.warn(
            "Failed to update peer storage for peer {} when ending transiting sync " +
-              "replication peer state from {} to {}, sleep {} secs and retry",
+            "replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e);
          throw suspend(backoff);
        }
@@ -308,7 +345,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
          LOG.warn(
            "Failed to set peer enabled for peer {} when transiting sync replication peer " +
-              "state from {} to {}, sleep {} secs and retry",
+            "state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e);
          throw suspend(backoff);
        }
@@ -327,7 +364,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
          LOG.warn(
            "Failed to create remote wal dir for peer {} when transiting sync replication " +
-              "peer state from {} to {}, sleep {} secs and retry",
+            "peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e);
          throw suspend(backoff);
        }
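With the changes above, a serial peer transiting from STANDBY to DOWNGRADE_ACTIVE now reopens the regions and runs the new SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER step before any remote wal is replayed. As a reading aid only (not part of the patch), the middle of that path can be written out against the generated protobuf enum; the shaded generated class location below is an assumption based on the usual hbase-protocol-shaded layout.

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;

// Illustrative only: the ordering wired up by setNextStateAfterRefreshBegin and the new
// REOPEN_ALL_REGIONS_IN_PEER / SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER cases
// when a serial peer transits from STANDBY to DOWNGRADE_ACTIVE.
public final class SerialStandbyToDowngradeActivePath {

  public static final PeerSyncReplicationStateTransitionState[] MIDDLE_STATES = {
    PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER,
    PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER,
    PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER };

  private SerialStandbyToDowngradeActivePath() {
  }
}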
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 1b523540667a..f373590e6b22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -103,8 +103,8 @@ public static void setUp() throws Exception {
     ZK_UTIL.startMiniZKCluster();
     initTestingUtility(UTIL1, "/cluster1");
     initTestingUtility(UTIL2, "/cluster2");
-    StartMiniClusterOption option = StartMiniClusterOption.builder()
-      .numMasters(2).numRegionServers(3).numDataNodes(3).build();
+    StartMiniClusterOption option =
+      StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
     UTIL1.startMiniCluster(option);
     UTIL2.startMiniCluster(option);
     TableDescriptor td =
@@ -217,16 +217,16 @@ protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
     return getRemoteWALDir(remoteWALDir, peerId);
   }

-  protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
+  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId);
   }

-  protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
+  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId + "-replay");
   }

-  protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
-      throws Exception {
+  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
+      HBaseTestingUtility utility) throws Exception {
     ReplicationPeerStorage rps = ReplicationStorageFactory
       .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
     try {
@@ -247,7 +247,7 @@ protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingU
     }
   }

-  protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
+  protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       boolean expectedRejection) throws Exception {
     HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
     ClusterConnection connection = regionServer.getClusterConnection();
@@ -270,4 +270,20 @@ protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       }
     }
   }
+
+  protected final void waitUntilDeleted(HBaseTestingUtility util, Path remoteWAL) throws Exception {
+    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !mfs.getWALFileSystem().exists(remoteWAL);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return remoteWAL + " has not been deleted yet";
+      }
+    });
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
new file mode 100644
index 000000000000..672564965e67
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+/**
+ * Testcase to confirm that serial replication will not be stuck when used together with
+ * synchronous replication. See HBASE-21486 for more details.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialSyncReplication extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialSyncReplication.class);
+
+  @Test
+  public void test() throws Exception {
+    // change to serial
+    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
+      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
+    FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    Path remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
+    // Roll the wal writers so that we will delete the remote wal. This is used to make sure that
+    // we will not replay this wal when transiting to DA.
+    for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
+      LogRoller roller = t.getRegionServer().getWalRoller();
+      roller.requestRollAll();
+      roller.waitUntilWalRollFinished();
+    }
+    waitUntilDeleted(UTIL2, remoteWAL);
+
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // let's reopen the region
+    RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
+    HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
+    UTIL2.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(target.getServerName().getServerName()));
+    // Here we will remove all the pending wals. This is not a normal operation sequence, but
+    // anyway, a user could do this.
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // transit back to DA
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+
+    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+    // make sure that the async replication still works
+    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+  }
+}
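The new test drives everything through the public Admin API, so the same scenario can be reproduced against a real deployment. Below is a hedged client-side sketch, not part of the patch: the peer id, configuration and connection handling are illustrative assumptions, and it only uses the Admin calls already exercised by the test above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

// Illustrative only: flip an existing sync replication peer to serial mode and transit the
// standby side back to DOWNGRADE_ACTIVE, which is the path HBASE-21486 fixes.
public class SerialSyncReplicationExample {

  public static void main(String[] args) throws Exception {
    String peerId = "1"; // assumption: an already configured sync replication peer
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      ReplicationPeerConfig current = admin.getReplicationPeerConfig(peerId);
      admin.updateReplicationPeerConfig(peerId,
        ReplicationPeerConfig.newBuilder(current).setSerial(true).build());
      admin.transitReplicationPeerSyncReplicationState(peerId,
        SyncReplicationState.DOWNGRADE_ACTIVE);
    }
  }
}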
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 0cd18463a5b3..9f8982604537 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -41,22 +40,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);

-  private void waitUntilDeleted(Path remoteWAL) throws Exception {
-    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
-    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return !mfs.getWALFileSystem().exists(remoteWAL);
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return remoteWAL + " has not been deleted yet";
-      }
-    });
-  }
-
   @Test
   public void testRemoveRemoteWAL() throws Exception {
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
@@ -76,7 +59,7 @@ public void testRemoveRemoteWAL() throws Exception {
     HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
     rs.getWalRoller().requestRollAll();
     // The replicated wal file should be deleted finally
-    waitUntilDeleted(remoteWAL);
+    waitUntilDeleted(UTIL2, remoteWAL);
     remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
     assertEquals(1, remoteWALStatus.length);
     remoteWAL = remoteWALStatus[0].getPath();
@@ -95,6 +78,6 @@ public void testRemoveRemoteWAL() throws Exception {
     verifyThroughRegion(UTIL2, 100, 200);

     // Confirm that we will also remove the remote wal files in DA state
-    waitUntilDeleted(remoteWAL);
+    waitUntilDeleted(UTIL2, remoteWAL);
   }
 }