diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java index 6899bb8d87426..5876b53daa412 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java @@ -72,8 +72,6 @@ public class FTPFileSystem extends FileSystem { public static final String FS_FTP_DATA_CONNECTION_MODE = "fs.ftp.data.connection.mode"; public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode"; - public static final String E_SAME_DIRECTORY_ONLY = - "only same directory renames are supported"; public static final String FS_FTP_TIMEOUT = "fs.ftp.timeout"; private URI uri; @@ -625,8 +623,8 @@ private boolean isFile(FTPClient client, Path file) { } /* - * Assuming that parent of both source and destination is the same. Is the - * assumption correct or it is suppose to work like 'move' ? + * The parent of source and destination can be different. 
It is supposed to + * work like 'move'. */ @Override public boolean rename(Path src, Path dst) throws IOException { @@ -685,21 +683,15 @@ private boolean rename(FTPClient client, Path src, Path dst) throw new FileAlreadyExistsException("Destination path " + dst + " already exists"); } - String parentSrc = absoluteSrc.getParent().toUri().toString(); - String parentDst = absoluteDst.getParent().toUri().toString(); - if (isParentOf(absoluteSrc, absoluteDst)) { - throw new IOException("Cannot rename " + absoluteSrc + " under itself" - + " : "+ absoluteDst); - } - if (!parentSrc.equals(parentDst)) { - throw new IOException("Cannot rename source: " + absoluteSrc - + " to " + absoluteDst - + " -"+ E_SAME_DIRECTORY_ONLY); + Path dstParentPath = absoluteDst.getParent(); + if (!exists(dstParentPath)) { + // The parent directory of dst does not exist yet, so it must be created first. + mkdirs(dstParentPath); } - String from = absoluteSrc.getName(); - String to = absoluteDst.getName(); - client.changeWorkingDirectory(parentSrc); + + String from = absoluteSrc.toUri().getPath(); + String to = absoluteDst.toUri().getPath(); boolean renamed = client.rename(from, to); return renamed; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ftp/TestFTPContractRename.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ftp/TestFTPContractRename.java deleted file mode 100644 index fb8718bf24648..0000000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ftp/TestFTPContractRename.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.contract.ftp; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractRenameTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.apache.hadoop.fs.ftp.FTPFileSystem; - -import java.io.IOException; - -public class TestFTPContractRename extends AbstractContractRenameTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new FTPContract(conf); - } - - /** - * Check the exception was about cross-directory renames - * -if not, rethrow it. 
- * @param e exception raised - * @throws IOException - */ - private void verifyUnsupportedDirRenameException(IOException e) throws IOException { - if (!e.toString().contains(FTPFileSystem.E_SAME_DIRECTORY_ONLY)) { - throw e; - } - } - - @Override - public void testRenameDirIntoExistingDir() throws Throwable { - try { - super.testRenameDirIntoExistingDir(); - fail("Expected a failure"); - } catch (IOException e) { - verifyUnsupportedDirRenameException(e); - } - } - - @Override - public void testRenameFileNonexistentDir() throws Throwable { - try { - super.testRenameFileNonexistentDir(); - fail("Expected a failure"); - } catch (IOException e) { - verifyUnsupportedDirRenameException(e); - } - } -} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index c75c0e85dd791..4af274897d01a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -192,4 +192,15 @@ private DistCpConstants() { public static final String CLASS_INSTANTIATION_ERROR_MSG = "Unable to instantiate "; + + /** + * The prefix of the target temp file path. + */ + public static final String TARGET_TEMP_FILE_PREFIX_DOT = ".distcp.tmp."; + + /** + * The FTP filesystem does not work well with paths that have a dot prefix, + * so the target temp file path uses the prefix without the dot. 
+ */ + public static final String TARGET_TEMP_FILE_PREFIX_FTP = "distcp.tmp."; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 2272781f72476..d4f35a3b4973f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -170,8 +170,10 @@ private void deleteAttemptTempFiles(Path targetWorkPath, return; } + String tempFilePrefix = DistCpUtils.getTargetTempFilePrefix(targetWorkPath); FileStatus[] tempFiles = targetFS.globStatus( - new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*")); + new Path(targetWorkPath, tempFilePrefix + + jobId.replaceAll("job", "attempt") + "*")); if (tempFiles != null && tempFiles.length > 0) { for (FileStatus file : tempFiles) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 9a72c9d7dbfde..2d7223af31238 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -129,16 +129,15 @@ private long doCopy(CopyListingFileStatus source, Path target, throws IOException { LOG.info("Copying {} to {}", source.getPath(), target); + final Configuration configuration = context.getConfiguration(); + FileSystem targetFS = target.getFileSystem(configuration); final boolean toAppend = action == FileAction.APPEND; final boolean useTempTarget = !toAppend && !directWrite; - Path targetPath = useTempTarget ? getTempFile(target, context) : target; + Path targetPath = useTempTarget ? 
getTempFile(target, context, targetFS) : target; LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary" : "direct", targetPath); - final Configuration configuration = context.getConfiguration(); - FileSystem targetFS = target.getFileSystem(configuration); - try { final Path sourcePath = source.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); @@ -161,6 +160,7 @@ private long doCopy(CopyListingFileStatus source, Path target, if (useTempTarget) { LOG.info("Renaming temporary target file path {} to {}", targetPath, target); + target = new Path(target.toUri().getPath()); promoteTmpToTarget(targetPath, target, targetFS); } LOG.info("Completed writing {} ({} bytes)", target, bytesRead); @@ -257,17 +257,18 @@ private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs) } } - private Path getTempFile(Path target, Mapper.Context context) { + private Path getTempFile(Path target, Mapper.Context context, FileSystem fileSystem) { Path targetWorkPath = new Path(context.getConfiguration(). get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent() : targetWorkPath; - Path tempFile = new Path(root, ".distcp.tmp." + + String tempFilePrefix = DistCpUtils.getTargetTempFilePrefix(target); + Path tempFile = new Path(root, tempFilePrefix + context.getTaskAttemptID().toString() + "." 
+ String.valueOf(System.currentTimeMillis())); LOG.info("Creating temp file: {}", tempFile); - return tempFile; + return new Path(tempFile.toUri().getPath()); } @VisibleForTesting diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index 1af434e19f823..84201d0fa3370 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -662,4 +662,21 @@ public static Path getSplitChunkPath(Path targetFile, + ".____distcpSplit____" + srcFileStatus.getChunkOffset() + "." + srcFileStatus.getChunkLength()); } + + /** + * Return the target temp file prefix. + * + * The FTP filesystem does not work well when a file name starts with a dot. + * + * @param targetPath target path whose URI scheme selects the prefix + * @return temp file name prefix + */ + public static String getTargetTempFilePrefix(Path targetPath) { + String schema = targetPath.toUri().getScheme(); + if (StringUtils.equalsIgnoreCase("ftp", schema)) { + return DistCpConstants.TARGET_TEMP_FILE_PREFIX_FTP; + } else { + return DistCpConstants.TARGET_TEMP_FILE_PREFIX_DOT; + } + } }