Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public class FTPFileSystem extends FileSystem {
public static final String FS_FTP_DATA_CONNECTION_MODE =
"fs.ftp.data.connection.mode";
public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode";
public static final String E_SAME_DIRECTORY_ONLY =
"only same directory renames are supported";
public static final String FS_FTP_TIMEOUT = "fs.ftp.timeout";

private URI uri;
Expand Down Expand Up @@ -625,8 +623,8 @@ private boolean isFile(FTPClient client, Path file) {
}

/*
* Assuming that parent of both source and destination is the same. Is the
* assumption correct or it is suppose to work like 'move' ?
* The parent of source and destination can be different. It is suppose to
* work like 'move'
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
Expand Down Expand Up @@ -685,21 +683,15 @@ private boolean rename(FTPClient client, Path src, Path dst)
throw new FileAlreadyExistsException("Destination path " + dst
+ " already exists");
}
String parentSrc = absoluteSrc.getParent().toUri().toString();
String parentDst = absoluteDst.getParent().toUri().toString();
if (isParentOf(absoluteSrc, absoluteDst)) {
throw new IOException("Cannot rename " + absoluteSrc + " under itself"
+ " : "+ absoluteDst);
}

if (!parentSrc.equals(parentDst)) {
throw new IOException("Cannot rename source: " + absoluteSrc
+ " to " + absoluteDst
+ " -"+ E_SAME_DIRECTORY_ONLY);
Path dstParentPath = absoluteDst.getParent();
if (!exists(dstParentPath)) {
// when the parent path of dst don't exist, the path need to create.
mkdirs(dstParentPath);
}
String from = absoluteSrc.getName();
String to = absoluteDst.getName();
client.changeWorkingDirectory(parentSrc);

String from = absoluteSrc.toUri().getPath();
String to = absoluteDst.toUri().getPath();
boolean renamed = client.rename(from, to);
return renamed;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,15 @@ private DistCpConstants() {

public static final String CLASS_INSTANTIATION_ERROR_MSG =
"Unable to instantiate ";

/**
* The prefix of target temp file path.
*/
public static final String TARGET_TEMP_FILE_PREFIX_DOT = ".distcp.tmp.";

/**
* FTP filesystem can't be work well when the path with dot prefix,
* so the target tmp file path use the prefix without dot.
*/
public static final String TARGET_TEMP_FILE_PREFIX_FTP = "distcp.tmp.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ private void deleteAttemptTempFiles(Path targetWorkPath,
return;
}

String tempFilePrefix = DistCpUtils.getTargetTempFilePrefix(targetWorkPath);
FileStatus[] tempFiles = targetFS.globStatus(
new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
new Path(targetWorkPath, tempFilePrefix
+ jobId.replaceAll("job", "attempt") + "*"));

if (tempFiles != null && tempFiles.length > 0) {
for (FileStatus file : tempFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,15 @@ private long doCopy(CopyListingFileStatus source, Path target,
throws IOException {
LOG.info("Copying {} to {}", source.getPath(), target);

final Configuration configuration = context.getConfiguration();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you just get the schema in getTempFile() from the path, no need to pass in FileSystem, so no need to move this up.

FileSystem targetFS = target.getFileSystem(configuration);
final boolean toAppend = action == FileAction.APPEND;
final boolean useTempTarget = !toAppend && !directWrite;
Path targetPath = useTempTarget ? getTempFile(target, context) : target;
Path targetPath = useTempTarget ? getTempFile(target, context, targetFS) : target;

LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary"
: "direct", targetPath);

final Configuration configuration = context.getConfiguration();
FileSystem targetFS = target.getFileSystem(configuration);

try {
final Path sourcePath = source.getPath();
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
Expand All @@ -161,6 +160,7 @@ private long doCopy(CopyListingFileStatus source, Path target,
if (useTempTarget) {
LOG.info("Renaming temporary target file path {} to {}", targetPath,
target);
target = new Path(target.toUri().getPath());
promoteTmpToTarget(targetPath, target, targetFS);
}
LOG.info("Completed writing {} ({} bytes)", target, bytesRead);
Expand Down Expand Up @@ -257,17 +257,18 @@ private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs)
}
}

private Path getTempFile(Path target, Mapper.Context context) {
private Path getTempFile(Path target, Mapper.Context context, FileSystem fileSystem) {
Path targetWorkPath = new Path(context.getConfiguration().
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));

Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
: targetWorkPath;
Path tempFile = new Path(root, ".distcp.tmp." +
String tempFilePrefix = DistCpUtils.getTargetTempFilePrefix(target);
Path tempFile = new Path(root, tempFilePrefix +
context.getTaskAttemptID().toString() +
"." + String.valueOf(System.currentTimeMillis()));
LOG.info("Creating temp file: {}", tempFile);
return tempFile;
return new Path(tempFile.toUri().getPath());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,4 +662,21 @@ public static Path getSplitChunkPath(Path targetFile,
+ ".____distcpSplit____" + srcFileStatus.getChunkOffset()
+ "." + srcFileStatus.getChunkLength());
}

/**
* Return the target temp file prefix.
*
* The FTPFilesystem can't work well when the file name is starts with dot.
*
* @param targetPath target path
* @return temp file path prefix
*/
public static String getTargetTempFilePrefix(Path targetPath) {
String schema = targetPath.toUri().getScheme();
if (StringUtils.equalsIgnoreCase("ftp", schema)) {
return DistCpConstants.TARGET_TEMP_FILE_PREFIX_FTP;
} else {
return DistCpConstants.TARGET_TEMP_FILE_PREFIX_DOT;
}
}
}