Skip to content

Commit

Permalink
Support overwrite option in createFile
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Support overwrite option in createFile
### Why are the changes needed?

Before this change, if a file with the same name already existed in Alluxio,
we had to call `getStatus`, then `deleteFile`, and then create the new file. Now
we only need to call `createFile` with the overwrite option, in both the
HDFS API and the S3 API, which saves the extra RPCs.

### Does this PR introduce any user facing changes?
Adds a new `overwrite` option to `CreateFilePOptions`.

pr-link: #16886
change-id: cid-5b84132d9c4da731b7d1bbf35d71885052e8c5b0
  • Loading branch information
Jackson-Wang-7 committed Feb 21, 2023
1 parent d28e933 commit 16ff653
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -172,29 +171,14 @@ public FSDataOutputStream create(Path path, FsPermission permission, boolean ove

AlluxioURI uri = getAlluxioPath(path);
CreateFilePOptions options = CreateFilePOptions.newBuilder().setBlockSizeBytes(blockSize)
.setMode(new Mode(permission.toShort()).toProto()).setRecursive(true).build();
.setMode(new Mode(permission.toShort()).toProto()).setRecursive(true)
.setOverwrite(overwrite).build();

FileOutStream outStream;
try {
outStream = mFileSystem.createFile(uri, options);
} catch (AlluxioException e) {
//now we should consider the override parameter
try {
if (mFileSystem.exists(uri)) {
if (!overwrite) {
throw new IOException(
"Not allowed to create() (overwrite=false) for existing Alluxio path: " + uri);
}
if (mFileSystem.getStatus(uri).isFolder()) {
throw new IOException(MessageFormat
.format("{0} already exists. Directories cannot be overwritten with create", uri));
}
mFileSystem.delete(uri);
}
outStream = mFileSystem.createFile(uri, options);
} catch (AlluxioException e2) {
throw new IOException(e2);
}
throw new IOException(e);
}
return new FSDataOutputStream(outStream, mStatistics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import alluxio.client.file.URIStatus;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.util.ConfigurationUtils;
import alluxio.wire.BlockInfo;
Expand Down Expand Up @@ -665,13 +666,15 @@ public void createWithoutOverwrite() throws Exception {
when(alluxioFs.exists(new AlluxioURI(HadoopUtils.getPathWithoutScheme(path))))
.thenReturn(true);
when(alluxioFs.createFile(eq(new AlluxioURI(HadoopUtils.getPathWithoutScheme(path))), any()))
.thenThrow(new FileAlreadyExistsException(path.toString()));
.thenThrow(new FileAlreadyExistsException(
ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(path.toString())));

try (FileSystem alluxioHadoopFs = new FileSystem(alluxioFs)) {
alluxioHadoopFs.create(path, false, 100, (short) 1, 1000);
fail("create() of existing file is expected to fail");
} catch (IOException e) {
assertEquals("Not allowed to create() (overwrite=false) for existing Alluxio path: " + path,
assertEquals("alluxio.exception.FileAlreadyExistsException: "
+ ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(path),
e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public enum ExceptionMessage {
ROOT_CANNOT_BE_RENAMED("The root directory cannot be renamed"),
JOURNAL_ENTRY_MISSING(
"Journal entries are missing between sequence number {0} (inclusive) and {1} (exclusive)."),
CANNOT_OVERWRITE_DIRECTORY("{0} already exists. Directories cannot be overwritten with create"),
CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE("{0} already exists. If you want to overwrite the file,"
+ " you need to specify the overwrite option."),

// block master
NO_WORKER_FOUND("No worker with workerId {0,number,#} is found"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,7 @@ public FileInfo createFile(AlluxioURI path, CreateFileContext context)
// Check if ufs is writable
checkUfsMode(path, OperationType.WRITE);
}
deleteFileIfOverwrite(rpcContext, inodePath, context);
createFileInternal(rpcContext, inodePath, context);
auditContext.setSrcInode(inodePath.getInode()).setSucceeded(true);
cacheOperation(context);
Expand All @@ -1878,6 +1879,41 @@ public FileInfo createFile(AlluxioURI path, CreateFileContext context)
}
}

/**
 * Clears the target of a file creation when something already exists at that path.
 *
 * Nothing happens when the full path does not exist yet. Otherwise the existing inode is
 * deleted, but only if the create options request an overwrite and the inode is not a directory.
 *
 * @param rpcContext the rpc context
 * @param inodePath the path to be created
 * @param context the method context
 * @throws FileAlreadyExistsException if the path exists and overwrite was not requested, or if
 *         the existing inode is a directory
 * @throws InvalidPathException if the delete reports a non-empty directory
 */
private void deleteFileIfOverwrite(RpcContext rpcContext, LockedInodePath inodePath,
    CreateFileContext context)
    throws FileDoesNotExistException, IOException, InvalidPathException,
    FileAlreadyExistsException {
  if (!inodePath.fullPathExists()) {
    // Nothing is in the way of the new file.
    return;
  }
  Inode existing = inodePath.getInode();
  boolean overwriteRequested =
      context.getOptions().hasOverwrite() && context.getOptions().getOverwrite();
  if (!overwriteRequested) {
    throw new FileAlreadyExistsException(
        ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(
            inodePath.getUri()));
  }
  // Overwrite only ever replaces a file; an existing directory is rejected.
  if (existing.isDirectory()) {
    throw new FileAlreadyExistsException(
        ExceptionMessage.CANNOT_OVERWRITE_DIRECTORY.getMessage(inodePath.getUri()));
  }
  DeleteContext deleteContext = DeleteContext.mergeFrom(
      DeletePOptions.newBuilder()
          .setRecursive(true)
          .setAlluxioOnly(!context.isPersisted()));
  try {
    deleteInternal(rpcContext, inodePath, deleteContext, true);
    // Drop the deleted inode from the locked path before the new file is created on it.
    inodePath.removeLastInode();
  } catch (DirectoryNotEmptyException e) {
    // Should not reach here
    throw new InvalidPathException(
        ExceptionMessage.CANNOT_OVERWRITE_DIRECTORY.getMessage(inodePath.getUri()));
  }
}

/**
* @param rpcContext the rpc context
* @param inodePath the path to be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -105,6 +106,34 @@ public void createFileUsesOperationTime() throws Exception {
assertEquals(100, info.getLastAccessTimeMs());
}

/**
 * Tests creating a file over an existing path: without the overwrite option the call is
 * rejected, with the option an existing file is replaced, and an existing directory is never
 * overwritten.
 */
@Test
public void createFileWithOverwrite() throws Exception {
  AlluxioURI path = new AlluxioURI("/test");
  mFileSystemMaster.createFile(path, CreateFileContext.defaults());

  // create without overwrite: the existing file must be reported as a conflict
  Exception e = assertThrows(FileAlreadyExistsException.class, () -> {
    mFileSystemMaster.createFile(path, CreateFileContext.defaults());
  });
  assertTrue(e.getMessage()
      .contains(ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(path)));

  // create with overwrite: the call succeeds and a file is still present at the path
  CreateFileContext createFileContextWithOverwrite = CreateFileContext.defaults();
  createFileContextWithOverwrite.getOptions().setOverwrite(true);
  mFileSystemMaster.createFile(path, createFileContextWithOverwrite);
  FileInfo info = mFileSystemMaster.getFileInfo(path, GetStatusContext.defaults());
  assertEquals(path.getPath(), info.getPath());
  assertFalse(info.isFolder());

  // overwrite an existing directory: rejected even though overwrite is requested
  AlluxioURI testpath = new AlluxioURI("/test2");
  mFileSystemMaster.createDirectory(testpath, CreateDirectoryContext.defaults());

  e = assertThrows(FileAlreadyExistsException.class, () -> {
    mFileSystemMaster.createFile(testpath, createFileContextWithOverwrite);
  });
  assertTrue(e.getMessage()
      .contains(ExceptionMessage.CANNOT_OVERWRITE_DIRECTORY.getMessage(testpath)));
}

/**
* Tests the {@link FileSystemMaster#delete(AlluxioURI, DeleteContext)} method.
*/
Expand Down
15 changes: 4 additions & 11 deletions core/server/proxy/src/main/java/alluxio/proxy/s3/S3ObjectTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ public Response continueTask() {
.setWriteType(S3RestUtils.getS3WriteType())
.putAllXattr(xattrMap)
.setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.setOverwrite(true)
.build();

try {
Expand Down Expand Up @@ -560,11 +561,6 @@ public Response createObject(String objectPath, FileSystem userFs,
} else {
toRead = Long.parseLong(contentLength);
}
try {
S3RestUtils.deleteExistObject(userFs, objectUri);
} catch (IOException | AlluxioException e) {
throw S3RestUtils.toObjectS3Exception(e, objectUri.getPath(), auditContext);
}
FileOutStream os = userFs.createFile(objectUri, createFilePOptions);
try (DigestOutputStream digestOutputStream = new DigestOutputStream(os, md5)) {
long read = ByteStreams.copy(ByteStreams.limit(readStream, toRead),
Expand Down Expand Up @@ -657,11 +653,6 @@ public String copyObject(FileSystem userFs, S3AuditContext auditContext,
throw new S3Exception("Copying an object to itself invalid.",
targetPath, S3ErrorCode.INVALID_REQUEST);
}
try {
S3RestUtils.deleteExistObject(userFs, objectUri);
} catch (IOException | AlluxioException e) {
throw S3RestUtils.toObjectS3Exception(e, objectUri.getPath(), auditContext);
}
try (FileInStream in = userFs.openFile(new AlluxioURI(sourcePath));
FileOutStream out = userFs.createFile(objectUri, copyFilePOption)) {
MessageDigest md5 = MessageDigest.getInstance("MD5");
Expand Down Expand Up @@ -726,6 +717,7 @@ public Response continueTask() {
.setOtherBits(Bits.NONE).build())
.setWriteType(S3RestUtils.getS3WriteType())
.putAllXattr(xattrMap).setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.setOverwrite(true)
.build();
return createObject(objectPath, userFs, filePOptions, auditContext);
}
Expand Down Expand Up @@ -794,7 +786,8 @@ public Response continueTask() {
.setMode(PMode.newBuilder()
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build());
.setOtherBits(Bits.NONE).build())
.setOverwrite(true);
String entityTag = copyObject(userFs, auditContext, objectPath,
copySource, copyFilePOptionsBuilder.build());
return new CopyPartResult(entityTag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ public Response createObjectOrUploadPart(@HeaderParam("Content-MD5") final Strin
.setOtherBits(Bits.NONE).build())
.setWriteType(S3RestUtils.getS3WriteType())
.putAllXattr(xattrMap).setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.setOverwrite(true)
.build();

// not copying from an existing file
Expand All @@ -838,11 +839,6 @@ public Response createObjectOrUploadPart(@HeaderParam("Content-MD5") final Strin
} else {
toRead = Long.parseLong(contentLength);
}
try {
S3RestUtils.deleteExistObject(userFs, objectUri);
} catch (IOException | AlluxioException e) {
throw S3RestUtils.toObjectS3Exception(e, objectUri.getPath(), auditContext);
}
FileOutStream os = userFs.createFile(objectUri, filePOptions);
try (DigestOutputStream digestOutputStream = new DigestOutputStream(os, md5)) {
long read = ByteStreams.copy(ByteStreams.limit(readStream, toRead),
Expand Down Expand Up @@ -893,7 +889,8 @@ public Response createObjectOrUploadPart(@HeaderParam("Content-MD5") final Strin
.setMode(PMode.newBuilder()
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build());
.setOtherBits(Bits.NONE).build())
.setOverwrite(true);
// Handle metadata directive
if (metadataDirective == S3Constants.Directive.REPLACE
&& filePOptions.getXattrMap().containsKey(S3Constants.CONTENT_TYPE_XATTR_KEY)) {
Expand Down Expand Up @@ -940,11 +937,6 @@ public Response createObjectOrUploadPart(@HeaderParam("Content-MD5") final Strin
throw new S3Exception("Copying an object to itself invalid.",
objectPath, S3ErrorCode.INVALID_REQUEST);
}
try {
S3RestUtils.deleteExistObject(userFs, objectUri);
} catch (IOException | AlluxioException e) {
throw S3RestUtils.toObjectS3Exception(e, objectUri.getPath(), auditContext);
}
try (FileInStream in = userFs.openFile(new AlluxioURI(copySource));
FileOutStream out = userFs.createFile(objectUri, copyFilePOptionsBuilder.build())) {
MessageDigest md5 = MessageDigest.getInstance("MD5");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ message CreateFilePOptions {
optional int64 persistenceWaitTime = 10;
map<string, bytes> xattr = 11;
optional XAttrPropagationStrategy xattrPropStrat = 12 [default = NEW_PATHS];
optional bool overwrite = 13;
}
message CreateFilePRequest {
/** the path of the file */
Expand Down
5 changes: 5 additions & 0 deletions core/transport/src/main/proto/proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2449,6 +2449,11 @@
"value": "NEW_PATHS"
}
]
},
{
"id": 13,
"name": "overwrite",
"type": "bool"
}
],
"maps": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.grpc.DeletePOptions;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
Expand Down Expand Up @@ -196,7 +197,8 @@ public void copyFromLocalOverwrite() throws Exception {
String[] cmd2 = {"copyFromLocal", testFile2.getPath(), alluxioFilePath.getPath()};
Assert.assertEquals(-1, sFsShell.run(cmd2));
Assert.assertThat(mOutput.toString(), containsString(
"Not allowed to create file because path already exists: " + alluxioFilePath.getPath()));
ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(
alluxioFilePath.getPath())));
// Make sure the original file is intact
Assert.assertTrue(BufferUtils
.equalIncreasingByteArray(LEN1, readContent(alluxioFilePath, LEN1)));
Expand Down Expand Up @@ -235,15 +237,16 @@ public void copyFromLocalMustCacheThenCacheThrough() throws Exception {
File file = mTestFolder.newFile();
try (Closeable c = new ConfigurationRule(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT,
WriteType.MUST_CACHE.toString(), Configuration.modifiableGlobal()).toResource()) {
Assert.assertEquals(0, sFsShell.run("copyFromLocal", file.getAbsolutePath(), "/"));
Assert.assertEquals(0, sFsShell.run("copyFromLocal", file.getAbsolutePath(), "/test"));
}
try (Closeable c = new ConfigurationRule(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT,
WriteType.CACHE_THROUGH.toString(), Configuration.modifiableGlobal()).toResource()) {
mOutput.reset();
sFsShell.run("copyFromLocal", file.getAbsolutePath(), "/");
sFsShell.run("copyFromLocal", file.getAbsolutePath(), "/test");
}
Assert.assertThat(mOutput.toString(),
containsString("Not allowed to create file because path already exists"));
containsString(
ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage("/test")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.grpc.FileSystemMasterCommonPOptions;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
Expand Down Expand Up @@ -505,7 +506,8 @@ public void copyFromLocalOverwrite() throws Exception {
String[] cmd2 = {"cp", "file://" + testFile2.getPath(), alluxioFilePath.getPath()};
Assert.assertEquals(-1, sFsShell.run(cmd2));
Assert.assertThat(mOutput.toString(), containsString(
"Not allowed to create file because path already exists: " + alluxioFilePath.getPath()));
ExceptionMessage.CANNOT_OVERWRITE_FILE_WITHOUT_OVERWRITE.getMessage(
alluxioFilePath.getPath())));
// Make sure the original file is intact
Assert.assertTrue(BufferUtils
.equalIncreasingByteArray(LEN1, readContent(alluxioFilePath, LEN1)));
Expand Down

0 comments on commit 16ff653

Please sign in to comment.