Skip to content

Commit

Permalink
HBASE-20206 WALEntryStream should not switch WAL file silently
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Mar 19, 2018
1 parent 3f906ba commit 67f0134
Show file tree
Hide file tree
Showing 15 changed files with 379 additions and 217 deletions.
Expand Up @@ -63,7 +63,7 @@ void removeWAL(ServerName serverName, String queueId, String fileName)
* @param serverName the name of the regionserver * @param serverName the name of the regionserver
* @param queueId a String that identifies the queue * @param queueId a String that identifies the queue
* @param fileName name of the WAL * @param fileName name of the WAL
* @param position the current position in the file * @param position the current position in the file. Ignored if less than or equal to 0.
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication. * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
*/ */
void setWALPosition(ServerName serverName, String queueId, String fileName, long position, void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Expand Down
Expand Up @@ -194,27 +194,28 @@ public void setWALPosition(ServerName serverName, String queueId, String fileNam
Map<String, Long> lastSeqIds) throws ReplicationException { Map<String, Long> lastSeqIds) throws ReplicationException {
try { try {
List<ZKUtilOp> listOfOps = new ArrayList<>(); List<ZKUtilOp> listOfOps = new ArrayList<>();
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), if (position > 0) {
ZKUtil.positionToByteArray(position))); listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position)));
}
// Persist the max sequence id(s) of regions for serial replication atomically. // Persist the max sequence id(s) of regions for serial replication atomically.
if (lastSeqIds != null && lastSeqIds.size() > 0) { for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String peerId = new ReplicationQueueInfo(queueId).getPeerId(); String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); /*
/* * Make sure the existence of path
* Make sure the existence of path * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in * multiOrSequential() method said, if received a NodeExistsException, all operations will
* multiOrSequential() method said, if received a NodeExistsException, all operations will * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
* fail. So create the path here, and in fact, no need to add this operation to listOfOps, * because only need to make sure that update file position and sequence id atomically.
* because only need to make sure that update file position and sequence id atomically. */
*/ ZKUtil.createWithParents(zookeeper, path);
ZKUtil.createWithParents(zookeeper, path); // Persist the max sequence id of region to zookeeper.
// Persist the max sequence id of region to zookeeper. listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
listOfOps }
.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); if (!listOfOps.isEmpty()) {
} ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} }
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;


import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void testReplicationQueues() throws ReplicationException {
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
rqs.setWALPosition(server3, "qId5", "filename4", 354L, null); rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));


assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
Expand Down
Expand Up @@ -24,6 +24,7 @@


import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
Expand Down Expand Up @@ -136,9 +137,10 @@ public void testAddRemoveLog() throws ReplicationException {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null); STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
Collections.emptyMap());
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
null); Collections.emptyMap());
} }


for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -64,38 +63,6 @@ protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
} }


private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down
reader.setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(new WALEntryBatch(0, currentPath));
}

@Override
protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
if (replicationPeer.getPeerConfig().isSerial()) {
return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
this) {

@Override
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
}
};
} else {
return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {

@Override
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
}
};
}
}

public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException { public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false; boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths = PriorityBlockingQueue<Path> newPaths =
Expand Down
Expand Up @@ -48,13 +48,10 @@ public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
} }


@Override @Override
protected void postShipEdits(WALEntryBatch entryBatch) { protected void noMoreData() {
if (entryBatch.getWalEntries().isEmpty()) { LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " source.getSourceMetrics().incrCompletedRecoveryQueue();
+ source.getQueueId()); setWorkerState(WorkerState.FINISHED);
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
} }


@Override @Override
Expand All @@ -63,7 +60,7 @@ protected void postFinish() {
} }


@Override @Override
public long getStartPosition() { long getStartPosition() {
long startPosition = getRecoveredQueueStartPos(); long startPosition = getRecoveredQueueStartPos();
int numRetries = 0; int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) { while (numRetries <= maxRetriesMultiplier) {
Expand Down
Expand Up @@ -315,7 +315,7 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId,
return new ReplicationSourceShipper(conf, walGroupId, queue, this); return new ReplicationSourceShipper(conf, walGroupId, queue, this);
} }


protected ReplicationSourceWALReader createNewWALReader(String walGroupId, private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) { PriorityBlockingQueue<Path> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial() return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
Expand Down

0 comments on commit 67f0134

Please sign in to comment.