Skip to content

Commit

Permalink
HBASE-20434 Also remove remote wals when peer is in DA state
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jun 28, 2018
1 parent b281328 commit 2d203c4
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 66 deletions.
Expand Up @@ -191,6 +191,10 @@ public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId); return new Path(remoteWALDir, peerId);
} }


/**
 * Builds the remote WAL directory path for a replication peer by resolving the peer id as a
 * child of the given remote WAL directory.
 * @param remoteWALDir the parent remote WAL directory
 * @param peerId the replication peer id, used as the child path component
 * @return the path formed from {@code remoteWALDir} and {@code peerId}
 */
public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
  Path peerDir = new Path(remoteWALDir, peerId);
  return peerDir;
}

/** /**
* Do the sleeping logic * Do the sleeping logic
* @param msg Why we sleep * @param msg Why we sleep
Expand Down
Expand Up @@ -211,7 +211,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
case CREATE_DIR_FOR_REMOTE_WAL: case CREATE_DIR_FOR_REMOTE_WAL:
MasterFileSystem mfs = env.getMasterFileSystem(); MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = new Path(remoteWALDir, peerId); Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem(); FileSystem walFs = mfs.getWALFileSystem();
try { try {
if (walFs.exists(remoteWALDirForPeer)) { if (walFs.exists(remoteWALDirForPeer)) {
Expand Down
Expand Up @@ -570,14 +570,17 @@ public int compare(Path o1, Path o2) {
} }


/** /**
* <p>
* Split a path to get the start time * Split a path to get the start time
* </p>
* <p>
* For example: 10.20.20.171%3A60020.1277499063250 * For example: 10.20.20.171%3A60020.1277499063250
* </p>
* @param p path to split * @param p path to split
* @return start time * @return start time
*/ */
private static long getTS(Path p) { private static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1; return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
return Long.parseLong(p.getName().substring(tsIndex));
} }
} }


Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -561,20 +563,40 @@ void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface sour
if (source.isRecovered()) { if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix); NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) { if (wals != null) {
cleanOldLogs(wals, log, inclusive, source); NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
cleanOldLogs(walsToRemove, source);
walsToRemove.clear();
} }
} else { } else {
NavigableSet<String> wals;
NavigableSet<String> walsToRemove;
// synchronized on walsById to avoid race with preLogRoll // synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) { synchronized (this.walsById) {
NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix); wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals != null) { if (wals == null) {
cleanOldLogs(wals, log, inclusive, source); return;
}
walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
} }
walsToRemove = new TreeSet<>(walsToRemove);
}
// cleanOldLogs may take some time, especially for sync replication, where we may want to
// remove remote wals while the remote cluster may already be down, so we do it outside
// the lock to avoid blocking preLogRoll
cleanOldLogs(walsToRemove, source);
// now let's remove the files in the set
synchronized (this.walsById) {
wals.removeAll(walsToRemove);
} }
} }
} }


private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals) private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException { throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
Expand All @@ -594,46 +616,48 @@ private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wa
} }
} }


private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
ReplicationSourceInterface source) { LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
NavigableSet<String> walSet = wals.headSet(key, inclusive);
if (walSet.isEmpty()) {
return;
}
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
// The intention here is that we want to delete the remote wal files ASAP, as they may affect the // The intention here is that we want to delete the remote wal files ASAP, as they may affect the
// failover time if you want to transit the remote cluster from S to A. And the infinite retry // failover time if you want to transit the remote cluster from S to A. And the infinite retry
// is not a problem: if we cannot contact the remote HDFS cluster, then usually we cannot // is not a problem: if we cannot contact the remote HDFS cluster, then usually we cannot
// contact the HBase cluster either, so the replication will be blocked as well. // contact the HBase cluster either, so the replication will be blocked as well.
if (source.isSyncReplication()) { if (source.isSyncReplication()) {
String peerId = source.getPeerId(); String peerId = source.getPeerId();
String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir(); String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir, // Filter out the wals need to be removed from the remote directory. Its name should be the
walSet); // special format, and also, the peer id in its name should match the peer id for the
for (int sleepMultiplier = 0;;) { // replication source.
try { List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
removeRemoteWALs(peerId, remoteWALDir, walSet); .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
break; .collect(Collectors.toList());
} catch (IOException e) { LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir, remoteWALDir, remoteWals);
peerId); if (!remoteWals.isEmpty()) {
} for (int sleepMultiplier = 0;;) {
if (!source.isSourceActive()) { try {
// skip the following operations removeRemoteWALs(peerId, remoteWALDir, remoteWals);
return; break;
} } catch (IOException e) {
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries, LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
sleepMultiplier, maxRetriesMultiplier)) { peerId);
sleepMultiplier++; }
if (!source.isSourceActive()) {
// skip the following operations
return;
}
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)) {
sleepMultiplier++;
}
} }
} }
} }
String queueId = source.getQueueId(); String queueId = source.getQueueId();
for (String wal : walSet) { for (String wal : wals) {
interruptOrAbortWhenFail( interruptOrAbortWhenFail(
() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal)); () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
} }
walSet.clear();
} }


// public because of we call it in TestReplicationEmptyWALRecovery // public because of we call it in TestReplicationEmptyWALRecovery
Expand Down
Expand Up @@ -517,6 +517,14 @@ public void addWALActionsListener(WALActionsListener listener) {
listeners.add(listener); listeners.add(listener);
} }


