Skip to content

Commit

Permalink
HBASE-23202 ExportSnapshot (import) will fail if copying files to roo…
Browse files Browse the repository at this point in the history
…t directory takes longer than cleaner TTL
  • Loading branch information
guangxuCheng authored and Huaxiang Sun committed Jun 3, 2020
1 parent a9205f8 commit 4375fec
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 108 deletions.
Expand Up @@ -33,13 +33,15 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -77,17 +79,19 @@ public class SnapshotFileCache implements Stoppable {
// Callback the cache uses to list the files referenced by a single snapshot directory.
interface SnapshotFileInspector {
/**
* Returns a collection of file names needed by the snapshot.
* @param fs {@link FileSystem} where snapshot manifest files are stored
* @param snapshotDir {@link Path} to the snapshot directory to scan.
* @return the collection of file names needed by the snapshot.
* @throws IOException declared by the signature; failure conditions are implementation-defined
*/
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
// Variant that additionally receives the FileSystem on which snapshotDir is to be read.
Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
throws IOException;
}

private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
private volatile boolean stop = false;
private final FileSystem fs;
private final FileSystem fs, workingFs;
private final SnapshotFileInspector fileInspector;
private final Path snapshotDir;
private final Path snapshotDir, workingSnapshotDir;
private final Set<String> cache = new HashSet<>();
/**
* This is a helper map of information about the snapshot directories so we don't need to rescan
Expand All @@ -104,30 +108,38 @@ interface SnapshotFileInspector {
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
* hbase root directory
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
*/
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf), 0,
cacheRefreshPeriod, refreshThreadName, inspectSnapshotFiles);
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
// Delegates to the explicit constructor. The working snapshot directory is derived from the
// root directory and conf, and its FileSystem is obtained from that path (not assumed to be
// the same FileSystem as the root directory's).
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
getFileSystem(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
}

/**
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
* filesystem
* @param fs {@link FileSystem} where the snapshots are stored
* @param rootDir hbase root directory
* @param workingFs {@link FileSystem} where ongoing snapshot manifest files are stored
* @param workingDir Location to store ongoing snapshot manifest files
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs;
this.workingFs = workingFs;
this.workingSnapshotDir = workingDir;
this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// periodically refresh the file cache to make sure we aren't superfluously saving files.
Expand Down Expand Up @@ -176,6 +188,7 @@ public synchronized void triggerCacheRefreshForTesting() {
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
boolean refreshed = false;
Lock lock = null;
if (snapshotManager != null) {
Expand All @@ -197,6 +210,12 @@ public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatu
if (cache.contains(fileName)) {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
} finally {
Expand Down Expand Up @@ -239,7 +258,8 @@ private void refreshCache() throws IOException {
// that new snapshot, even though it has the same name as the files referenced have
// probably changed.
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
snapshotDir.getPath());
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
}
// add all the files to cache
Expand All @@ -251,6 +271,26 @@ private void refreshCache() throws IOException {
this.snapshots.putAll(newSnapshots);
}

/**
 * Gathers the file names reported by the file inspector for every entry found directly under
 * the working snapshot directory on the working file system.
 * <p>
 * An entry whose inspection raises {@link CorruptedSnapshotException} is logged and skipped;
 * any other {@link IOException} propagates to the caller.
 * @return a new list with the collected file names; empty when the directory listing is empty
 *         according to {@code ArrayUtils.isEmpty}
 * @throws IOException if listing the working directory or inspecting an entry fails
 */
@VisibleForTesting
List<String> getSnapshotsInProgress() throws IOException {
  final List<String> referencedFiles = Lists.newArrayList();
  final FileStatus[] workingEntries =
    CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);

  // Nothing is listed under the working directory: no in-progress references to report.
  if (ArrayUtils.isEmpty(workingEntries)) {
    return referencedFiles;
  }

  for (int i = 0; i < workingEntries.length; i++) {
    final FileStatus entry = workingEntries[i];
    try {
      Collection<String> names = fileInspector.filesUnderSnapshot(workingFs, entry.getPath());
      referencedFiles.addAll(names);
    } catch (CorruptedSnapshotException cse) {
      LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
    }
  }
  return referencedFiles;
}

/**
* Simple helper task that just periodically attempts to refresh the cache
*/
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -93,10 +94,15 @@ public void setConf(final Configuration conf) {
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
FileSystem workingFs = workingDir.getFileSystem(conf);

cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, cacheRefreshPeriod,
cacheRefreshPeriod, "snapshot-hfile-cleaner-cache-refresher",
new SnapshotFileCache.SnapshotFileInspector() {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
public Collection<String> filesUnderSnapshot(final FileSystem fs,
final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
}
Expand Down
Expand Up @@ -224,7 +224,9 @@ public void process() {
verifier.verifySnapshot(this.workingDir, serverNames);

// complete the snapshot, atomically moving from tmp to .snapshot dir.
completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, this.workingDirFs);
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs,
this.workingDirFs, this.conf);
finished = true;
msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
status.markComplete(msg);
LOG.info(msg);
Expand Down Expand Up @@ -258,42 +260,6 @@ public void process() {
}
}

