Skip to content

Commit

Permalink
[ALLUXIO-3306] delay fuse open if file is not completed (#7807)
Browse files Browse the repository at this point in the history
* delay fuse open if file is not completed

* add MS to constants

* add unit test for delayed open()

* add more tests for open
  • Loading branch information
LuQQiu authored and calvinjia committed Aug 30, 2018
1 parent 68be156 commit 5c608d4
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 15 deletions.
Expand Up @@ -24,6 +24,8 @@
import alluxio.exception.InvalidPathException; import alluxio.exception.InvalidPathException;
import alluxio.security.authorization.Mode; import alluxio.security.authorization.Mode;
import alluxio.security.group.provider.ShellBasedUnixGroupsMapping; import alluxio.security.group.provider.ShellBasedUnixGroupsMapping;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
Expand All @@ -50,6 +52,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException;


import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;


Expand All @@ -63,6 +66,7 @@ final class AlluxioFuseFileSystem extends FuseStubFS {
private static final Logger LOG = LoggerFactory.getLogger(AlluxioFuseFileSystem.class); private static final Logger LOG = LoggerFactory.getLogger(AlluxioFuseFileSystem.class);


private static final int MAX_OPEN_FILES = Integer.MAX_VALUE; private static final int MAX_OPEN_FILES = Integer.MAX_VALUE;
private static final int MAX_OPEN_WAITTIME_MS = 5000;
private static final long UID = AlluxioFuseUtils.getUid(System.getProperty("user.name")); private static final long UID = AlluxioFuseUtils.getUid(System.getProperty("user.name"));
private static final long GID = AlluxioFuseUtils.getGid(System.getProperty("user.name")); private static final long GID = AlluxioFuseUtils.getGid(System.getProperty("user.name"));
private final boolean mIsShellGroupMapping; private final boolean mIsShellGroupMapping;
Expand Down Expand Up @@ -383,6 +387,11 @@ public int open(String path, FuseFileInfo fi) {
return -ErrorCodes.EISDIR(); return -ErrorCodes.EISDIR();
} }


if (!status.isCompleted() && !waitForFileCompleted(uri)) {
LOG.error("File {} has not completed", uri);
return -ErrorCodes.EFAULT();
}

synchronized (mOpenFiles) { synchronized (mOpenFiles) {
if (mOpenFiles.size() == MAX_OPEN_FILES) { if (mOpenFiles.size() == MAX_OPEN_FILES) {
LOG.error("Cannot open {}: too many open files", uri); LOG.error("Cannot open {}: too many open files", uri);
Expand Down Expand Up @@ -762,6 +771,30 @@ private int rmInternal(String path, boolean mustBeFile) {
return 0; return 0;
} }


/**
* Waits for the file to complete before opening it.
*
* @param uri the file path to check
* @return whether the file is completed or not
*/
private boolean waitForFileCompleted(AlluxioURI uri) {
try {
CommonUtils.waitFor("file completed", () -> {
try {
return mFileSystem.getStatus(uri).isCompleted();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, WaitForOptions.defaults().setTimeoutMs(MAX_OPEN_WAITTIME_MS));
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
} catch (TimeoutException te) {
return false;
}
}

/** /**
* Exposed for testing. * Exposed for testing.
*/ */
Expand Down
Expand Up @@ -17,7 +17,9 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -163,33 +165,54 @@ public void mkDir() throws Exception {
} }


@Test @Test
public void open() throws Exception { public void openWithoutDelay() throws Exception {
// mocks set-up
AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar");
FileInfo fi = new FileInfo(); setUpOpenMock(expectedPath);
fi.setFolder(false);
URIStatus status = new URIStatus(fi);


when(mFileSystem.exists(expectedPath)).thenReturn(true); mFuseFs.open("/foo/bar", mFileInfo);
when(mFileSystem.getStatus(expectedPath)).thenReturn(status); verify(mFileSystem).exists(expectedPath);
mFileInfo.flags.set(O_RDONLY.intValue()); verify(mFileSystem).getStatus(expectedPath);
verify(mFileSystem).openFile(expectedPath);
}

@Test
public void incompleteFileCannotOpen() throws Exception {
AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar");
FileInfo fi = setUpOpenMock(expectedPath);
fi.setCompleted(false);


// actual test
mFuseFs.open("/foo/bar", mFileInfo); mFuseFs.open("/foo/bar", mFileInfo);
verify(mFileSystem).exists(expectedPath); verify(mFileSystem).exists(expectedPath);
verify(mFileSystem, atLeast(100)).getStatus(expectedPath);
verify(mFileSystem, never()).openFile(expectedPath);
}

@Test
public void openWithDelay() throws Exception {
AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar");
FileInfo fi = setUpOpenMock(expectedPath);
fi.setCompleted(false);

// Use another thread to open file so that
// we could change the file status when opening it
Thread t = new Thread(() -> mFuseFs.open("/foo/bar", mFileInfo));
t.start();
Thread.sleep(1000);
// If the file exists but is not completed, we will wait for the file to complete
verify(mFileSystem).exists(expectedPath);
verify(mFileSystem, atLeast(10)).getStatus(expectedPath);
verify(mFileSystem, never()).openFile(expectedPath);

fi.setCompleted(true);
t.join();
verify(mFileSystem).openFile(expectedPath); verify(mFileSystem).openFile(expectedPath);
} }


@Test @Test
public void read() throws Exception { public void read() throws Exception {
// mocks set-up // mocks set-up
AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar"); AlluxioURI expectedPath = BASE_EXPECTED_URI.join("/foo/bar");
FileInfo fi = new FileInfo(); setUpOpenMock(expectedPath);
fi.setFolder(false);
URIStatus status = new URIStatus(fi);

when(mFileSystem.exists(expectedPath)).thenReturn(true);
when(mFileSystem.getStatus(expectedPath)).thenReturn(status);


FileInStream fakeInStream = mock(FileInStream.class); FileInStream fakeInStream = mock(FileInStream.class);
when(fakeInStream.read(any(byte[].class), anyInt(), anyInt())).then(new Answer<Integer>() { when(fakeInStream.read(any(byte[].class), anyInt(), anyInt())).then(new Answer<Integer>() {
Expand Down Expand Up @@ -290,4 +313,21 @@ private FuseFileInfo allocateNativeFileInfo() {
final Pointer pt = runtime.getMemoryManager().allocateTemporary(36, true); final Pointer pt = runtime.getMemoryManager().allocateTemporary(36, true);
return FuseFileInfo.of(pt); return FuseFileInfo.of(pt);
} }

/**
* Sets up mock for open() operation.
*
* @param uri the path to run operations on
* @return the file information
*/
private FileInfo setUpOpenMock(AlluxioURI uri) throws Exception {
FileInfo fi = new FileInfo();
fi.setCompleted(true);
fi.setFolder(false);
URIStatus status = new URIStatus(fi);

when(mFileSystem.exists(uri)).thenReturn(true);
when(mFileSystem.getStatus(uri)).thenReturn(status);
return fi;
}
} }

0 comments on commit 5c608d4

Please sign in to comment.