Skip to content

Commit

Permalink
HADOOP-15209. DistCp to eliminate needless deletion of files under al…
Browse files Browse the repository at this point in the history
…ready-deleted directories.

Contributed by Steve Loughran.
  • Loading branch information
steveloughran committed Mar 15, 2018
1 parent 78b05fd commit 1976e00
Show file tree
Hide file tree
Showing 20 changed files with 1,510 additions and 211 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
public static void verifyFileContents(FileSystem fs, public static void verifyFileContents(FileSystem fs,
Path path, Path path,
byte[] original) throws IOException { byte[] original) throws IOException {
assertIsFile(fs, path);
FileStatus stat = fs.getFileStatus(path); FileStatus stat = fs.getFileStatus(path);
String statText = stat.toString(); String statText = stat.toString();
assertTrue("not a file " + statText, stat.isFile());
assertEquals("wrong length " + statText, original.length, stat.getLen()); assertEquals("wrong length " + statText, original.length, stat.getLen());
byte[] bytes = readDataset(fs, path, original.length); byte[] bytes = readDataset(fs, path, original.length);
compareByteArrays(original, bytes, original.length); compareByteArrays(original, bytes, original.length);
Expand Down Expand Up @@ -853,6 +853,36 @@ public static void assertIsFile(Path filename, FileStatus status) {
status.isSymlink()); status.isSymlink());
} }


/**
 * Assert that every path in a varargs list exists.
 * Each entry is passed, in order, to {@code assertPathExists}.
 * @param fs filesystem
 * @param message message for exceptions
 * @param paths paths
 * @throws IOException IO failure
 */
public static void assertPathsExist(FileSystem fs,
    String message,
    Path... paths) throws IOException {
  for (int i = 0; i < paths.length; i++) {
    assertPathExists(fs, message, paths[i]);
  }
}

/**
 * Assert that none of the paths in a varargs list exist.
 * Each entry is passed, in order, to {@code assertPathDoesNotExist}.
 * @param fs filesystem
 * @param message message for exceptions
 * @param paths paths
 * @throws IOException IO failure
 */
public static void assertPathsDoNotExist(FileSystem fs,
    String message,
    Path... paths) throws IOException {
  for (int i = 0; i < paths.length; i++) {
    assertPathDoesNotExist(fs, message, paths[i]);
  }
}

/** /**
* Create a dataset for use in the tests; all data is in the range * Create a dataset for use in the tests; all data is in the range
* base to (base+modulo-1) inclusive. * base to (base+modulo-1) inclusive.
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@


package org.apache.hadoop.fs.contract.s3a; package org.apache.hadoop.fs.contract.s3a;


import java.io.IOException;

import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;


/** /**
Expand Down Expand Up @@ -57,4 +61,17 @@ protected Configuration createConfiguration() {
protected S3AContract createContract(Configuration conf) { protected S3AContract createContract(Configuration conf) {
return new S3AContract(conf); return new S3AContract(conf);
} }

/**
 * Resolve a test path and always append the failure-injection delay key
 * to it, so that if the destination is inconsistent and uses this key,
 * the inconsistency is triggered.
 * @param filepath path string in
 * @return path on the remote FS for distcp
 * @throws IOException IO failure
 */
@Override
protected Path path(final String filepath) throws IOException {
  // The superclass resolves the path; the delay key becomes its child element.
  return new Path(super.path(filepath),
      FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
}
} }
19 changes: 19 additions & 0 deletions hadoop-tools/hadoop-azure-datalake/pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -147,5 +147,24 @@
<version>${okhttp.version}</version> <version>${okhttp.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies> </dependencies>
</project> </project>
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hadoop.fs.adl.live;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;

/**
 * DistCp contract test suite bound to the ADL storage contract.
 */
public class TestAdlContractDistCpLive extends AbstractContractDistCpTest {

  /**
   * Create the filesystem contract for this test.
   * @param configuration configuration handed to the contract
   * @return a new {@link AdlStorageContract}
   */
  @Override
  protected AbstractFSContract createContract(
      final Configuration configuration) {
    return new AdlStorageContract(configuration);
  }

}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context)
throws DuplicateFileException, IOException { throws DuplicateFileException, IOException {


Configuration config = getConf(); Configuration config = getConf();
FileSystem fs = pathToListFile.getFileSystem(config);


final boolean splitLargeFile = context.splitLargeFile(); final boolean splitLargeFile = context.splitLargeFile();


Expand All @@ -153,7 +152,7 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context)
// <chunkOffset, chunkLength> is continuous. // <chunkOffset, chunkLength> is continuous.
// //
Path checkPath = splitLargeFile? Path checkPath = splitLargeFile?
pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); pathToListFile : DistCpUtils.sortListing(config, pathToListFile);


