diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index ad4df610e1f8..7690c9625506 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -42,7 +42,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure except sync replication state @@ -58,6 +61,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure lastSeqIds, String encodedRegionName, long barrier, ReplicationQueueStorage queueStorage) throws ReplicationException { if (barrier >= 0) { @@ -235,9 +248,24 @@ protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, Tab } } + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + private 
ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException { + attemps++; + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException { switch (state) { case PRE_PEER_MODIFICATION: try { @@ -249,20 +277,24 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st releaseLatch(); return Flow.NO_MORE_STATE; } catch (ReplicationException e) { - LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), - peerId, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e); + throw suspend(backoff); } + attemps = 0; setNextState(PeerModificationState.UPDATE_PEER_STORAGE); return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { updatePeerStorage(env); } catch (ReplicationException e) { - LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, - e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(), + peerId, backoff / 1000, e); + throw suspend(backoff); } + attemps = 0; setNextState(PeerModificationState.REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; case REFRESH_PEER_ON_RS: @@ -273,30 +305,37 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st try { reopenRegions(env); } catch (Exception e) { - LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e); - throw 
new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(), + peerId, backoff / 1000, e); + throw suspend(backoff); } + attemps = 0; setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); return Flow.HAS_MORE_STATE; case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: try { updateLastPushedSequenceIdForSerialPeer(env); } catch (Exception e) { - LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(), - peerId, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e); + throw suspend(backoff); } + attemps = 0; setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED : PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; case SERIAL_PEER_SET_PEER_ENABLED: try { - env.getReplicationPeerManager().enablePeer(peerId); + enablePeer(env); } catch (ReplicationException e) { - LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(), - peerId, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e); + throw suspend(backoff); } + attemps = 0; setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: @@ -307,9 +346,10 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st try { postPeerModification(env); } catch (ReplicationException e) { - LOG.warn("{} failed to call postPeerModification for peer {}, retry", - getClass().getName(), peerId, e); - throw new 
ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attemps); + LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e); + throw suspend(backoff); } catch (IOException e) { LOG.warn("{} failed to call post CP hook for peer {}, " + "ignore since the procedure has already done", getClass().getName(), peerId, e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java new file mode 100644 index 000000000000..ff23d85616bc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.gson.JsonArray; +import org.apache.hbase.thirdparty.com.google.gson.JsonElement; +import org.apache.hbase.thirdparty.com.google.gson.JsonObject; +import org.apache.hbase.thirdparty.com.google.gson.JsonParser; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +public final class ProcedureTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class); + + private ProcedureTestUtil() { + } + + private static Optional getProcedure(HBaseTestingUtility util, + Class> clazz, JsonParser parser) throws IOException { + JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray(); + Iterator iterator = array.iterator(); + while (iterator.hasNext()) { + JsonElement element = iterator.next(); + JsonObject obj = element.getAsJsonObject(); + String className = obj.get("className").getAsString(); + if (className.equals(clazz.getName())) { + return Optional.of(obj); + } + } + return Optional.empty(); + } + + public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util, + Class> clazz, long timeout) throws IOException { + JsonParser parser = new JsonParser(); + util.waitFor(timeout, + () -> getProcedure(util, clazz, parser) + .filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString())) + .isPresent()); + } + + public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util, + Class> clazz, int times) throws IOException, InterruptedException { + JsonParser parser = new JsonParser(); + long oldTimeout = 0; + int timeoutIncrements = 0; + for (;;) { + long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout")) + .map(o -> 
o.get("timeout").getAsLong()).orElse(-1L); + if (timeout > oldTimeout) { + LOG.info("Timeout incremented, was {}, now is {}, increments={}", oldTimeout, timeout, + timeoutIncrements); + oldTimeout = timeout; + timeoutIncrements++; + if (timeoutIncrements > times) { + break; + } + } + Thread.sleep(1000); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java index 3573bd661e98..d34bfbb4f2ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; -import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ProcedureTestUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; @@ -45,13 +45,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.gson.JsonArray; -import org.apache.hbase.thirdparty.com.google.gson.JsonElement; -import org.apache.hbase.thirdparty.com.google.gson.JsonObject; -import org.apache.hbase.thirdparty.com.google.gson.JsonParser; /** * Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the @@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class); - private static final Logger LOG =
LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static TableName TABLE_NAME = TableName.valueOf("Backoff"); @@ -189,25 +180,11 @@ public void testRetryBackoff() throws IOException, InterruptedException { } }); t.start(); - JsonParser parser = new JsonParser(); - long oldTimeout = 0; - int timeoutIncrements = 0; // wait until we enter the WAITING_TIMEOUT state - UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0); - while (true) { - long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures()); - if (timeout > oldTimeout) { - LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout, - timeoutIncrements); - oldTimeout = timeout; - timeoutIncrements++; - if (timeoutIncrements > 3) { - // If we incremented at least twice, break; the backoff is working. - break; - } - } - Thread.sleep(1000); - } + ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class, + 30000); + // wait until the timeout value increase three times + ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3); // let's close the connection to make sure that the SCP can not update meta successfully UTIL.getMiniHBaseCluster().getMaster().getConnection().close(); RESUME.countDown(); @@ -223,24 +200,4 @@ public void testRetryBackoff() throws IOException, InterruptedException { table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1))); } } - - /** - * @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of - * Procedures as JSON. - * @return The Procedure timeout value parsed from the TRSP. 
- */ - private long getTimeout(JsonParser parser, String proceduresAsJSON) { - JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray(); - Iterator iterator = array.iterator(); - while (iterator.hasNext()) { - JsonElement element = iterator.next(); - JsonObject obj = element.getAsJsonObject(); - String className = obj.get("className").getAsString(); - String actualClassName = TransitRegionStateProcedure.class.getName(); - if (className.equals(actualClassName) && obj.has("timeout")) { - return obj.get("timeout").getAsLong(); - } - } - return -1L; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java new file mode 100644 index 000000000000..7566d28e8104 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ProcedureTestUtil; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestModifyPeerProcedureRetryBackoff { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static boolean FAIL = true; + + public static class TestModifyPeerProcedure extends ModifyPeerProcedure { + + public TestModifyPeerProcedure() { + } + + public TestModifyPeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + private void tryFail() throws ReplicationException { + synchronized (TestModifyPeerProcedureRetryBackoff.class) { + if (FAIL) { + throw new ReplicationException("Inject error"); + } + FAIL = true; + } + } + + @Override + protected > void addChildProcedure( + @SuppressWarnings("unchecked") T... 
subProcedure) { + // Make it a no-op + } + + @Override + protected PeerModificationState nextStateAfterRefresh() { + return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS; + } + + @Override + protected boolean enablePeerBeforeFinish() { + return true; + } + + @Override + protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + + @Override + protected void reopenRegions(MasterProcedureEnv env) throws IOException { + try { + tryFail(); + } catch (ReplicationException e) { + throw new IOException(e); + } + } + + @Override + protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { + tryFail(); + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + tryFail(); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + tryFail(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void assertBackoffIncrease() throws IOException, InterruptedException { + ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000); + ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2); + synchronized (TestModifyPeerProcedureRetryBackoff.class) { + FAIL = false; + } + UTIL.waitFor(30000, () -> FAIL); + } + + @Test + public void test() throws IOException, InterruptedException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1")); + // PRE_PEER_MODIFICATION + 
assertBackoffIncrease(); + // UPDATE_PEER_STORAGE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_ON_RS + // SERIAL_PEER_REOPEN_REGIONS + assertBackoffIncrease(); + // SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID + assertBackoffIncrease(); + // SERIAL_PEER_SET_PEER_ENABLED + assertBackoffIncrease(); + // No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS + // POST_PEER_MODIFICATION + assertBackoffIncrease(); + UTIL.waitFor(30000, () -> procExec.isFinished(procId)); + } +}