Skip to content

Commit

Permalink
resolve conflict and refactor stream class
Browse files Browse the repository at this point in the history
  • Loading branch information
bf8086 committed Feb 19, 2020
1 parent a4f083a commit 7d54313
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 92 deletions.
@@ -0,0 +1,81 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.cache;

import alluxio.AlluxioURI;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.grpc.OpenFilePOptions;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.util.io.BufferUtils;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.io.Closer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import javax.annotation.concurrent.NotThreadSafe;

/**
 * Dry-run variant of {@link LocalCacheFileInStream}.
 *
 * <p>Every read is served by the external file system stream. The parent class is still driven
 * with the same request so that its cache lookups, cache insertions and metrics are exercised,
 * but the hooks that would move page data ({@code readPage}, {@code copyPage} and
 * {@code readExternalPage}) are overridden as no-ops. The caller's buffer therefore only ever
 * contains bytes produced by the external stream.
 */
@NotThreadSafe
public class DryRunLocalCacheFileInStream extends LocalCacheFileInStream {
  /**
   * Creates a dry-run stream for a file identified by path.
   *
   * @param path path of the file, passed through to {@link LocalCacheFileInStream}
   * @param options open options, passed through to {@link LocalCacheFileInStream}
   * @param externalFs the external file system that serves the actual data
   * @param cacheManager the cache manager used for the simulated cache accounting
   */
  public DryRunLocalCacheFileInStream(AlluxioURI path, OpenFilePOptions options,
      FileSystem externalFs, CacheManager cacheManager) {
    super(path, options, externalFs, cacheManager);
  }

  /**
   * Creates a dry-run stream for a file identified by an already fetched status.
   *
   * @param status status of the file, passed through to {@link LocalCacheFileInStream}
   * @param options open options, passed through to {@link LocalCacheFileInStream}
   * @param externalFs the external file system that serves the actual data
   * @param cacheManager the cache manager used for the simulated cache accounting
   */
  public DryRunLocalCacheFileInStream(URIStatus status, OpenFilePOptions options,
      FileSystem externalFs, CacheManager cacheManager) {
    super(status, options, externalFs, cacheManager);
  }

  /**
   * Reads from the external stream at the current position and replays the read against the
   * cache for accounting purposes only.
   *
   * @param b the buffer to read into
   * @param off the offset in the buffer where the data starts
   * @param len the maximum number of bytes to read
   * @return the value returned by the external stream's {@code read}
   * @throws IOException if the external read or the simulated cache read fails
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int bytesRead = getExternalFileInStream(getPos()).read(b, off, len);
    if (bytesRead > 0) {
      // Replay only the bytes that were actually returned. The parent advances the stream
      // position by the amount it is asked to read, so replaying the full requested length
      // after a short external read would move the position past data the caller never got.
      super.read(b, off, bytesRead);
    }
    return bytesRead;
  }

  /**
   * Performs a positioned read on the external stream and replays it against the cache for
   * accounting purposes only.
   *
   * @param pos the file offset to read from
   * @param b the buffer to read into
   * @param off the offset in the buffer where the data starts
   * @param len the maximum number of bytes to read
   * @return the value returned by the external stream's {@code positionedRead}
   * @throws IOException if the external read or the simulated cache read fails
   */
  @Override
  public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
    int bytesRead = getExternalFileInStream(getPos()).positionedRead(pos, b, off, len);
    if (bytesRead > 0) {
      // Keep the simulated cache accounting in line with what the external stream returned.
      super.positionedRead(pos, b, off, bytesRead);
    }
    return bytesRead;
  }

  /** Does nothing: cached page content is never copied to the caller in dry run mode. */
  @Override
  protected void readPage(byte[] buffer, int offset, ReadableByteChannel page, int bytesLeft) {
    // read nothing in dry run mode
  }

  /** Does nothing: page content is never copied to the caller in dry run mode. */
  @Override
  protected void copyPage(byte[] page, int pageOffset, byte[] buffer, int bufferOffset,
      int length) {
    // read nothing in dry run mode
  }

  /** Does nothing: pages are never fetched from external storage in dry run mode. */
  @Override
  protected void readExternalPage(long pageStart, int pageSize, byte[] buffer) {
    // read nothing in dry run mode
  }
}
Expand Up @@ -18,6 +18,8 @@
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.Collections;