SequenceFile.Reader reader = new SequenceFile.Reader( SequenceFile.Reader reader = new SequenceFile.Reader(
config, SequenceFile.Reader.file(checkPath)); config, SequenceFile.Reader.file(checkPath));
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map.Entry; import java.util.Map.Entry;


import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
Expand All @@ -46,8 +47,18 @@
/** /**
* CopyListingFileStatus is a view of {@link FileStatus}, recording additional * CopyListingFileStatus is a view of {@link FileStatus}, recording additional
* data members useful to distcp. * data members useful to distcp.
*
* This is the data structure persisted in the sequence files generated
* in the CopyCommitter when deleting files.
* Any tool working with these generated files needs to be aware of an
* important stability guarantee: there is none; expect it to change
* across minor Hadoop releases without any support for reading the files of
* different versions.
* Tools parsing the listings must be built and tested against the point
* release of Hadoop which they intend to support.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("Distcp support tools")
@InterfaceStability.Unstable
public final class CopyListingFileStatus implements Writable { public final class CopyListingFileStatus implements Writable {


private static final byte NO_ACL_ENTRIES = -1; private static final byte NO_ACL_ENTRIES = -1;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@


package org.apache.hadoop.tools; package org.apache.hadoop.tools;


import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;


/** /**
* Utility class to hold commonly used constants. * Utility class to hold commonly used constants.
*/ */
public class DistCpConstants { @InterfaceAudience.LimitedPrivate("Distcp support tools")
@InterfaceStability.Evolving
public final class DistCpConstants {

private DistCpConstants() {
  // Private constructor: this class only holds constants.
}


/* Default number of threads to use for building file listing */ /* Default number of threads to use for building file listing */
public static final int DEFAULT_LISTSTATUS_THREADS = 1; public static final int DEFAULT_LISTSTATUS_THREADS = 1;
Expand Down Expand Up @@ -52,6 +59,8 @@ public class DistCpConstants {
"distcp.preserve.rawxattrs"; "distcp.preserve.rawxattrs";
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_TRACK_MISSING =
"distcp.track.missing.source";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads"; public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
Expand Down Expand Up @@ -148,4 +157,13 @@ public class DistCpConstants {
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp"; static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";


public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024; public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;

/** Filename of the sorted source listing, when tracking saves it. */
public static final String SOURCE_SORTED_FILE = "source_sorted.seq";

/** Filename of unsorted target listing. */
public static final String TARGET_LISTING_FILE = "target_listing.seq";

/** Filename of sorted target listing. */
public static final String TARGET_SORTED_FILE = "target_sorted.seq";
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;


import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;


/** /**
Expand Down Expand Up @@ -63,17 +64,32 @@ public enum DistCpOptionSwitch {
*/ */
SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
new Option("update", false, "Update target, copying only missing" + new Option("update", false, "Update target, copying only missing" +
"files or directories")), " files or directories")),


/** /**
* Deletes missing files in target that are missing from source * Deletes missing files in target that are missing from source.
* This allows the target to be in sync with the source contents * This allows the target to be in sync with the source contents
* Typically used in conjunction with SYNC_FOLDERS * Typically used in conjunction with SYNC_FOLDERS
* Incompatible with ATOMIC_COMMIT * Incompatible with ATOMIC_COMMIT
*/ */
DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING, DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
new Option("delete", false, "Delete from target, " + new Option("delete", false, "Delete from target, " +
"files missing in source. Delete is applicable only with update or overwrite options")), "files missing in source. Delete is applicable only with update or overwrite options")),

