diff --git a/core/server/src/main/java/alluxio/worker/file/FileSystemWorker.java b/core/server/src/main/java/alluxio/worker/file/FileSystemWorker.java index bef2fcbc9d62..c432ae9c4605 100644 --- a/core/server/src/main/java/alluxio/worker/file/FileSystemWorker.java +++ b/core/server/src/main/java/alluxio/worker/file/FileSystemWorker.java @@ -151,9 +151,11 @@ public long createUfsFile(AlluxioURI ufsUri) throws FileAlreadyExistsException, * @param tempUfsFileId the worker specific temporary file id for the file in the under storage * @param position the absolute position in the file to position the stream at before returning * @return an input stream to the ufs file positioned at the given position + * @throws FileDoesNotExistException if the worker file id is invalid * @throws IOException if an error occurs interacting with the under file system */ - public InputStream getUfsInputStream(long tempUfsFileId, long position) throws IOException { + public InputStream getUfsInputStream(long tempUfsFileId, long position) + throws FileDoesNotExistException, IOException { return mUnderFileSystemManager.getInputStreamAtPosition(tempUfsFileId, position); } @@ -163,9 +165,9 @@ public InputStream getUfsInputStream(long tempUfsFileId, long position) throws I * * @param tempUfsFileId the worker specific temporary file id for the file in the under storage * @return the output stream writing the contents of the file - * @throws IOException if an error occurs interacting with the under file system + * @throws FileDoesNotExistException if the temporary file id is invalid */ - public OutputStream getUfsOutputStream(long tempUfsFileId) throws IOException { + public OutputStream getUfsOutputStream(long tempUfsFileId) throws FileDoesNotExistException { return mUnderFileSystemManager.getOutputStream(tempUfsFileId); } diff --git a/core/server/src/main/java/alluxio/worker/file/UnderFileSystemManager.java b/core/server/src/main/java/alluxio/worker/file/UnderFileSystemManager.java index 7c1fcfd5d764..0417611a59fb 100644 --- a/core/server/src/main/java/alluxio/worker/file/UnderFileSystemManager.java +++ b/core/server/src/main/java/alluxio/worker/file/UnderFileSystemManager.java @@ -234,19 +234,32 @@ public void completeFile(long tempUfsFileId) throws FileDoesNotExistException, I * @param tempUfsFileId the temporary ufs file id * @param position the absolute position in the file to start the stream at * @return the input stream to read from this file + * @throws FileDoesNotExistException if the worker file id not valid * @throws IOException if an error occurs when operating on the under file system */ public InputStream getInputStreamAtPosition(long tempUfsFileId, long position) - throws IOException { - return mInputStreams.get(tempUfsFileId).openAtPosition(position); + throws FileDoesNotExistException, IOException { + UnderFileSystemInputStream stream = mInputStreams.get(tempUfsFileId); + if (stream != null) { + return stream.openAtPosition(position); + } else { + throw new FileDoesNotExistException( + ExceptionMessage.BAD_WORKER_FILE_ID.getMessage(tempUfsFileId)); + } } /** * @param tempUfsFileId the temporary ufs file id * @return the output stream to write to this file */ - public OutputStream getOutputStream(long tempUfsFileId) { - return mOutputStreams.get(tempUfsFileId).getStream(); + public OutputStream getOutputStream(long tempUfsFileId) throws FileDoesNotExistException { + UnderFileSystemOutputStream stream = mOutputStreams.get(tempUfsFileId); + if (stream != null) { + return stream.getStream(); + } else { + throw new FileDoesNotExistException( + ExceptionMessage.BAD_WORKER_FILE_ID.getMessage(tempUfsFileId)); + } } /** diff --git a/core/server/src/main/java/alluxio/worker/netty/FileDataServerHandler.java b/core/server/src/main/java/alluxio/worker/netty/FileDataServerHandler.java index 29d7df8be1a9..7cda44da6054 100644 --- a/core/server/src/main/java/alluxio/worker/netty/FileDataServerHandler.java +++ b/core/server/src/main/java/alluxio/worker/netty/FileDataServerHandler.java @@ -108,8 +108,8 @@ public void handleFileWriteRequest(ChannelHandlerContext ctx, RPCFileWriteReques long length = req.getLength(); final DataBuffer data = req.getPayloadDataBuffer(); - OutputStream out = mWorker.getUfsOutputStream(ufsFileId); try { + OutputStream out = mWorker.getUfsOutputStream(ufsFileId); out.write(data.getReadOnlyByteBuffer().array()); RPCFileWriteResponse resp = new RPCFileWriteResponse(ufsFileId, offset, length, RPCResponse.Status.SUCCESS); diff --git a/core/server/src/test/java/alluxio/worker/file/UnderFileSystemManagerTest.java b/core/server/src/test/java/alluxio/worker/file/UnderFileSystemManagerTest.java index 16bf3215db2a..c1f4f97f7fed 100644 --- a/core/server/src/test/java/alluxio/worker/file/UnderFileSystemManagerTest.java +++ b/core/server/src/test/java/alluxio/worker/file/UnderFileSystemManagerTest.java @@ -18,6 +18,7 @@ import alluxio.underfs.UnderFileSystem; import alluxio.util.io.PathUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -29,6 +30,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.InputStream; import java.io.OutputStream; @RunWith(PowerMockRunner.class) @@ -40,6 +42,8 @@ public final class UnderFileSystemManagerTest { @Rule public final ExpectedException mThrown = ExpectedException.none(); + /** The mock input stream returned whenever a ufs file is read */ + private InputStream mMockInputStream; /** The mock output stream returned whenever a ufs file is created */ private OutputStream mMockOutputStream; /** The mock under file system client. */ @@ -49,7 +53,9 @@ public final class UnderFileSystemManagerTest { public void before() throws Exception { mMockUfs = Mockito.mock(UnderFileSystem.class); mMockOutputStream = Mockito.mock(OutputStream.class); + mMockInputStream = Mockito.mock(InputStream.class); Mockito.when(mMockUfs.create(Mockito.anyString())).thenReturn(mMockOutputStream); + Mockito.when(mMockUfs.open(Mockito.anyString())).thenReturn(mMockInputStream); PowerMockito.mockStatic(UnderFileSystem.class); BDDMockito.given(UnderFileSystem.get(Mockito.anyString(), Mockito.any(Configuration.class))) .willReturn(mMockUfs); @@ -133,7 +139,7 @@ public void openUfsFileTest() throws Exception { String uniqPath = PathUtils.uniqPath(); Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true); UnderFileSystemManager manager = new UnderFileSystemManager(); - long id = manager.openFile(new AlluxioURI(uniqPath)); + manager.openFile(new AlluxioURI(uniqPath)); Mockito.verify(mMockUfs).exists(uniqPath); } @@ -172,4 +178,65 @@ public void closeNonExistentUfsFileTest() throws Exception { mThrown.expect(FileDoesNotExistException.class); manager.closeFile(-1L); } + + /** + * Tests getting an output stream for a valid file returns the correct output stream. + */ + @Test + public void getOutputStreamTest() throws Exception { + String uniqPath = PathUtils.uniqPath(); + UnderFileSystemManager manager = new UnderFileSystemManager(); + long id = manager.createFile(new AlluxioURI(uniqPath)); + Assert.assertEquals(mMockOutputStream, manager.getOutputStream(id)); + } + + /** + * Tests getting an output stream from an invalid file fails. + */ + @Test + public void getNonExistentOutputStreamTest() throws Exception { + UnderFileSystemManager manager = new UnderFileSystemManager(); + mThrown.expect(FileDoesNotExistException.class); + manager.getOutputStream(-1L); + } + + /** + * Tests getting an input stream to a valid file at the start returns the correct input stream. + */ + @Test + public void getInputStreamTest() throws Exception { + String uniqPath = PathUtils.uniqPath(); + long position = 0L; + Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true); + Mockito.when(mMockInputStream.skip(position)).thenReturn(position); + UnderFileSystemManager manager = new UnderFileSystemManager(); + long id = manager.openFile(new AlluxioURI(uniqPath)); + Assert.assertEquals(mMockInputStream, manager.getInputStreamAtPosition(id, position)); + Mockito.verify(mMockInputStream).skip(position); + } + + /** + * Tests getting an input stream to a valid file at a position returns the correct input stream. + */ + @Test + public void getInputStreamAtPositionTest() throws Exception { + String uniqPath = PathUtils.uniqPath(); + long position = 100L; + Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true); + Mockito.when(mMockInputStream.skip(position)).thenReturn(position); + UnderFileSystemManager manager = new UnderFileSystemManager(); + long id = manager.openFile(new AlluxioURI(uniqPath)); + Assert.assertEquals(mMockInputStream, manager.getInputStreamAtPosition(id, position)); + Mockito.verify(mMockInputStream).skip(position); + } + + /** + * Tests getting an input stream to an invalid file fails. + */ + @Test + public void getNonExistentInputStreamTest() throws Exception { + UnderFileSystemManager manager = new UnderFileSystemManager(); + mThrown.expect(FileDoesNotExistException.class); + manager.getInputStreamAtPosition(-1L, 0L); + } }