diff --git a/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuseFileSystem.java b/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuseFileSystem.java index 1139b66e9cb0..fc716a6eeadf 100644 --- a/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuseFileSystem.java +++ b/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuseFileSystem.java @@ -24,6 +24,8 @@ import alluxio.exception.InvalidPathException; import alluxio.security.authorization.Mode; import alluxio.security.group.provider.ShellBasedUnixGroupsMapping; +import alluxio.util.CommonUtils; +import alluxio.util.WaitForOptions; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -50,6 +52,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.ThreadSafe; @@ -63,6 +66,7 @@ final class AlluxioFuseFileSystem extends FuseStubFS { private static final Logger LOG = LoggerFactory.getLogger(AlluxioFuseFileSystem.class); private static final int MAX_OPEN_FILES = Integer.MAX_VALUE; + private static final int MAX_OPEN_WAITTIME_MS = 5000; private static final long UID = AlluxioFuseUtils.getUid(System.getProperty("user.name")); private static final long GID = AlluxioFuseUtils.getGid(System.getProperty("user.name")); private final boolean mIsShellGroupMapping; @@ -383,6 +387,11 @@ public int open(String path, FuseFileInfo fi) { return -ErrorCodes.EISDIR(); } + if (!status.isCompleted() && !waitForFileCompleted(uri)) { + LOG.error("File {} has not completed", uri); + return -ErrorCodes.EFAULT(); + } + synchronized (mOpenFiles) { if (mOpenFiles.size() == MAX_OPEN_FILES) { LOG.error("Cannot open {}: too many open files", uri); @@ -762,6 +771,30 @@ private int rmInternal(String path, boolean mustBeFile) { return 0; } + /** + * Waits for the file to complete before opening it. + * + * @param uri the file path to check + * @return whether the file is completed or not + */ + private boolean waitForFileCompleted(AlluxioURI uri) { + try { + CommonUtils.waitFor("file completed", () -> { + try { + return mFileSystem.getStatus(uri).isCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, WaitForOptions.defaults().setTimeoutMs(MAX_OPEN_WAITTIME_MS)); + return true; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } catch (TimeoutException te) { + return false; + } + } + /** * Exposed for testing. */ diff --git a/integration/fuse/src/test/java/alluxio/fuse/AlluxioFuseFileSystemTest.java b/integration/fuse/src/test/java/alluxio/fuse/AlluxioFuseFileSystemTest.java index 6059ef569f1f..d08a86542cf3 100644 --- a/integration/fuse/src/test/java/alluxio/fuse/AlluxioFuseFileSystemTest.java +++ b/integration/fuse/src/test/java/alluxio/fuse/AlluxioFuseFileSystemTest.java @@ -17,7 +17,9 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; @@ -163,20 +165,46 @@ public void mkDir() throws Exception { } @Test - public void open() throws Exception { - // mocks set-up + public void openWithoutDelay() throws Exception { AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); - FileInfo fi = new FileInfo(); - fi.setFolder(false); - URIStatus status = new URIStatus(fi); + setUpOpenMock(expectedPath); - when(mFileSystem.exists(expectedPath)).thenReturn(true); - when(mFileSystem.getStatus(expectedPath)).thenReturn(status); - mFileInfo.flags.set(O_RDONLY.intValue()); + mFuseFs.open("/foo/bar", mFileInfo); + verify(mFileSystem).exists(expectedPath); + verify(mFileSystem).getStatus(expectedPath); + verify(mFileSystem).openFile(expectedPath); + } + + @Test + public void incompleteFileCannotOpen() throws Exception { + AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); + FileInfo fi = setUpOpenMock(expectedPath); + fi.setCompleted(false); - // actual test mFuseFs.open("/foo/bar", mFileInfo); verify(mFileSystem).exists(expectedPath); + verify(mFileSystem, atLeast(100)).getStatus(expectedPath); + verify(mFileSystem, never()).openFile(expectedPath); + } + + @Test + public void openWithDelay() throws Exception { + AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); + FileInfo fi = setUpOpenMock(expectedPath); + fi.setCompleted(false); + + // Use another thread to open file so that + // we could change the file status when opening it + Thread t = new Thread(() -> mFuseFs.open("/foo/bar", mFileInfo)); + t.start(); + Thread.sleep(1000); + // If the file exists but is not completed, we will wait for the file to complete + verify(mFileSystem).exists(expectedPath); + verify(mFileSystem, atLeast(10)).getStatus(expectedPath); + verify(mFileSystem, never()).openFile(expectedPath); + + fi.setCompleted(true); + t.join(); verify(mFileSystem).openFile(expectedPath); } @@ -184,12 +212,7 @@ public void open() throws Exception { public void read() throws Exception { // mocks set-up AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); - FileInfo fi = new FileInfo(); - fi.setFolder(false); - URIStatus status = new URIStatus(fi); - - when(mFileSystem.exists(expectedPath)).thenReturn(true); - when(mFileSystem.getStatus(expectedPath)).thenReturn(status); + setUpOpenMock(expectedPath); FileInStream fakeInStream = mock(FileInStream.class); when(fakeInStream.read(any(byte[].class), anyInt(), anyInt())).then(new Answer() { @@ -290,4 +313,21 @@ private FuseFileInfo allocateNativeFileInfo() { final Pointer pt = runtime.getMemoryManager().allocateTemporary(36, true); return FuseFileInfo.of(pt); } + + /** + * Sets up mock for open() operation. + * + * @param uri the path to run operations on + * @return the file information + */ + private FileInfo setUpOpenMock(AlluxioURI uri) throws Exception { + FileInfo fi = new FileInfo(); + fi.setCompleted(true); + fi.setFolder(false); + URIStatus status = new URIStatus(fi); + + when(mFileSystem.exists(uri)).thenReturn(true); + when(mFileSystem.getStatus(uri)).thenReturn(status); + return fi; + } }