/**
 * Reset the manager to allow another snapshot to proceed.
 * Commits the snapshot process by moving the working snapshot
 * to the finalized filepath.
 * <p>
 * When both file systems report the same URI scheme, authority and user-info, a rename is
 * attempted first. If the file systems differ, or the rename returns {@code false}, the
 * working directory is copied to the completed location instead (deleting the source and
 * overwriting the destination). On success the {@code finished} flag is set.
 *
 * @param snapshotDir The file path of the completed snapshots
 * @param workingDir The file path of the in progress snapshots
 * @param fs The file system of the completed snapshots
 * @param workingDirFs The file system of the in progress snapshots
 *
 * @throws SnapshotCreationException if the snapshot could not be moved
 * @throws IOException the filesystem could not be reached
 */
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
    FileSystem workingDirFs) throws SnapshotCreationException, IOException {
  LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
    + snapshotDir);
  URI workingURI = workingDirFs.getUri();
  URI rootURI = fs.getUri();
  // URI components may legitimately be null (e.g. no user-info, or no authority for a local
  // file system). Two null components count as equal; previously a null authority or user-info
  // was treated as a mismatch, so the rename below was never attempted for ordinary URIs.
  boolean sameFileSystem = sameUriComponent(workingURI.getScheme(), rootURI.getScheme())
    && sameUriComponent(workingURI.getAuthority(), rootURI.getAuthority())
    && sameUriComponent(workingURI.getUserInfo(), rootURI.getUserInfo());
  // Rename is only meaningful within a single file system.
  boolean renamed = sameFileSystem && fs.rename(workingDir, snapshotDir);
  // Fall back to a copy: deleteSource = true, overwrite = true.
  if (!renamed
    && !FileUtil.copy(workingDirFs, workingDir, fs, snapshotDir, true, true, this.conf)) {
    throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
      + ") to completed directory(" + snapshotDir + ").");
  }
  finished = true;
}

/** Null-safe equality of two URI components; two {@code null} values are considered equal. */
private static boolean sameUriComponent(String working, String root) {
  return working == null ? root == null : working.equals(root);
}

/**
* When taking snapshot, first we must acquire the exclusive table lock to confirm that there are
* no ongoing merge/split procedures. But later, we should try our best to release the exclusive
Expand Down
Expand Up @@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.snapshot;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -383,25 +385,38 @@ public static SnapshotDescription readSnapshotInfo(FileSystem fs, Path snapshotD
}

/**
* Move the finished snapshot to its final, publicly visible directory - this marks the snapshot
* as 'complete'.
* @param snapshot description of the snapshot being taken
* @param rootdir root directory of the hbase installation
* @param workingDir directory where the in progress snapshot was built
* @param fs {@link FileSystem} where the snapshot was built
* @throws org.apache.hadoop.hbase.snapshot.SnapshotCreationException if the
* snapshot could not be moved
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
* @param conf Configuration
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached
*/
public static void completeSnapshot(SnapshotDescription snapshot, Path rootdir, Path workingDir,
FileSystem fs) throws SnapshotCreationException, IOException {
Path finishedDir = getCompletedSnapshotDir(snapshot, rootdir);
LOG.debug("Snapshot is done, just moving the snapshot from " + workingDir + " to "
+ finishedDir);
if (!fs.rename(workingDir, finishedDir)) {
throw new SnapshotCreationException(
"Failed to move working directory(" + workingDir + ") to completed directory("
+ finishedDir + ").", ProtobufUtil.createSnapshotDesc(snapshot));
public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
    FileSystem workingDirFs, final Configuration conf)
    throws SnapshotCreationException, IOException {
  LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
    + snapshotDir);
  // If the working and completed snapshot directory are on the same file system, attempt
  // to rename the working snapshot directory to the completed location. If that fails,
  // or the file systems differ, attempt to copy the directory over, throwing an exception
  // if this fails
  URI workingURI = workingDirFs.getUri();
  URI rootURI = fs.getUri();
  // URI components may legitimately be null (e.g. no user-info, or no authority for a local
  // file system). Two null components count as equal; previously a null authority or user-info
  // was treated as a mismatch, so the rename below was never attempted for ordinary URIs.
  boolean sameFileSystem = sameUriComponent(workingURI.getScheme(), rootURI.getScheme())
    && sameUriComponent(workingURI.getAuthority(), rootURI.getAuthority())
    && sameUriComponent(workingURI.getUserInfo(), rootURI.getUserInfo());
  // Rename is only meaningful within a single file system.
  boolean renamed = sameFileSystem && fs.rename(workingDir, snapshotDir);
  // Fall back to a copy: deleteSource = true, overwrite = true.
  if (!renamed
    && !FileUtil.copy(workingDirFs, workingDir, fs, snapshotDir, true, true, conf)) {
    throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
      + ") to completed directory(" + snapshotDir + ").");
  }
}

/** Null-safe equality of two URI components; two {@code null} values are considered equal. */
private static boolean sameUriComponent(String working, String root) {
  return working == null ? root == null : working.equals(root);
}

Expand Down

0 comments on commit 4375fec

Please sign in to comment.