/**
* Track missing files in target that are missing from source.
* This allows for other applications to complete the synchronization,
* possibly with object-store-specific delete algorithms.
* Typically used in conjunction with SYNC_FOLDERS
* Incompatible with ATOMIC_COMMIT
*/
@InterfaceStability.Unstable
TRACK_MISSING(DistCpConstants.CONF_LABEL_TRACK_MISSING,
new Option("xtrack", true,
"Save information about missing source files to the"
+ " specified directory")),


/** /**
* Number of threads for building source file listing (before map-reduce * Number of threads for building source file listing (before map-reduce
* phase, max one listStatus per thread at a time). * phase, max one listStatus per thread at a time).
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
Expand All @@ -43,6 +45,8 @@
* *
* This class is immutable. * This class is immutable.
*/ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class DistCpOptions { public final class DistCpOptions {
private static final Logger LOG = LoggerFactory.getLogger(Builder.class); private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
public static final int MAX_NUM_LISTSTATUS_THREADS = 40; public static final int MAX_NUM_LISTSTATUS_THREADS = 40;
Expand All @@ -68,6 +72,9 @@ public final class DistCpOptions {
/** Whether source and target folder contents be sync'ed up. */ /** Whether source and target folder contents be sync'ed up. */
private final boolean syncFolder; private final boolean syncFolder;


/** Path to save source/dest sequence files to, if non-null. */
private final Path trackPath;

/** Whether files only present in target should be deleted. */ /** Whether files only present in target should be deleted. */
private boolean deleteMissing; private boolean deleteMissing;


Expand Down Expand Up @@ -208,6 +215,7 @@ private DistCpOptions(Builder builder) {


this.copyBufferSize = builder.copyBufferSize; this.copyBufferSize = builder.copyBufferSize;
this.verboseLog = builder.verboseLog; this.verboseLog = builder.verboseLog;
this.trackPath = builder.trackPath;
} }


public Path getSourceFileListing() { public Path getSourceFileListing() {
Expand Down Expand Up @@ -331,6 +339,10 @@ public boolean shouldVerboseLog() {
return verboseLog; return verboseLog;
} }


/**
 * Get the path to save the source/dest sequence files to.
 * @return the tracking path; may be null.
 */
public Path getTrackPath() {
  return this.trackPath;
}

/** /**
* Add options to configuration. These will be used in the Mapper/committer * Add options to configuration. These will be used in the Mapper/committer
* *
Expand Down Expand Up @@ -371,6 +383,11 @@ public void appendToConf(Configuration conf) {
String.valueOf(copyBufferSize)); String.valueOf(copyBufferSize));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG,
String.valueOf(verboseLog)); String.valueOf(verboseLog));
if (trackPath != null) {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
String.valueOf(trackPath));
}

} }


/** /**
Expand Down Expand Up @@ -441,6 +458,7 @@ public static class Builder {
private String filtersFile; private String filtersFile;


private Path logPath; private Path logPath;
private Path trackPath;
private String copyStrategy = DistCpConstants.UNIFORMSIZE; private String copyStrategy = DistCpConstants.UNIFORMSIZE;


private int numListstatusThreads = 0; // 0 indicates that flag is not set. private int numListstatusThreads = 0; // 0 indicates that flag is not set.
Expand Down Expand Up @@ -641,6 +659,11 @@ public Builder withLogPath(Path newLogPath) {
return this; return this;
} }


/**
 * Set the path under which tracking information is saved.
 * @param path tracking path
 * @return this builder
 */
public Builder withTrackMissing(Path path) {
  trackPath = path;
  return this;
}

public Builder withCopyStrategy(String newCopyStrategy) { public Builder withCopyStrategy(String newCopyStrategy) {
this.copyStrategy = newCopyStrategy; this.copyStrategy = newCopyStrategy;
return this; return this;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public static DistCpOptions parse(String[] args)
builder.withAtomicWorkPath(new Path(workPath)); builder.withAtomicWorkPath(new Path(workPath));
} }
} }
if (command.hasOption(DistCpOptionSwitch.TRACK_MISSING.getSwitch())) {
builder.withTrackMissing(
new Path(getVal(
command,
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
}


if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try { try {
Expand Down
Loading

0 comments on commit 1976e00

Please sign in to comment.