diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index 1adda02e6318..4684f0827bcb 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -42,7 +42,8 @@ void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) /** * Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}. - * @throws ReplicationException if there are errors accessing the storage service. + * @throws ReplicationException if there are errors accessing the storage service or peer does not + * exist. */ void setPeerState(String peerId, boolean enabled) throws ReplicationException; diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index 462cfedd0a04..cbfec3bdb0d2 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -29,6 +30,15 @@ @InterfaceAudience.Private public final class ReplicationStorageFactory { + public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; + public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL = + ZKReplicationPeerStorage.class.getName(); + + public static final String REPLICATION_QUEUE_STORAGE_IMPL = + "hbase.replication.queue.storage.impl"; + public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL = + ZKReplicationQueueStorage.class.getName(); + private ReplicationStorageFactory() { } @@ -36,7 +46,10 @@ private ReplicationStorageFactory() { * Create a new {@link ReplicationPeerStorage}. */ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { - return new ZKReplicationPeerStorage(zk, conf); + String peerStorageClass = + conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL); + return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass, + new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf }); } /** @@ -44,6 +57,9 @@ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Con */ public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk, Configuration conf) { - return new ZKReplicationQueueStorage(zk, conf); + String queueStorageClass = + conf.get(REPLICATION_QUEUE_STORAGE_IMPL, DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL); + return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass, + new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf }); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index e2479e035ef7..2e86c1709b52 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray; + import java.io.IOException; import java.util.Collection; import java.util.List; @@ -30,12 +32,19 @@ import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + /** * Helper class for replication. */ @InterfaceAudience.Private public final class ReplicationUtils { + public static final byte[] PEER_STATE_ENABLED_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] PEER_STATE_DISABLED_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + private ReplicationUtils() { } @@ -173,4 +182,8 @@ public static boolean contains(ReplicationPeerConfig peerConfig, TableName table return tableCFs != null && tableCFs.containsKey(tableName); } } + + public static String parsePeerIdFromQueueId(String queueId) { + return new ReplicationQueueInfo(queueId).getPeerId(); + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java new file mode 100644 index 000000000000..ee7969b0ed0b --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Table based replication peer storage. + */ +@InterfaceAudience.Private +public class TableReplicationPeerStorage extends TableReplicationStorageBase + implements ReplicationPeerStorage { + + public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) throws IOException { + super(zookeeper, conf); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG, + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE, + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to add peer " + peerId, e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Delete delete = new Delete(Bytes.toBytes(peerId)); + table.delete(delete); + } catch (IOException e) { + throw new ReplicationException("Failed to remove peer " + peerId, e); + } + } + + // TODO make it to be a checkExistAndMutate operation. + private boolean peerExist(String peerId, Table table) throws IOException { + Get get = new Get(Bytes.toBytes(peerId)); + get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + return table.exists(get); + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + if (!peerExist(peerId, table)) { + throw new ReplicationException("Peer " + peerId + " does not exist."); + } + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE, + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES); + table.put(put); + } catch (IOException e) { + throw new ReplicationException( + "Failed to set peer state, peerId=" + peerId + ", state=" + enabled, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + if (!peerExist(peerId, table)) { + throw new ReplicationException("Peer " + peerId + " does not exist."); + } + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG, + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to update peer configuration, peerId=" + peerId, e); + } + } + + @Override + public List listPeerIds() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + try (ResultScanner scanner = table.getScanner(scan)) { + List peerIds = new ArrayList<>(); + for (Result r : scanner) { + peerIds.add(Bytes.toString(r.getRow())); + } + return peerIds; + } + } catch (IOException e) { + throw new ReplicationException("Failed to list peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + Result r = table.get(get); + if (r == null) { + throw new ReplicationException("Peer " + peerId + " does not found"); + } + return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, QUALIFIER_PEER_STATE)); + } catch (IOException e) { + throw new ReplicationException("Failed to read the peer state, peerId=" + peerId, e); + } + } + + @Override + public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG); + Result r = table.get(get); + if (r == null) { + throw new ReplicationException("Peer " + peerId + " does not found"); + } + byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG); + if (data == null || data.length == 0) { + throw new ReplicationException( + "Replication peer config data shouldn't be empty, peerId=" + peerId); + } + try { + return ReplicationPeerConfigUtil.parsePeerFrom(data); + } catch (DeserializationException e) { + throw new ReplicationException( + "Failed to parse replication peer config for peer with id=" + peerId, e); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to read the peer configuration in hbase:replication, peerId=" + peerId, e); + } + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java new file mode 100644 index 000000000000..abb279de1186 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Table based replication queue storage. + */ +@InterfaceAudience.Private +public class TableReplicationQueueStorage extends TableReplicationStorageBase + implements ReplicationQueueStorage { + + private static final Logger LOG = LoggerFactory.getLogger(TableReplicationQueueStorage.class); + + public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) throws IOException { + super(zookeeper, conf); + } + + /** + * Serialize the {fileName, position} pair into a byte array. + */ + private static byte[] makeByteArray(String fileName, long position) { + byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG]; + int pos = 0; + pos = Bytes.putInt(data, pos, fileName.length()); + pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, fileName.length()); + pos = Bytes.putLong(data, pos, position); + assert pos == data.length; + return data; + } + + /** + * Deserialize the byte array into a {filename, position} pair. + */ + private static Pair parseFileNameAndPosition(byte[] data, int offset) + throws IOException { + if (data == null) { + throw new IOException("The byte array shouldn't be null"); + } + int pos = offset; + int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT); + pos += Bytes.SIZEOF_INT; + if (pos + len > data.length) { + throw new IllegalArgumentException("offset (" + pos + ") + length (" + len + ") exceed the" + + " capacity of the array: " + data.length); + } + String fileName = Bytes.toString(Bytes.copy(data, pos, len)); + pos += len; + long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG); + return new Pair<>(fileName, position); + } + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Delete delete = new Delete(getServerNameRowKey(serverName)); + delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)); + // Delete all pairs. + delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP); + table.delete(delete); + } catch (IOException e) { + throw new ReplicationException( + "Failed to remove wal from queue, serverName=" + serverName + ", queueId=" + queueId, e); + } + } + + @Override + public void addWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Put put = new Put(getServerNameRowKey(serverName)); + put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, HConstants.EMPTY_BYTE_ARRAY); + put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), HConstants.EMPTY_BYTE_ARRAY); + put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), makeByteArray(fileName, 0L)); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to add wal to queue, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + @Override + public void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + Delete delete = new Delete(getServerNameRowKey(walCell.get().serverName)) + .addColumn(FAMILY_WAL, Bytes.toBytes(queueId), walCell.get().cellTimestamp); + table.delete(delete); + } else { + LOG.warn(fileName + " has already been deleted when removing log"); + } + } catch (IOException e) { + throw new ReplicationException("Failed to remove wal from queue, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + @Override + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, + Map lastSeqIds) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + List puts = new ArrayList<>(); + Put put = new Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, + Bytes.toBytes(walCell.get().queueId), walCell.get().cellTimestamp, + makeByteArray(fileName, position)); + puts.add(put); + // Update the last pushed sequence id for each region in a batch. + String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); + if (lastSeqIds != null && lastSeqIds.size() > 0) { + for (Map.Entry e : lastSeqIds.entrySet()) { + Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS, + getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue())); + puts.add(regionPut); + } + } + table.put(puts); + } else { + throw new ReplicationException("WAL file " + fileName + " does not found under queue " + + queueId + " for server " + serverName); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to set wal position and last sequence ids, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position, + e); + } + } + + @Override + public long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)); + get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName)); + Result r = table.get(get); + if (r == null || r.listCells() == null) { + return HConstants.NO_SEQNUM; + } + return Bytes.toLong(r.getValue(FAMILY_REGIONS, getRegionQualifier(encodedRegionName))); + } catch (IOException e) { + throw new ReplicationException( + "Failed to get last sequence id, region=" + encodedRegionName + ", peerId=" + peerId, e); + } + } + + @Override + public long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + return walCell.get().position; + } else { + LOG.warn("WAL " + fileName + " does not found under queue " + queueId + " for server " + + serverName); + return 0; + } + } catch (IOException e) { + throw new ReplicationException("Failed to get wal position. serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + /** + * Each cell in column wal:{queueId} will be parsed to a WALCell. The WALCell will be more + * friendly to upper layer. + */ + private static final class WALCell { + ServerName serverName; + String queueId; + String wal; + long position; + long cellTimestamp;// Timestamp of the cell + + private WALCell(ServerName serverName, String queueId, String wal, long position, + long cellTimestamp) { + this.serverName = serverName; + this.queueId = queueId; + this.wal = wal; + this.position = position; + this.cellTimestamp = cellTimestamp; + } + + public static WALCell create(Cell cell) throws IOException { + ServerName serverName = ServerName.parseServerName( + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + String queueId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + Pair fileAndPos = + parseFileNameAndPosition(cell.getValueArray(), cell.getValueOffset()); + return new WALCell(serverName, queueId, fileAndPos.getFirst(), fileAndPos.getSecond(), + cell.getTimestamp()); + } + + public boolean fileNameMatch(String fileName) { + return StringUtils.equals(wal, fileName); + } + } + + /** + * Parse the WALCell list from a HBase result. + */ + private List result2WALCells(Result r) throws IOException { + List wals = new ArrayList<>(); + if (r != null && r.listCells() != null && r.listCells().size() > 0) { + for (Cell cell : r.listCells()) { + wals.add(WALCell.create(cell)); + } + } + return wals; + } + + /** + * List all WALs for the specific region server and queueId. + */ + private List getWALsInQueue0(Table table, ServerName serverName, String queueId) + throws IOException { + Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, Bytes.toBytes(queueId)) + .readAllVersions(); + return result2WALCells(table.get(get)); + } + + @Override + public List getWALsInQueue(ServerName serverName, String queueId) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + return getWALsInQueue0(table, serverName, queueId).stream().map(p -> p.wal) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new ReplicationException( + "Failed to get wals in queue. serverName=" + serverName + ", queueId=" + queueId, e); + } + } + + @Override + public List getAllQueues(ServerName serverName) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List queues = new ArrayList<>(); + Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE); + Result r = table.get(get); + if (r != null && r.listCells() != null && r.listCells().size() > 0) { + for (Cell c : r.listCells()) { + String queue = + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()); + queues.add(queue); + } + } + return queues; + } catch (IOException e) { + throw new ReplicationException("Failed to get all queues. serverName=" + serverName, e); + } + } + + @Override + public Pair> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + LOG.info( + "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + try (Table table = getReplicationMetaTable()) { + // Create an enabled region server for destination. + byte[] destServerNameRowKey = getServerNameRowKey(destServerName); + byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName); + Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, + HConstants.EMPTY_BYTE_ARRAY); + table.put(put); + List wals = getWALsInQueue0(table, sourceServerName, queueId); + String newQueueId = queueId + "-" + sourceServerName; + // Remove the queue in source region server if wal set of the queue is empty. + if (CollectionUtils.isEmpty(wals)) { + Delete delete = + new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)) + .addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP); + table.delete(delete); + LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty"); + return new Pair<>(newQueueId, Collections.emptySortedSet()); + } + // Transfer all wals from source region server to destination region server in a batch. + List mutations = new ArrayList<>(); + // a. Create queue for destination server. + mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(newQueueId), + HConstants.EMPTY_BYTE_ARRAY)); + SortedSet logQueue = new TreeSet<>(); + for (WALCell wal : wals) { + byte[] data = makeByteArray(wal.wal, wal.cellTimestamp); + // b. Add wal to destination server. + mutations.add( + new Put(destServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(newQueueId), data)); + // c. Remove wal from source server. + mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(queueId), + wal.cellTimestamp)); + logQueue.add(wal.wal); + } + // d. Remove the queue of source server. + mutations + .add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId))); + Object[] results = new Object[mutations.size()]; + table.batch(mutations, results); + boolean allSuccess = Stream.of(results).allMatch(r -> r != null); + if (!allSuccess) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed, not all mutations success."); + } + LOG.info( + "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + return new Pair<>(newQueueId, logQueue); + } catch (IOException | InterruptedException e) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName + + " to " + destServerName + " failed", e); + } + } + + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { + // TODO Make this to be a checkAndDelete, and provide a UT for it. + try (Table table = getReplicationMetaTable()) { + Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions(); + Result r = table.get(get); + if (r == null || r.listCells() == null || r.listCells().size() == 0) { + Delete delete = new Delete(getServerNameRowKey(serverName)); + table.delete(delete); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to remove replicator when queue is empty, serverName=" + serverName, e); + } + } + + @Override + public List getListOfReplicators() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED).readVersions(1); + Set serverNames = new HashSet<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + if (r.listCells().size() > 0) { + Cell firstCell = r.listCells().get(0); + String serverName = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength()); + serverNames.add(ServerName.parseServerName(serverName)); + } + } + } + return new ArrayList<>(serverNames); + } catch (IOException e) { + throw new ReplicationException("Failed to get list of replicators", e); + } + } + + @Override + public Set getAllWALs() throws ReplicationException { + Set walSet = new HashSet<>(); + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + result2WALCells(r).forEach(w -> walSet.add(w.wal)); + } + } + return walSet; + } catch (IOException e) { + throw new ReplicationException("Failed to get all wals", e); + } + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // Need to do nothing. + } + + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException { + // Need to do nothing. + } + + @Override + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List puts = new ArrayList<>(); + for (Pair p : pairs) { + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_HFILE_REFS, Bytes.toBytes(p.getSecond().getName()), + HConstants.EMPTY_BYTE_ARRAY); + puts.add(put); + } + table.put(puts); + } catch (IOException e) { + throw new ReplicationException("Failed to add hfile refs, peerId=" + peerId, e); + } + } + + @Override + public void removeHFileRefs(String peerId, List files) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List deletes = new ArrayList<>(); + for (String file : files) { + Delete delete = new Delete(Bytes.toBytes(peerId)); + delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file)); + deletes.add(delete); + } + table.delete(deletes); + } catch (IOException e) { + throw new ReplicationException("Failed to remove hfile refs, peerId=" + peerId, e); + } + } + + @Override + public List getAllPeersFromHFileRefsQueue() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Set peers = new HashSet<>(); + Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + if (r.listCells().size() > 0) { + Cell firstCell = r.listCells().get(0); + String peerId = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength()); + peers.add(peerId); + } + } + } + return new ArrayList<>(peers); + } catch (IOException e) { + throw new ReplicationException("Faield to get all peers by reading hbase:replication meta", + e); + } + } + + @Override + public List getReplicableHFiles(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS); + Result r = table.get(get); + List hfiles = new ArrayList<>(); + if (r != null && r.listCells() != null) { + for (Cell c : r.listCells()) { + hfiles.add( + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength())); + } + } + return hfiles; + } catch (IOException e) { + throw new ReplicationException("Failed to get replicable hfiles, peerId=" + peerId, e); + } + } + + @Override + public Set getAllHFileRefs() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS); + try (ResultScanner scanner = table.getScanner(scan)) { + Set hfileSet = new HashSet<>(); + for (Result r : scanner) { + for (Cell c : r.listCells()) { + String hfile = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), + c.getQualifierLength()); + hfileSet.add(hfile); + } + } + return hfileSet; + } + } catch (IOException e) { + throw new ReplicationException("Failed to get all hfile refs", e); + } + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java new file mode 100644 index 000000000000..fd2b574e07f3 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +public class TableReplicationStorageBase { + protected final ZKWatcher zookeeper; + protected final Configuration conf; + + public static final TableName REPLICATION_TABLE = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Peer family, the row key would be peer id. + public static final byte[] FAMILY_PEER = Bytes.toBytes("peer"); + public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config"); + public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state"); + + // Region server state family, the row key would be name of region server. + public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state"); + public static final byte[] QUALIFIER_STATE_ENABLED = Bytes.toBytes("enabled"); + + // Queue and wal family, the row key would be name of region server. + public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue"); + public static final byte[] FAMILY_WAL = Bytes.toBytes("wal"); + + // HFile-Refs family, the row key would be peer id. + public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs"); + + // Region family, the row key would be peer id. + public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions"); + + private Connection connection; + + protected static byte[] getServerNameRowKey(ServerName serverName) { + return Bytes.toBytes(serverName.toString()); + } + + protected static byte[] getRegionQualifier(String encodedRegionName) { + return Bytes.toBytes(encodedRegionName); + } + + @VisibleForTesting + public static TableDescriptorBuilder createReplicationTableDescBuilder(final Configuration conf) + throws IOException { + int metaMaxVersion = + conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS); + int metaBlockSize = + conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE); + return TableDescriptorBuilder + .newBuilder(REPLICATION_TABLE) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL) + .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true) + .setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBloomFilterType(BloomType.NONE).build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS) + .setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()); + } + + protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) + throws IOException { + this.zookeeper = zookeeper; + this.conf = conf; + this.connection = ConnectionFactory.createConnection(conf); + } + + protected Table getReplicationMetaTable() throws IOException { + return this.connection.getTable(REPLICATION_TABLE); + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index a53500a89059..138f14a1998a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES; + import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -31,8 +34,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; - /** * ZK based replication peer storage. */ @@ -46,11 +47,6 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state"; public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state"; - public static final byte[] ENABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); - public static final byte[] DISABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) * cluster. @@ -89,7 +85,7 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena ZKUtilOp.createAndFailSilent(getPeerNode(peerId), ReplicationPeerConfigUtil.toByteArray(peerConfig)), ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), - enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)), false); } catch (KeeperException e) { throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" @@ -108,7 +104,7 @@ public void removePeer(String peerId) throws ReplicationException { @Override public void setPeerState(String peerId, boolean enabled) throws ReplicationException { - byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; + byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES; try { ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); } catch (KeeperException e) { @@ -140,7 +136,7 @@ public List listPeerIds() throws ReplicationException { @Override public boolean isPeerEnabled(String peerId) throws ReplicationException { try { - return Arrays.equals(ENABLED_ZNODE_BYTES, + return Arrays.equals(PEER_STATE_ENABLED_BYTES, ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); } catch (KeeperException | InterruptedException e) { throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 9a281b16da36..fa0ff0e7b636 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -79,7 +79,7 @@ * */ @InterfaceAudience.Private -class ZKReplicationQueueStorage extends ZKReplicationStorageBase +public class ZKReplicationQueueStorage extends ZKReplicationStorageBase implements ReplicationQueueStorage { private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); @@ -199,7 +199,7 @@ public void setWALPosition(ServerName serverName, String queueId, String fileNam // Persist the max sequence id(s) of regions for serial replication atomically. if (lastSeqIds != null && lastSeqIds.size() > 0) { for (Entry lastSeqEntry : lastSeqIds.entrySet()) { - String peerId = new ReplicationQueueInfo(queueId).getPeerId(); + String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); /* * Make sure the existence of path @@ -382,7 +382,7 @@ public List getAllQueues(ServerName serverName) throws ReplicationExcept // will be overridden in UTs @VisibleForTesting - protected int getQueuesZNodeCversion() throws KeeperException { + public int getQueuesZNodeCversion() throws KeeperException { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); return stat.getCversion(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 6d75fec9fddd..4a36e136404a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -170,9 +170,9 @@ protected static void setupZkAndReplication() throws Exception { + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); + ReplicationUtils.PEER_STATE_ENABLED_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); + ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES); ZKClusterId.setClusterId(zkw, new ClusterId()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java similarity index 95% rename from hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java index 5999c1ffb00a..461420e4de96 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.replication.storage; import static org.hamcrest.CoreMatchers.hasItems; import static org.junit.Assert.assertEquals; @@ -30,7 +30,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.zookeeper.KeeperException; @@ -222,18 +228,19 @@ public void testReplicationPeers() throws Exception { try { rp.getPeerStorage().setPeerState("bogus", true); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); } catch (ReplicationException e) { } try { rp.getPeerStorage().setPeerState("bogus", false); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); } catch (ReplicationException e) { } try { assertFalse(rp.addPeer("bogus")); - fail("Should have thrown an ReplicationException when passed a bogus peerId"); + fail("Should have thrown an ReplicationException when creating a bogus peerId " + + "with null peer config"); } catch (ReplicationException e) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java new file mode 100644 index 000000000000..d073669e512a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.storage; + +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.TableReplicationStorageBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationStateTableImpl extends TestReplicationStateBasic { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class); + + private static Configuration conf; + private static HBaseTestingUtility utility = new HBaseTestingUtility(); + private static ZKWatcher zkw; + private static Connection connection; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = utility.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + utility.startMiniCluster(); + + // After the HBase Mini cluster startup, we set the storage implementation to table based + // implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will + // list peers before finish its initialization, and if master cannot finish initialization, the + // meta cannot be online, in other hand, if meta cannot be online, the list peers never success + // when using table based replication. a dead loop happen. + // Our UTs are written for testing storage layer, so no problem here. + conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + TableReplicationPeerStorage.class.getName()); + conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL, + TableReplicationQueueStorage.class.getName()); + + zkw = utility.getZooKeeperWatcher(); + connection = ConnectionFactory.createConnection(conf); + + KEY_ONE = initPeerClusterState("/hbase1"); + KEY_TWO = initPeerClusterState("/hbase2"); + } + + private static String initPeerClusterState(String baseZKNode) + throws IOException, KeeperException { + // Add a dummy region server and set up the cluster id + Configuration testConf = new Configuration(conf); + testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); + ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); + String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); + ZKUtil.createWithParents(zkw1, fakeRs); + ZKClusterId.setClusterId(zkw1, new ClusterId()); + return ZKConfig.getZooKeeperClusterKey(testConf); + } + + @Before + public void setUp() throws IOException { + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + rp = ReplicationFactory.getReplicationPeers(zkw, conf); + OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); + + // Create hbase:replication meta table. + try (Admin admin = connection.getAdmin()) { + TableDescriptor table = + TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build(); + admin.createTable(table); + } + } + + @After + public void tearDown() throws KeeperException, IOException { + // Drop the hbase:replication meta table. + utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (connection != null) { + IOUtils.closeQuietly(connection); + } + utility.shutdownMiniZKCluster(); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java similarity index 95% rename from hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java index 08178f4c3fb0..993f2fb70faf 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java @@ -15,14 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.replication.storage; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java similarity index 96% rename from hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java index 3290fb0097f6..190eef4dfc51 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.replication.storage; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -33,9 +33,13 @@ import java.util.Random; import java.util.Set; import java.util.stream.Stream; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.junit.AfterClass; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java similarity index 97% rename from hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java index 8ff52f3c16b4..780ff2a1342d 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.replication.storage; import static org.hamcrest.CoreMatchers.hasItems; import static org.junit.Assert.assertEquals; @@ -27,10 +27,13 @@ import java.util.List; import java.util.Set; import java.util.SortedSet; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Pair; @@ -218,7 +221,7 @@ private ZKReplicationQueueStorage createWithUnstableCversion() throws IOExceptio private int called = 0; @Override - protected int getQueuesZNodeCversion() throws KeeperException { + public int getQueuesZNodeCversion() throws KeeperException { if (called < 4) { called++; }