Skip to content

Commit

Permalink
HBASE-19622 Reimplement ReplicationPeers with the new replication sto…
Browse files Browse the repository at this point in the history
…rage interface
  • Loading branch information
openinx authored and Apache9 committed Jan 9, 2018
1 parent 62a4f5b commit f89920a
Show file tree
Hide file tree
Showing 24 changed files with 304 additions and 955 deletions.
Expand Up @@ -247,22 +247,22 @@ public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
throws DeserializationException { throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) { if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic(); int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer.newBuilder();
ReplicationProtos.ReplicationPeer peer; ReplicationProtos.ReplicationPeer peer;
try { try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
peer = builder.build(); peer = builder.build();
} catch (IOException e) { } catch (IOException e) {
throw new DeserializationException(e); throw new DeserializationException(e);
} }
return convert(peer); return convert(peer);
} else { } else {
if (bytes.length > 0) { if (bytes == null || bytes.length <= 0) {
return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); throw new DeserializationException("Bytes to deserialize should not be empty.");
} }
return ReplicationPeerConfig.newBuilder().setClusterKey("").build(); return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
} }
} }


Expand Down
Expand Up @@ -339,15 +339,10 @@ private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
@Override public boolean isAborted() {return false;} @Override public boolean isAborted() {return false;}
}); });


ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
rp.init(); rp.init();


Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}

return pair;
} catch (ReplicationException e) { } catch (ReplicationException e) {
throw new IOException( throw new IOException(
"An error occurred while trying to connect to the remove peer cluster", e); "An error occurred while trying to connect to the remove peer cluster", e);
Expand Down
Expand Up @@ -31,14 +31,8 @@ public final class ReplicationFactory {
private ReplicationFactory() { private ReplicationFactory() {
} }


public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
Abortable abortable) { return new ReplicationPeers(zk, conf);
return getReplicationPeers(zk, conf, null, abortable);
}

public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
ReplicationQueueStorage queueStorage, Abortable abortable) {
return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
} }


public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
Expand Down
Expand Up @@ -18,29 +18,16 @@
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;


import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;


@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer { public class ReplicationPeerImpl implements ReplicationPeer {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);

private final ReplicationPeerStorage peerStorage;

private final Configuration conf; private final Configuration conf;


private final String id; private final String id;
Expand All @@ -58,21 +45,21 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param id string representation of this peer's identifier * @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer * @param peerConfig configuration for the replication peer
*/ */
public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id, public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
ReplicationPeerConfig peerConfig) { ReplicationPeerConfig peerConfig) {
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
this.conf = conf; this.conf = conf;
this.peerConfig = peerConfig;
this.id = id; this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
this.peerConfigListeners = new ArrayList<>(); this.peerConfigListeners = new ArrayList<>();
} }


public void refreshPeerState() throws ReplicationException { void setPeerState(boolean enabled) {
this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED; this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
} }


public void refreshPeerConfig() throws ReplicationException { void setPeerConfig(ReplicationPeerConfig peerConfig) {
this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig); this.peerConfig = peerConfig;
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
} }


Expand Down Expand Up @@ -135,36 +122,4 @@ public long getPeerBandwidth() {
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener); this.peerConfigListeners.add(listener);
} }

}
/**
* Parse the raw data from ZK to get a peer's state
* @param bytes raw ZK data
* @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
* @throws DeserializationException if parsing the state fails
*/
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
return ReplicationProtos.ReplicationState.State.ENABLED == state;
}

/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException if a ProtoBuf operation fails
*/
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState state;
try {
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
state = builder.build();
return state.getState();
} catch (IOException e) {
throw new DeserializationException(e);
}
}
}
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;


import java.util.List; import java.util.List;
import java.util.Optional;


import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;


Expand Down Expand Up @@ -70,5 +69,5 @@ void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
* Get the peer config of a replication peer. * Get the peer config of a replication peer.
* @throws ReplicationException if there are errors accessing the storage service. * @throws ReplicationException if there are errors accessing the storage service.
*/ */
Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException; ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
} }

0 comments on commit f89920a

Please sign in to comment.