Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2049 Feed Replication with Empty Directories are failing
Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: "Venkat Ranganathan <venkat@hortonworks.com>, Ying Zheng <yzheng@hortonworks.com>, Peeyush B <peeyushb@apache.org>, Pallavi Rao <pallavi.rao@inmobi.com>"

Closes #204 from bvellanki/FALCON-2049

(cherry picked from commit b135f28)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
  • Loading branch information
bvellanki committed Jul 1, 2016
1 parent 1fabb3f commit defc3309ad69ac26529b5ca3a2f6ce7969150d58
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
@@ -52,6 +52,11 @@
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-test-util</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
@@ -184,12 +184,13 @@ protected CommandLine getCommand(String[] args) throws ParseException {
return new GnuParser().parse(options, args);
}

protected DistCpOptions getDistCpOptions(CommandLine cmd) {
protected DistCpOptions getDistCpOptions(CommandLine cmd) throws FalconException, IOException {
String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
List<Path> srcPaths = getPaths(paths);
String trgPath = cmd.getOptionValue("targetPath").trim();
String targetPathString = cmd.getOptionValue("targetPath").trim();
Path targetPath = new Path(targetPathString);

DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(trgPath));
DistCpOptions distcpOptions = new DistCpOptions(srcPaths, targetPath);
distcpOptions.setBlocking(true);
distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps")));
distcpOptions.setMapBandwidth(Integer.parseInt(cmd.getOptionValue("mapBandwidth")));
@@ -214,8 +215,16 @@ protected DistCpOptions getDistCpOptions(CommandLine cmd) {
// Removing deleted files by default - FALCON-1844
String removeDeletedFiles = cmd.getOptionValue(
ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true");
distcpOptions.setDeleteMissing(Boolean.parseBoolean(removeDeletedFiles));

boolean deleteMissing = Boolean.parseBoolean(removeDeletedFiles);
distcpOptions.setDeleteMissing(deleteMissing);
if (deleteMissing) {
// DistCp fails with InvalidInputException when deleteMissing is true and
// targetPath does not exist. Create targetPath up front to avoid that failure.
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), getConf());
if (!fs.exists(targetPath)) {
fs.mkdirs(targetPath);
}
}

String preserveBlockSize = cmd.getOptionValue(
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName());
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.replication;

import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.commons.cli.CommandLine;
import org.apache.falcon.entity.Storage;
import org.apache.hadoop.fs.Path;
@@ -32,6 +33,8 @@
*/
public class FeedReplicatorTest {

private String defaultPath = "jail://FeedReplicatorTest:00/tmp";

@Test
public void testArguments() throws Exception {
/*
@@ -42,21 +45,26 @@ public void testArguments() throws Exception {
* <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
* <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
*/

// Creates a jailed cluster in which the DistCpOptions command can be tested.
EmbeddedCluster cluster = EmbeddedCluster.newCluster("FeedReplicatorTest");

final String[] args = {
"true",
"-maxMaps", "3",
"-mapBandwidth", "4",
"-sourcePaths", "hdfs://localhost:8020/tmp/",
"-targetPath", "hdfs://localhost1:8020/tmp/",
"-sourcePaths", defaultPath,
"-targetPath", defaultPath,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
};

FeedReplicator replicator = new FeedReplicator();
CommandLine cmd = replicator.getCommand(args);
replicator.setConf(cluster.getConf());
DistCpOptions options = replicator.getDistCpOptions(cmd);

List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
srcPaths.add(new Path(defaultPath));
validateMandatoryArguments(options, srcPaths, true);
Assert.assertTrue(options.shouldDeleteMissing());
}
@@ -82,8 +90,8 @@ public void testOptionalArguments() throws Exception {
"true",
"-maxMaps", "3",
"-mapBandwidth", "4",
"-sourcePaths", "hdfs://localhost:8020/tmp/",
"-targetPath", "hdfs://localhost1:8020/tmp/",
"-sourcePaths", defaultPath,
"-targetPath", defaultPath,
"-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
"-overwrite", "true",
"-ignoreErrors", "false",
@@ -99,7 +107,7 @@ public void testOptionalArguments() throws Exception {
DistCpOptions options = replicator.getDistCpOptions(cmd);

List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
srcPaths.add(new Path(defaultPath));
validateMandatoryArguments(options, srcPaths, false);
validateOptionalArguments(options);
}
@@ -108,7 +116,7 @@ private void validateMandatoryArguments(DistCpOptions options, List<Path> srcPat
Assert.assertEquals(options.getMaxMaps(), 3);
Assert.assertEquals(options.getMapBandwidth(), 4);
Assert.assertEquals(options.getSourcePaths(), srcPaths);
Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost1:8020/tmp/"));
Assert.assertEquals(options.getTargetPath(), new Path(defaultPath));
Assert.assertEquals(options.shouldSyncFolder(), shouldSyncFolder);
}

0 comments on commit defc330

Please sign in to comment.