/**
* A page store used in dry run mode to keep track of the blocks without writing any data.
Expand All @@ -37,15 +39,25 @@ public ReadableByteChannel get(PageId pageId, int pageOffset) {
}

@Override
public void delete(PageId pageId) throws IOException, PageNotFoundException {
public void delete(PageId pageId, long pageSize) throws IOException, PageNotFoundException {
mSize--;
}

@Override
public int size() {
public long pages() {
return mSize;
}

@Override
public long bytes() {
// This page store keeps track of pages without writing any data, so it reports zero bytes.
return 0;
}

@Override
public Collection<PageInfo> getPages() {
// Always reports an empty collection.
// NOTE(review): pages() can be non-zero while this stays empty; confirm callers tolerate that.
return Collections.emptyList();
}

@Override
public void close() throws Exception {
// no-op
Expand Down
Expand Up @@ -54,7 +54,6 @@ public class LocalCacheFileInStream extends FileInStream {
/** File info, fetched from external FS. */
private final URIStatus mStatus;
private final OpenFilePOptions mOpenOptions;
private final boolean mDryRun;

/** Stream reading from the external file system, opened once. */
private FileInStream mExternalFileInStream;
Expand All @@ -74,10 +73,6 @@ public class LocalCacheFileInStream extends FileInStream {
public LocalCacheFileInStream(AlluxioURI path, OpenFilePOptions options, FileSystem externalFs,
CacheManager cacheManager) {
mPageSize = externalFs.getConf().getBytes(PropertyKey.USER_CLIENT_CACHE_PAGE_SIZE);
CacheMode dryrun = externalFs.getConf()
.getEnum(PropertyKey.USER_LOCAL_CACHE_MODE, CacheMode.class);
Preconditions.checkArgument(dryrun != CacheMode.DISABLED);
mDryRun = (dryrun == CacheMode.DRYRUN);
mPath = path;
mOpenOptions = options;
mExternalFs = externalFs;
Expand Down Expand Up @@ -136,9 +131,6 @@ public int read(byte[] b, int off, int len) throws IOException {
if (mPosition >= mStatus.getLength()) { // at end of file
return -1;
}
if (mDryRun) {
getExternalFileInStream(mPosition).read(b, off, len);
}
int bytesRead = 0;
long lengthToRead = Math.min(len, mStatus.getLength() - mPosition);
// for each page, check if it is available in the cache
Expand All @@ -149,29 +141,15 @@ public int read(byte[] b, int off, int len) throws IOException {
PageId pageId = new PageId(mStatus.getFileIdentifier(), currentPage);
try (ReadableByteChannel cachedData = mCacheManager.get(pageId, currentPageOffset)) {
if (cachedData != null) { // cache hit
// wrap return byte array in a bytebuffer and set the pos/limit for the page read
if (!mDryRun) {
ByteBuffer buf = ByteBuffer.wrap(b);
buf.position(off + bytesRead);
buf.limit(off + bytesRead + bytesLeftInPage);
// read data from cache
while (buf.position() != buf.limit()) {
if (cachedData.read(buf) == -1) {
break;
}
}
Preconditions.checkState(buf.position() == buf.limit());
}
readPage(b, off + bytesRead, cachedData, bytesLeftInPage);
bytesRead += bytesLeftInPage;
mPosition += bytesLeftInPage;
Metrics.BYTES_READ_CACHE.inc(bytesLeftInPage);
} else { // cache miss
byte[] page = readExternalPage(mPosition);
if (page.length > 0) {
mCacheManager.put(pageId, page);
if (!mDryRun) {
System.arraycopy(page, currentPageOffset, b, off + bytesRead, bytesLeftInPage);
}
copyPage(page, currentPageOffset, b, off + bytesRead, bytesLeftInPage);
bytesRead += bytesLeftInPage;
mPosition += bytesLeftInPage;
Metrics.BYTES_REQUESTED_EXTERNAL.inc(bytesLeftInPage);
Expand Down Expand Up @@ -218,9 +196,6 @@ public int positionedRead(long pos, byte[] b, int off, int len) throws IOExcepti
if (pos < 0 || pos >= mStatus.getLength()) { // at end of file
return -1;
}
if (mDryRun) {
getExternalFileInStream(mPosition).positionedRead(pos, b, off, len);
}
int bytesRead = 0;
long currentPosition = pos;
long lengthToRead = Math.min(len, mStatus.getLength() - pos);
Expand All @@ -233,27 +208,14 @@ public int positionedRead(long pos, byte[] b, int off, int len) throws IOExcepti
try (ReadableByteChannel cachedData = mCacheManager.get(pageId, currentPageOffset)) {
if (cachedData != null) { // cache hit
// wrap return byte array in a bytebuffer and set the pos/limit for the page read
if (!mDryRun) {
ByteBuffer buf = ByteBuffer.wrap(b);
buf.position(off + bytesRead);
buf.limit(off + bytesRead + bytesLeftInPage);
// read data from cache
while (buf.position() != buf.limit()) {
if (cachedData.read(buf) == -1) {
break;
}
}
Preconditions.checkState(buf.position() == buf.limit());
}
readPage(b, off + bytesRead, cachedData, bytesLeftInPage);
bytesRead += bytesLeftInPage;
currentPosition += bytesLeftInPage;
Metrics.BYTES_READ_CACHE.inc(bytesLeftInPage);
} else { // cache miss
byte[] page = readExternalPage(currentPosition);
mCacheManager.put(pageId, page);
if (!mDryRun) {
System.arraycopy(page, currentPageOffset, b, off + bytesRead, bytesLeftInPage);
}
copyPage(page, currentPageOffset, b, off + bytesRead, bytesLeftInPage);
bytesRead += bytesLeftInPage;
currentPosition += bytesLeftInPage;
Metrics.BYTES_REQUESTED_EXTERNAL.inc(bytesLeftInPage);
Expand Down Expand Up @@ -303,7 +265,7 @@ private void checkIfClosed() {
*
* @param pos position to set the external stream to
*/
private FileInStream getExternalFileInStream(long pos) throws IOException {
protected FileInStream getExternalFileInStream(long pos) throws IOException {
try {
if (mExternalFileInStream == null) {
mExternalFileInStream = mExternalFs.openFile(mPath, mOpenOptions);
Expand All @@ -319,6 +281,43 @@ private FileInStream getExternalFileInStream(long pos) throws IOException {
return mExternalFileInStream;
}

/**
 * Fills a region of the destination array with the content of a cached page.
 *
 * @param buffer the destination array
 * @param offset the index in {@code buffer} at which the first byte is stored
 * @param page the channel supplying the cached page data
 * @param bytesLeft the number of bytes expected from the channel
 * @throws IOException if reading from the channel fails
 */
protected void readPage(byte[] buffer, int offset, ReadableByteChannel page, int bytesLeft)
    throws IOException {
  // expose only the target region of the array to the channel
  ByteBuffer target = ByteBuffer.wrap(buffer);
  target.position(offset);
  target.limit(offset + bytesLeft);
  // drain the channel until the region is full or the channel is exhausted
  boolean endOfChannel = false;
  while (!endOfChannel && target.hasRemaining()) {
    endOfChannel = page.read(target) == -1;
  }
  // the channel must have supplied the whole region
  Preconditions.checkState(!target.hasRemaining());
}

/**
* Copies a range of page data into a buffer.
*
* @param page the byte array containing the page data
* @param pageOffset the offset in the page where the data starts
* @param buffer the buffer to write data into
* @param bufferOffset the offset in the buffer where the data should be written
* @param length the number of bytes to copy
*/
protected void copyPage(byte[] page, int pageOffset, byte[] buffer, int bufferOffset,
int length) {
System.arraycopy(page, pageOffset, buffer, bufferOffset, length);
}

@VisibleForTesting
FileInStream getExternalFileInStream() {
return mExternalFileInStream;
Expand All @@ -343,24 +342,33 @@ private synchronized byte[] readExternalPage(long pos) throws IOException {
long pageStart = pos - (pos % mPageSize);
int pageSize = (int) Math.min(mPageSize, mStatus.getLength() - pageStart);
byte[] page = new byte[pageSize];
if (!mDryRun) {
FileInStream stream = getExternalFileInStream(pageStart);
page = new byte[pageSize];
int totalBytesRead = 0;
while (totalBytesRead < pageSize) {
int bytesRead = stream.read(page, totalBytesRead, pageSize - totalBytesRead);
if (bytesRead <= 0) {
break;
}
totalBytesRead += bytesRead;
}
Metrics.BYTES_READ_EXTERNAL.inc(totalBytesRead);
if (totalBytesRead != pageSize) {
throw new IOException("Failed to read complete page from external storage. Bytes read: "
+ totalBytesRead + " Page size: " + pageSize);
readExternalPage(pageStart, pageSize, page);
Metrics.BYTES_READ_EXTERNAL.inc(pageSize);
return page;
}

/**
* Reads a page from external storage which contains the position specified.
*
* @param pageStart the offset where the page starts
* @param pageSize the size of the page
* @param buffer the buffer to read page into
* @throws IOException if an error occurs while reading data
*/
protected void readExternalPage(long pageStart, int pageSize, byte[] buffer) throws IOException {
int totalBytesRead = 0;
FileInStream stream = getExternalFileInStream(pageStart);
while (totalBytesRead < pageSize) {
int bytesRead = stream.read(buffer, totalBytesRead, pageSize - totalBytesRead);
if (bytesRead <= 0) {
break;
}
totalBytesRead += bytesRead;
}
if (totalBytesRead != pageSize) {
throw new IOException("Failed to read complete page from external storage. Bytes read: "
+ totalBytesRead + " Page size: " + pageSize);
}
return page;
}

private static final class Metrics {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import alluxio.exception.AlluxioException;
import alluxio.grpc.OpenFilePOptions;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,7 @@ public class LocalCacheFileSystem extends DelegatingFileSystem {
private static Optional<CacheManager> sCacheManager;

private final AlluxioConfiguration mConf;
private final boolean mDryRun;

/**
* @param fs a FileSystem instance to query on local cache miss
Expand All @@ -59,6 +61,9 @@ public LocalCacheFileSystem(FileSystem fs, AlluxioConfiguration conf) {
}
}
mConf = conf;
CacheMode cacheMode = getConf().getEnum(PropertyKey.USER_LOCAL_CACHE_MODE, CacheMode.class);
Preconditions.checkArgument(cacheMode != CacheMode.DISABLED);
mDryRun = (cacheMode == CacheMode.DRYRUN);
}

@Override
Expand All @@ -75,7 +80,12 @@ public FileInStream openFile(AlluxioURI path, OpenFilePOptions options)
if (sCacheManager == null || !sCacheManager.isPresent()) {
return mDelegatedFileSystem.openFile(path, options);
}
return new LocalCacheFileInStream(path, options, mDelegatedFileSystem, sCacheManager.get());
if (mDryRun) {
return new DryRunLocalCacheFileInStream(
path, options, mDelegatedFileSystem, sCacheManager.get());
} else {
return new LocalCacheFileInStream(path, options, mDelegatedFileSystem, sCacheManager.get());
}
}

@Override
Expand All @@ -84,6 +94,11 @@ public FileInStream openFile(URIStatus status, OpenFilePOptions options)
if (sCacheManager == null || !sCacheManager.isPresent()) {
return mDelegatedFileSystem.openFile(status, options);
}
return new LocalCacheFileInStream(status, options, mDelegatedFileSystem, sCacheManager.get());
if (mDryRun) {
return new DryRunLocalCacheFileInStream(
status, options, mDelegatedFileSystem, sCacheManager.get());
} else {
return new LocalCacheFileInStream(status, options, mDelegatedFileSystem, sCacheManager.get());
}
}
}

0 comments on commit 7d54313

Please sign in to comment.