Skip to content

Commit

Permalink
HBASE-21248 Implement exponential backoff when retrying for ModifyPee…
Browse files Browse the repository at this point in the history
…rProcedure
  • Loading branch information
Apache9 committed Sep 29, 2018
1 parent ab6ec1f commit fdbaa4c
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 69 deletions.
Expand Up @@ -32,7 +32,7 @@
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
Expand All @@ -42,7 +42,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
* The base class for all replication peer related procedure except sync replication state
Expand All @@ -58,6 +61,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;

private int attemps;

protected ModifyPeerProcedure() {
}

Expand Down Expand Up @@ -143,7 +148,9 @@ private boolean needReopen(TableStateManager tsm, TableName tn) throws IOExcepti
}
}

private void reopenRegions(MasterProcedureEnv env) throws IOException {
// will be override in test to simulate error
@VisibleForTesting
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
Expand All @@ -165,6 +172,12 @@ private void reopenRegions(MasterProcedureEnv env) throws IOException {
}
}

// will be override in test to simulate error
@VisibleForTesting
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}

private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
Expand Down Expand Up @@ -235,9 +248,24 @@ protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, Tab
}
}

@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false;
}

private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
attemps++;
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
throws ProcedureSuspendedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
Expand All @@ -249,20 +277,24 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
Expand All @@ -273,30 +305,37 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
try {
reopenRegions(env);
} catch (Exception e) {
LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_SET_PEER_ENABLED:
try {
env.getReplicationPeerManager().enablePeer(peerId);
enablePeer(env);
} catch (ReplicationException e) {
LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
}
attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
Expand All @@ -307,9 +346,10 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
try {
postPeerModification(env);
} catch (ReplicationException e) {
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
getClass().getName(), peerId, e);
throw new ProcedureYieldException();
long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e);
throw suspend(backoff);
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
Expand Down
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

public final class ProcedureTestUtil {

private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class);

private ProcedureTestUtil() {
}

private static Optional<JsonObject> getProcedure(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, JsonParser parser) throws IOException {
JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray();
Iterator<JsonElement> iterator = array.iterator();
while (iterator.hasNext()) {
JsonElement element = iterator.next();
JsonObject obj = element.getAsJsonObject();
String className = obj.get("className").getAsString();
if (className.equals(clazz.getName())) {
return Optional.of(obj);
}
}
return Optional.empty();
}

public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, long timeout) throws IOException {
JsonParser parser = new JsonParser();
util.waitFor(timeout,
() -> getProcedure(util, clazz, parser)
.filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString()))
.isPresent());
}

public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util,
Class<? extends Procedure<?>> clazz, int times) throws IOException, InterruptedException {
JsonParser parser = new JsonParser();
long oldTimeout = 0;
int timeoutIncrements = 0;
for (;;) {
long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout"))
.map(o -> o.get("timeout").getAsLong()).orElse(-1L);
if (timeout > oldTimeout) {
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
timeoutIncrements);
oldTimeout = timeout;
timeoutIncrements++;
if (timeoutIncrements > times) {
break;
}
}
Thread.sleep(1000);
}
}
}
Expand Up @@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ProcedureTestUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
Expand All @@ -45,13 +45,6 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonParser;

/**
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
Expand All @@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);

private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class);

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

private static TableName TABLE_NAME = TableName.valueOf("Backoff");
Expand Down Expand Up @@ -189,25 +180,11 @@ public void testRetryBackoff() throws IOException, InterruptedException {
}
});
t.start();
JsonParser parser = new JsonParser();
long oldTimeout = 0;
int timeoutIncrements = 0;
// wait until we enter the WAITING_TIMEOUT state
UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0);
while (true) {
long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures());
if (timeout > oldTimeout) {
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
timeoutIncrements);
oldTimeout = timeout;
timeoutIncrements++;
if (timeoutIncrements > 3) {
// If we incremented at least twice, break; the backoff is working.
break;
}
}
Thread.sleep(1000);
}
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class,
30000);
// wait until the timeout value increase three times
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
// let's close the connection to make sure that the SCP can not update meta successfully
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
RESUME.countDown();
Expand All @@ -223,24 +200,4 @@ public void testRetryBackoff() throws IOException, InterruptedException {
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
}
}

/**
* @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of
* Procedures as JSON.
* @return The Procedure timeout value parsed from the TRSP.
*/
private long getTimeout(JsonParser parser, String proceduresAsJSON) {
JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray();
Iterator<JsonElement> iterator = array.iterator();
while (iterator.hasNext()) {
JsonElement element = iterator.next();
JsonObject obj = element.getAsJsonObject();
String className = obj.get("className").getAsString();
String actualClassName = TransitRegionStateProcedure.class.getName();
if (className.equals(actualClassName) && obj.has("timeout")) {
return obj.get("timeout").getAsLong();
}
}
return -1L;
}
}

0 comments on commit fdbaa4c

Please sign in to comment.