Skip to content

Commit

Permalink
Additional unit tests for get input/output stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed May 11, 2016
1 parent 5fa8a80 commit 1f00ec8
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
Expand Up @@ -151,9 +151,11 @@ public long createUfsFile(AlluxioURI ufsUri) throws FileAlreadyExistsException,
* @param tempUfsFileId the worker specific temporary file id for the file in the under storage
* @param position the absolute position in the file to position the stream at before returning
* @return an input stream to the ufs file positioned at the given position
* @throws FileDoesNotExistException if the worker file id is invalid
* @throws IOException if an error occurs interacting with the under file system
*/
public InputStream getUfsInputStream(long tempUfsFileId, long position) throws IOException {
public InputStream getUfsInputStream(long tempUfsFileId, long position)
throws FileDoesNotExistException, IOException {
return mUnderFileSystemManager.getInputStreamAtPosition(tempUfsFileId, position);
}

Expand All @@ -163,9 +165,9 @@ public InputStream getUfsInputStream(long tempUfsFileId, long position) throws I
*
* @param tempUfsFileId the worker specific temporary file id for the file in the under storage
* @return the output stream writing the contents of the file
* @throws IOException if an error occurs interacting with the under file system
* @throws FileDoesNotExistException if the temporary file id is invalid
*/
public OutputStream getUfsOutputStream(long tempUfsFileId) throws IOException {
public OutputStream getUfsOutputStream(long tempUfsFileId) throws FileDoesNotExistException {
return mUnderFileSystemManager.getOutputStream(tempUfsFileId);
}

Expand Down
Expand Up @@ -234,19 +234,32 @@ public void completeFile(long tempUfsFileId) throws FileDoesNotExistException, I
* @param tempUfsFileId the temporary ufs file id
* @param position the absolute position in the file to start the stream at
* @return the input stream to read from this file
* @throws FileDoesNotExistException if the worker file id not valid
* @throws IOException if an error occurs when operating on the under file system
*/
public InputStream getInputStreamAtPosition(long tempUfsFileId, long position)
throws IOException {
return mInputStreams.get(tempUfsFileId).openAtPosition(position);
throws FileDoesNotExistException, IOException {
UnderFileSystemInputStream stream = mInputStreams.get(tempUfsFileId);
if (stream != null) {
return stream.openAtPosition(position);
} else {
throw new FileDoesNotExistException(
ExceptionMessage.BAD_WORKER_FILE_ID.getMessage(tempUfsFileId));
}
}

/**
* @param tempUfsFileId the temporary ufs file id
* @return the output stream to write to this file
*/
public OutputStream getOutputStream(long tempUfsFileId) {
return mOutputStreams.get(tempUfsFileId).getStream();
public OutputStream getOutputStream(long tempUfsFileId) throws FileDoesNotExistException {
UnderFileSystemOutputStream stream = mOutputStreams.get(tempUfsFileId);
if (stream != null) {
return stream.getStream();
} else {
throw new FileDoesNotExistException(
ExceptionMessage.BAD_WORKER_FILE_ID.getMessage(tempUfsFileId));
}
}

/**
Expand Down
Expand Up @@ -108,8 +108,8 @@ public void handleFileWriteRequest(ChannelHandlerContext ctx, RPCFileWriteReques
long length = req.getLength();
final DataBuffer data = req.getPayloadDataBuffer();

OutputStream out = mWorker.getUfsOutputStream(ufsFileId);
try {
OutputStream out = mWorker.getUfsOutputStream(ufsFileId);
out.write(data.getReadOnlyByteBuffer().array());
RPCFileWriteResponse resp =
new RPCFileWriteResponse(ufsFileId, offset, length, RPCResponse.Status.SUCCESS);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import alluxio.underfs.UnderFileSystem;
import alluxio.util.io.PathUtils;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -29,6 +30,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.InputStream;
import java.io.OutputStream;

@RunWith(PowerMockRunner.class)
Expand All @@ -40,6 +42,8 @@ public final class UnderFileSystemManagerTest {
@Rule
public final ExpectedException mThrown = ExpectedException.none();

/** The mock input stream returned whenever a ufs file is read */
private InputStream mMockInputStream;
/** The mock output stream returned whenever a ufs file is created */
private OutputStream mMockOutputStream;
/** The mock under file system client. */
Expand All @@ -49,7 +53,9 @@ public final class UnderFileSystemManagerTest {
public void before() throws Exception {
mMockUfs = Mockito.mock(UnderFileSystem.class);
mMockOutputStream = Mockito.mock(OutputStream.class);
mMockInputStream = Mockito.mock(InputStream.class);
Mockito.when(mMockUfs.create(Mockito.anyString())).thenReturn(mMockOutputStream);
Mockito.when(mMockUfs.open(Mockito.anyString())).thenReturn(mMockInputStream);
PowerMockito.mockStatic(UnderFileSystem.class);
BDDMockito.given(UnderFileSystem.get(Mockito.anyString(), Mockito.any(Configuration.class)))
.willReturn(mMockUfs);
Expand Down Expand Up @@ -133,7 +139,7 @@ public void openUfsFileTest() throws Exception {
String uniqPath = PathUtils.uniqPath();
Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true);
UnderFileSystemManager manager = new UnderFileSystemManager();
long id = manager.openFile(new AlluxioURI(uniqPath));
manager.openFile(new AlluxioURI(uniqPath));
Mockito.verify(mMockUfs).exists(uniqPath);
}

Expand Down Expand Up @@ -172,4 +178,65 @@ public void closeNonExistentUfsFileTest() throws Exception {
mThrown.expect(FileDoesNotExistException.class);
manager.closeFile(-1L);
}

/**
* Tests getting an output stream for a valid file returns the correct output stream.
*/
@Test
public void getOutputStreamTest() throws Exception {
String uniqPath = PathUtils.uniqPath();
UnderFileSystemManager manager = new UnderFileSystemManager();
long id = manager.createFile(new AlluxioURI(uniqPath));
Assert.assertEquals(mMockOutputStream, manager.getOutputStream(id));
}

/**
* Tests getting an output stream from an invalid file fails.
*/
@Test
public void getNonExistentOutputStreamTest() throws Exception {
UnderFileSystemManager manager = new UnderFileSystemManager();
mThrown.expect(FileDoesNotExistException.class);
manager.getOutputStream(-1L);
}

/**
* Tests getting an input stream to a valid file at the start returns the correct input stream.
*/
@Test
public void getInputStreamTest() throws Exception {
String uniqPath = PathUtils.uniqPath();
long position = 0L;
Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true);
Mockito.when(mMockInputStream.skip(position)).thenReturn(position);
UnderFileSystemManager manager = new UnderFileSystemManager();
long id = manager.openFile(new AlluxioURI(uniqPath));
Assert.assertEquals(mMockInputStream, manager.getInputStreamAtPosition(id, position));
Mockito.verify(mMockInputStream).skip(position);
}

/**
* Tests getting an input stream to a valid file at a position returns the correct input stream.
*/
@Test
public void getInputStreamAtPositionTest() throws Exception {
String uniqPath = PathUtils.uniqPath();
long position = 100L;
Mockito.when(mMockUfs.exists(uniqPath)).thenReturn(true);
Mockito.when(mMockInputStream.skip(position)).thenReturn(position);
UnderFileSystemManager manager = new UnderFileSystemManager();
long id = manager.openFile(new AlluxioURI(uniqPath));
Assert.assertEquals(mMockInputStream, manager.getInputStreamAtPosition(id, position));
Mockito.verify(mMockInputStream).skip(position);
}

/**
* Tests getting an input stream to an invalid file fails.
*/
@Test
public void getNonExistentInputStreamTest() throws Exception {
UnderFileSystemManager manager = new UnderFileSystemManager();
mThrown.expect(FileDoesNotExistException.class);
manager.getInputStreamAtPosition(-1L, 0L);
}
}

0 comments on commit 1f00ec8

Please sign in to comment.