Skip to content

Commit

Permalink
Support external stores which do not read in full
Browse files Browse the repository at this point in the history
This is consistent with InputStream's API.

pr-link: #10891
change-id: cid-e9bfdb6d79fe69b5926d040c2a382e07db305efa
  • Loading branch information
calvinjia committed Feb 12, 2020
1 parent 1a4f762 commit c079aaa
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 5 deletions.
Expand Up @@ -318,10 +318,19 @@ private synchronized byte[] readExternalPage(long pos) throws IOException {
FileInStream stream = getExternalFileInStream(pageStart);
int pageSize = (int) Math.min(mPageSize, mStatus.getLength() - pageStart);
byte[] page = new byte[pageSize];
if (stream.read(page) != pageSize) {
throw new IOException("Failed to read complete page from external storage.");
int totalBytesRead = 0;
while (totalBytesRead < pageSize) {
int bytesRead = stream.read(page, totalBytesRead, pageSize - totalBytesRead);
if (bytesRead <= 0) {
break;
}
totalBytesRead += bytesRead;
}
Metrics.BYTES_READ_EXTERNAL.inc(totalBytesRead);
if (totalBytesRead != pageSize) {
throw new IOException("Failed to read complete page from external storage. Bytes read: "
+ totalBytesRead + " Page size: " + pageSize);
}
Metrics.BYTES_READ_EXTERNAL.inc(pageSize);
return page;
}

Expand Down
Expand Up @@ -307,6 +307,35 @@ public void readPagesMetrics() throws Exception {
MetricsSystem.counter(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL.getName()).getCount());
}

@Test
public void externalStoreMultiRead() throws Exception {
int fileSize = PAGE_SIZE;
byte[] testData = BufferUtils.getIncreasingByteArray(fileSize);
ByteArrayCacheManager manager = new ByteArrayCacheManager();
Map<AlluxioURI, byte[]> files = new HashMap<>();
AlluxioURI testFilename = new AlluxioURI("/test");
files.put(testFilename, testData);

ByteArrayFileSystem fs = new MultiReadByteArrayFileSystem(files);

LocalCacheFileInStream stream = new LocalCacheFileInStream(
testFilename, OpenFilePOptions.getDefaultInstance(), fs, manager);

// cache miss
byte[] cacheMiss = new byte[fileSize];
Assert.assertEquals(fileSize, stream.read(cacheMiss));
Assert.assertArrayEquals(testData, cacheMiss);
Assert.assertEquals(0, manager.mPagesServed);
Assert.assertEquals(1, manager.mPagesCached);

// cache hit
stream.seek(0);
byte[] cacheHit = new byte[fileSize];
Assert.assertEquals(fileSize, stream.read(cacheHit));
Assert.assertArrayEquals(testData, cacheHit);
Assert.assertEquals(1, manager.mPagesServed);
}

private LocalCacheFileInStream setupWithSingleFile(byte[] data, CacheManager manager) {
Map<AlluxioURI, byte[]> files = new HashMap<>();
AlluxioURI testFilename = new AlluxioURI("/test");
Expand Down Expand Up @@ -473,7 +502,8 @@ public List<SyncPointInfo> getSyncPathList() throws IOException, AlluxioExceptio
throw new UnsupportedOperationException();
}

@Override public FileInStream openFile(AlluxioURI path, OpenFilePOptions options)
@Override
public FileInStream openFile(AlluxioURI path, OpenFilePOptions options)
throws FileDoesNotExistException, OpenDirectoryException, FileIncompleteException,
IOException, AlluxioException {
if (mFiles.containsKey(path)) {
Expand All @@ -483,7 +513,8 @@ public List<SyncPointInfo> getSyncPathList() throws IOException, AlluxioExceptio
}
}

@Override public FileInStream openFile(URIStatus status, OpenFilePOptions options)
@Override
public FileInStream openFile(URIStatus status, OpenFilePOptions options)
throws FileDoesNotExistException, OpenDirectoryException, FileIncompleteException,
IOException, AlluxioException {
AlluxioURI path = new AlluxioURI(status.getPath());
Expand Down Expand Up @@ -547,4 +578,78 @@ public void close() throws IOException {
throw new UnsupportedOperationException();
}
}

private class MultiReadByteArrayFileSystem extends ByteArrayFileSystem {
MultiReadByteArrayFileSystem(Map<AlluxioURI, byte[]> files) {
super(files);
}

@Override
public FileInStream openFile(AlluxioURI path, OpenFilePOptions options)
throws FileDoesNotExistException, OpenDirectoryException, FileIncompleteException,
IOException, AlluxioException {
return new MultiReadFileInStream(super.openFile(path, options));
}

@Override
public FileInStream openFile(URIStatus status, OpenFilePOptions options)
throws FileDoesNotExistException, OpenDirectoryException, FileIncompleteException,
IOException, AlluxioException {
return new MultiReadFileInStream(super.openFile(status, options));
}
}

/**
* Mock implementation of {@link FileInStream} which delegates to a {@link ByteArrayInputStream}.
* This implementation may not serve the full read in a single call.
*/
private class MultiReadFileInStream extends FileInStream {
private final FileInStream mIn;

/**
* Creates an FileInStream that may not serve read calls in a single call.
*
* @param in the backing FileInStream
*/
public MultiReadFileInStream(FileInStream in) {
mIn = in;
}

@Override
public int read() throws IOException {
return mIn.read();
}

@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int toRead = len > 1 ? ThreadLocalRandom.current().nextInt(1, len) : len;
return mIn.read(b, off, toRead);
}

@Override
public long getPos() throws IOException {
return mIn.getPos();
}

@Override
public long remaining() {
return mIn.remaining();
}

@Override
public void seek(long pos) throws IOException {
mIn.seek(pos);
}

@Override
public int positionedRead(long position, byte[] buffer, int offset, int length)
throws IOException {
return mIn.positionedRead(position, buffer, offset, length);
}
}
}

0 comments on commit c079aaa

Please sign in to comment.