Skip to content

Commit

Permalink
HBASE-21486 The current replication implementation for peer in STANDBY state breaks serial replication
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Dec 1, 2018
1 parent dfeab9f commit 766aa1b
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 136 deletions.
19 changes: 10 additions & 9 deletions hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Expand Up @@ -398,15 +398,16 @@ enum PeerSyncReplicationStateTransitionState {
PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1; PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2; SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3; REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
REPLAY_REMOTE_WAL_IN_PEER = 4; REOPEN_ALL_REGIONS_IN_PEER = 4;
REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5; SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER = 5;
REOPEN_ALL_REGIONS_IN_PEER = 6; REPLAY_REMOTE_WAL_IN_PEER = 6;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7; REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 7;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8; TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 8;
SYNC_REPLICATION_SET_PEER_ENABLED = 9; REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 9;
SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10; SYNC_REPLICATION_SET_PEER_ENABLED = 10;
CREATE_DIR_FOR_REMOTE_WAL = 11; SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 11;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12; CREATE_DIR_FOR_REMOTE_WAL = 12;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 13;
} }


message PeerModificationStateData { message PeerModificationStateData {
Expand Down
Expand Up @@ -17,20 +17,43 @@
*/ */
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;


import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;


/** /**
* The base class for all replication peer related procedure. * The base class for all replication peer related procedure.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class AbstractPeerProcedure<TState> public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
extends AbstractPeerNoLockProcedure<TState> implements PeerProcedureInterface { implements PeerProcedureInterface {

private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);

protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;

// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;


// used to keep compatible with old client where we can only returns after updateStorage. // used to keep compatible with old client where we can only returns after updateStorage.
protected ProcedurePrepareLatch latch; protected ProcedurePrepareLatch latch;
Expand Down Expand Up @@ -75,4 +98,74 @@ protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type)
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId); env.getReplicationPeerManager().enablePeer(peerId);
} }

private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
    ReplicationQueueStorage queueStorage) throws ReplicationException {
  // A negative barrier means there is nothing to record for this region.
  if (barrier < 0) {
    return;
  }
  lastSeqIds.put(encodedRegionName, barrier);
  // Flush the accumulated entries to storage once the batch threshold is reached, so the
  // in-memory map stays bounded no matter how many regions the peer replicates.
  if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
    queueStorage.setLastSequenceIds(peerId, lastSeqIds);
    lastSeqIds.clear();
  }
}

/**
 * Record the last pushed sequence id for every replicated region of every table this peer
 * replicates. Entries are buffered in a map and flushed in batches by
 * {@code setLastPushedSequenceIdForTable}; the trailing partial batch is flushed here.
 */
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
    ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
  Map<String, Long> lastSeqIds = new HashMap<String, Long>();
  for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
    TableName tn = td.getTableName();
    // Only tables with global replication scope that are covered by this peer's config
    // participate in replication, so only they need a last pushed sequence id.
    if (td.hasGlobalReplicationScope() && ReplicationUtils.contains(peerConfig, tn)) {
      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
    }
  }
  // Write out whatever did not fill a complete batch.
  if (!lastSeqIds.isEmpty()) {
    env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
  }
}

// If the table is currently disabling, then we need to wait until it is disabled. We will write
// replication barrier for a disabled table. And return whether we need to update the last pushed
// sequence id: if the table has been deleted already, i.e, we hit TableStateNotFoundException,
// then we do not need to update last pushed sequence id for this table.
private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
    throws IOException {
  for (;;) {
    try {
      if (!tsm.getTableState(tn).isDisabling()) {
        return true;
      }
      // Still disabling; poll until the disable completes.
      Thread.sleep(SLEEP_INTERVAL_MS);
    } catch (TableStateNotFoundException e) {
      // The table has been deleted concurrently, nothing to update.
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating to an IOException, so callers up the
      // stack can still observe that this thread was interrupted.
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
    }
  }
}

// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
// queueStorage.setLastSequenceIds to write out the remaining entries in the map.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
    Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
  TableStateManager tsm = env.getMasterServices().getTableStateManager();
  ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
  Connection conn = env.getMasterServices().getConnection();
  // Wait out a concurrent disable; skip entirely if the table has already been deleted.
  if (!needSetLastPushedSequenceId(tsm, tableName)) {
    LOG.debug("Skip setting last pushed sequence id for {}", tableName);
    return;
  }
  for (Pair<String, Long> name2Barrier : MetaTableAccessor
    .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
    LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
    // Use barrier - 1 as the last pushed sequence id for the region.
    addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
      queueStorage);
  }
}
} }
Expand Up @@ -19,11 +19,7 @@


import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.TableStateManager;
Expand All @@ -35,9 +31,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -55,11 +49,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi


private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class); private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);


protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;

// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;

protected ModifyPeerProcedure() { protected ModifyPeerProcedure() {
} }


Expand Down Expand Up @@ -169,76 +158,6 @@ protected void reopenRegions(MasterProcedureEnv env) throws IOException {
} }
} }


// Buffers one region's last pushed sequence id into the caller-owned map; once
// UPDATE_LAST_SEQ_ID_BATCH_SIZE entries have accumulated, writes the batch to the replication
// queue storage and clears the map.
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
// A negative barrier means there is nothing to record for this region.
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}

// Records the last pushed sequence id for every replicated region of every table this peer
// replicates. Entries are buffered in lastSeqIds and flushed in batches by
// setLastPushedSequenceIdForTable; the trailing partial batch is flushed at the end here.
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
// Only tables with global replication scope participate in replication.
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
// Skip tables not covered by this peer's replication config.
if (!ReplicationUtils.contains(peerConfig, tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
// Write out whatever did not fill a complete batch.
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}

// If the table is currently disabling, then we need to wait until it is disabled. We will write
// replication barrier for a disabled table. And return whether we need to update the last pushed
// sequence id: if the table has been deleted already, i.e, we hit TableStateNotFoundException,
// then we do not need to update last pushed sequence id for this table.
private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
throws IOException {
for (;;) {
try {
if (!tsm.getTableState(tn).isDisabling()) {
return true;
}
// Still disabling; poll until the disable completes.
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (TableStateNotFoundException e) {
// The table has been deleted concurrently, nothing to update.
return false;
} catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored before converting to an
// InterruptedIOException — confirm whether callers rely on the interrupt flag.
throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
}
}
}

// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
// queueStorage.setLastSequenceIds to write out the remaining entries in the map.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
// Wait out a concurrent disable; skip entirely if the table has already been deleted.
if (!needSetLastPushedSequenceId(tsm, tableName)) {
// NOTE(review): "settting" typo in this log message.
LOG.debug("Skip settting last pushed sequence id for {}", tableName);
return;
}
for (Pair<String, Long> name2Barrier : MetaTableAccessor
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
// Use barrier - 1 as the last pushed sequence id for the region.
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
queueStorage);
}
}

@Override @Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException { throws ProcedureSuspendedException, InterruptedException {
Expand Down

0 comments on commit 766aa1b

Please sign in to comment.