/**
 * Matches a WAL file name against {@code WAL_FILE_NAME_PATTERN} and returns the requested
 * capture group.
 * @param name the WAL file name to parse
 * @param group index of the capture group to return
 * @return the text captured by the given group
 * @throws IllegalArgumentException if {@code name} does not match the WAL file name pattern
 */
private static String getWALNameGroupFromWALName(String name, int group) {
  Matcher walNameMatcher = WAL_FILE_NAME_PATTERN.matcher(name);
  if (!walNameMatcher.matches()) {
    // Reject anything that is not a well-formed WAL file name up front.
    throw new IllegalArgumentException(name + " is not a valid wal file name");
  }
  return walNameMatcher.group(group);
}
/** /**
* Get prefix of the log from its name, assuming WAL name in format of * Get prefix of the log from its name, assuming WAL name in format of
* log_prefix.filenumber.log_suffix * log_prefix.filenumber.log_suffix
Expand All @@ -526,11 +534,10 @@ public void addWALActionsListener(WALActionsListener listener) {
* @see AbstractFSWAL#getCurrentFileName() * @see AbstractFSWAL#getCurrentFileName()
*/ */
public static String getWALPrefixFromWALName(String name) { public static String getWALPrefixFromWALName(String name) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name); return getWALNameGroupFromWALName(name, 1);
if (matcher.matches()) { }
return matcher.group(1);
} else { public static long getWALStartTimeFromWALName(String name) {
throw new IllegalArgumentException(name + " is not a valid wal file name"); return Long.parseLong(getWALNameGroupFromWALName(name, 2));
}
} }
} }
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate; import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
Expand All @@ -48,6 +50,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams; import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
Expand All @@ -64,7 +67,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen


private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class); private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);


private static final String LOG_SUFFIX = ".syncrep"; @VisibleForTesting
public static final String LOG_SUFFIX = ".syncrep";


private final WALProvider provider; private final WALProvider provider;


Expand Down Expand Up @@ -288,4 +292,28 @@ public boolean checkState(TableName table,
return false; return false;
} }
} }

private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");

/**
 * <p>
 * Extracts the peer id from a wal file name when the file belongs to the special wal group of a
 * sync replication peer.
 * </p>
 * <p>
 * The log prefix is expected to look like &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
 * </p>
 * @param name the wal file name to inspect
 * @return the peer id, or an empty {@link Optional} if the name does not end with
 *         {@code LOG_SUFFIX} or its prefix does not match {@code LOG_PREFIX_PATTERN}
 */
public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
  // Fast path: anything without the sync replication suffix cannot belong to a sync peer.
  if (!name.endsWith(LOG_SUFFIX)) {
    return Optional.empty();
  }
  Matcher prefixMatcher =
    LOG_PREFIX_PATTERN.matcher(AbstractFSWALProvider.getWALPrefixFromWALName(name));
  return prefixMatcher.matches() ? Optional.of(prefixMatcher.group(1)) : Optional.empty();
}
} }
@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks that remote wal files of a sync replication peer get removed after their content has
 * been replicated, both while the source cluster is ACTIVE and after it has been transited to
 * DOWNGRADE_ACTIVE.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);

  /**
   * Waits up to 30 seconds for the given remote wal file to disappear from the wal file system
   * of the master in UTIL2's cluster.
   */
  private void waitUntilDeleted(Path remoteWAL) throws Exception {
    MasterFileSystem peerMfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
    ExplainingPredicate<Exception> walIsGone = new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        boolean stillThere = peerMfs.getWALFileSystem().exists(remoteWAL);
        return !stillThere;
      }

      @Override
      public String explainFailure() throws Exception {
        return remoteWAL + " has not been deleted yet";
      }
    };
    UTIL1.waitFor(30000, walIsGone);
  }

  /**
   * Lists the remote wal directory, asserts that it holds exactly one file whose name ends with
   * the sync replication suffix, and returns the path of that file.
   */
  private Path getOnlyRemoteWAL(MasterFileSystem mfs, Path remoteWALDir) throws Exception {
    FileStatus[] statuses = mfs.getWALFileSystem().listStatus(remoteWALDir);
    assertEquals(1, statuses.length);
    Path onlyWAL = statuses[0].getPath();
    assertThat(onlyWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
    return onlyWAL;
  }

  @Test
  public void testRemoveRemoteWAL() throws Exception {
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
    Path remoteWALRoot = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(remoteWALRoot, PEER_ID);
    Path firstRemoteWAL = getOnlyRemoteWAL(mfs, remoteWALDir);
    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);

    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
    rs.getWalRoller().requestRollAll();
    // The wal file whose content has been replicated should eventually be deleted
    waitUntilDeleted(firstRemoteWAL);
    Path secondRemoteWAL = getOnlyRemoteWAL(mfs, remoteWALDir);

    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    write(UTIL1, 100, 200);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);

    // Must still exist: the peer is disabled, so the data has not been replicated yet
    assertTrue(mfs.getWALFileSystem().exists(secondRemoteWAL));

    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
    waitUntilReplicationDone(UTIL2, 200);
    verifyThroughRegion(UTIL2, 100, 200);

    // Remote wal files must also be removed while the peer is in DA state
    waitUntilDeleted(secondRemoteWAL);
  }
}

0 comments on commit 2d203c4

Please sign in to comment.