Skip to content

Commit

Permalink
use file id as key to input stream resources
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Dec 21, 2017
1 parent 9171eb7 commit dd68152
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 68 deletions.
Expand Up @@ -25,20 +25,24 @@ class CachedSeekableInputStream extends SeekableUnderFileInputStream {
private Long mResourceId;
/** The file path of the input stream. */
private String mFilePath;
/** The file Id */
private long mFileId;

/**
* Creates a new {@link CachedSeekableInputStream}.
*
* @param inputStream the input stream from the under storage
* @param resourceId the resource id
* @param fileId the file id
* @param filePath the file path
*/
CachedSeekableInputStream(SeekableUnderFileInputStream inputStream, long resourceId,
CachedSeekableInputStream(SeekableUnderFileInputStream inputStream, long resourceId, long fileId,
String filePath) {
super(inputStream);
Preconditions.checkArgument(resourceId >= 0, "resource id should be positive");
mResourceId = resourceId;
mFilePath = filePath;
mFileId =fileId;
}

/**
Expand All @@ -55,6 +59,13 @@ String getFilePath() {
return mFilePath;
}

/**
* @return the file id
*/
Long getFileId() {
return mFileId;
}

@Override
public void seek(long pos) throws IOException {
((SeekableUnderFileInputStream) in).seek(pos);
Expand Down
Expand Up @@ -52,18 +52,18 @@
* {@link UfsInputStreamIds} to track the in-use input stream and the available ones. The manager
* closes the input streams after they are expired and not in-use anymore. Lock
* {@link UfsInputStreamIds} before access, and in addition to that, lock
* {@link #mFileToInputStreamIds} before the resource retrieval.
* {@link #mFileIdToInputStreamIds} before the resource retrieval.
*/
@ThreadSafe
public class UfsInputStreamManager {
private static final Logger LOG = LoggerFactory.getLogger(UfsInputStreamManager.class);
private static final long UNAVAILABLE_RESOURCE_ID = -1;

/**
* A map from the ufs file name to the metadata of the input streams. Synchronization on this map
* A map from the ufs file id to the metadata of the input streams. Synchronization on this map
* before access.
*/
private final Map<String, UfsInputStreamIds> mFileToInputStreamIds;
private final Map<Long, UfsInputStreamIds> mFileIdToInputStreamIds;
/** Cache of the input streams, from the input stream id to the input stream. */
@GuardedBy("mFileToInputStreamIds")
private final Cache<Long, CachedSeekableInputStream> mUnderFileInputStreamCache;
Expand All @@ -74,7 +74,7 @@ public class UfsInputStreamManager {
* Creates a new {@link UfsInputStreamManager}.
*/
public UfsInputStreamManager() {
mFileToInputStreamIds = new HashMap<>();
mFileIdToInputStreamIds = new HashMap<>();
mRemovalThreadPool = ExecutorServiceFactories
.fixedThreadPoolExecutorServiceFactory(Constants.UFS_INPUT_STREAM_CACHE_EXPIRATION, 2)
.create();
Expand All @@ -84,9 +84,9 @@ public UfsInputStreamManager() {
(RemovalNotification<Long, CachedSeekableInputStream> removal) -> {
CachedSeekableInputStream inputStream = removal.getValue();
boolean shouldClose = false;
synchronized (mFileToInputStreamIds) {
if (mFileToInputStreamIds.containsKey(inputStream.getFilePath())) {
UfsInputStreamIds resources = mFileToInputStreamIds.get(inputStream.getFilePath());
synchronized (mFileIdToInputStreamIds) {
if (mFileIdToInputStreamIds.containsKey(inputStream.getFileId())) {
UfsInputStreamIds resources = mFileIdToInputStreamIds.get(inputStream.getFileId());
synchronized (resources) {
// remove the key
resources.removeInUse(removal.getKey());
Expand All @@ -97,7 +97,7 @@ public UfsInputStreamManager() {
}
if (resources.isEmpty()) {
// remove the resources entry
mFileToInputStreamIds.remove(inputStream.getFilePath());
mFileIdToInputStreamIds.remove(inputStream.getFileId());
}
}
} else {
Expand All @@ -109,8 +109,9 @@ public UfsInputStreamManager() {
try {
inputStream.close();
} catch (IOException e) {
LOG.warn("Failed to close the input stream resource of file {} and resource id",
inputStream.getFilePath(), removal.getKey());
LOG.warn(
"Failed to close the input stream resource of file {} with id {} and resource id",
inputStream.getFilePath(), inputStream.getFileId(), removal.getKey());
}
}
};
Expand All @@ -134,17 +135,17 @@ public void release(InputStream inputStream) throws IOException {
return;
}

synchronized (mFileToInputStreamIds) {
if (!mFileToInputStreamIds
.containsKey(((CachedSeekableInputStream) inputStream).getFilePath())) {
synchronized (mFileIdToInputStreamIds) {
if (!mFileIdToInputStreamIds
.containsKey(((CachedSeekableInputStream) inputStream).getFileId())) {
LOG.debug("The resource {} is already expired",
((CachedSeekableInputStream) inputStream).getResourceId());
// the cache no longer tracks this input stream
inputStream.close();
return;
}
UfsInputStreamIds resources =
mFileToInputStreamIds.get(((CachedSeekableInputStream) inputStream).getFilePath());
mFileIdToInputStreamIds.get(((CachedSeekableInputStream) inputStream).getFileId());
if (!resources.release(((CachedSeekableInputStream) inputStream).getResourceId())) {
LOG.debug("Close the expired input stream resource of {}",
((CachedSeekableInputStream) inputStream).getResourceId());
Expand Down Expand Up @@ -175,9 +176,9 @@ public void invalidate(CachedSeekableInputStream inputStream) throws IOException
* @return the acquired input stream
* @throws IOException if the input stream fails to open
*/
public InputStream acquire(UnderFileSystem ufs, String path, OpenOptions openOptions)
public InputStream acquire(UnderFileSystem ufs, String path, long fileId, OpenOptions openOptions)
throws IOException {
return acquire(ufs, path, openOptions, true);
return acquire(ufs, path, fileId, openOptions, true);
}

/**
Expand All @@ -187,12 +188,13 @@ public InputStream acquire(UnderFileSystem ufs, String path, OpenOptions openOpt
*
* @param ufs the under file system
* @param path the path to the under storage file
* @param fileId the file id
* @param openOptions the open options
* @param reuse true to reuse existing input stream, otherwise acquire a new stream
* @return the acquired input stream
* @throws IOException if the input stream fails to open
*/
public InputStream acquire(UnderFileSystem ufs, String path, OpenOptions openOptions,
public InputStream acquire(UnderFileSystem ufs, String path, long fileId, OpenOptions openOptions,
boolean reuse) throws IOException {
if (!ufs.isSeekable() || !isCachingEnabled()) {
// not able to cache, always return a new input stream
Expand All @@ -203,12 +205,12 @@ public InputStream acquire(UnderFileSystem ufs, String path, OpenOptions openOpt
mUnderFileInputStreamCache.cleanUp();

UfsInputStreamIds resources;
synchronized (mFileToInputStreamIds) {
if (mFileToInputStreamIds.containsKey(path)) {
resources = mFileToInputStreamIds.get(path);
synchronized (mFileIdToInputStreamIds) {
if (mFileIdToInputStreamIds.containsKey(fileId)) {
resources = mFileIdToInputStreamIds.get(fileId);
} else {
resources = new UfsInputStreamIds();
mFileToInputStreamIds.put(path, resources);
mFileIdToInputStreamIds.put(fileId, resources);
}
}

Expand Down Expand Up @@ -237,7 +239,7 @@ public InputStream acquire(UnderFileSystem ufs, String path, OpenOptions openOpt
SeekableUnderFileInputStream ufsStream = (SeekableUnderFileInputStream) ufs.open(path,
OpenOptions.defaults().setOffset(openOptions.getOffset()));
LOG.debug("Created the under file input stream resource of {}", newId);
return new CachedSeekableInputStream(ufsStream, newId, path);
return new CachedSeekableInputStream(ufsStream, newId, fileId, path);
});
} catch (ExecutionException e) {
LOG.warn("Failed to retrieve the cached UFS instream");
Expand Down
Expand Up @@ -22,12 +22,11 @@
import alluxio.exception.InvalidWorkerStateException;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UfsManager.UfsInfo;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
Expand All @@ -53,8 +52,6 @@
public final class UnderFileSystemBlockReader implements BlockReader {
private static final Logger LOG = LoggerFactory.getLogger(UnderFileSystemBlockReader.class);

private static final int RETRY_COUNT = 2;

/** An object storing the mapping of tier aliases to ordinals. */
private final StorageTierAssoc mStorageTierAssoc = new WorkerStorageTierAssoc();

Expand Down Expand Up @@ -218,35 +215,7 @@ public int transferTo(ByteBuf buf) throws IOException {
}
int bytesToRead =
(int) Math.min(buf.writableBytes(), mBlockMeta.getBlockSize() - mInStreamPos);
int bytesRead = 0;

RetryPolicy retryPolicy = new CountingRetry(RETRY_COUNT);
IOException thrownException = null;
while (retryPolicy.attemptRetry()) {
try {
bytesRead = buf.writeBytes(mUnderFileSystemInputStream, bytesToRead);
} catch (IOException e) {
LOG.debug("Failed to read from ufs instream ");
thrownException = e;
if (mUnderFileSystemInputStream instanceof CachedSeekableInputStream) {
// this may happen when the cached input stream is stale
UfsInfo ufsInfo = mUfsManager.get(mBlockMeta.getMountId());
UnderFileSystem ufs = ufsInfo.getUfs();
mUfsInstreamManager
.invalidate((CachedSeekableInputStream) mUnderFileSystemInputStream);
mUnderFileSystemInputStream =
mUfsInstreamManager.acquire(ufs, mBlockMeta.getUnderFileSystemPath(),
OpenOptions.defaults().setOffset(
((CachedSeekableInputStream) mUnderFileSystemInputStream).getPos()),
false);
} else {
throw thrownException;
}
}
}
if (retryPolicy.getRetryCount() >= RETRY_COUNT) {
throw thrownException;
}
int bytesRead = buf.writeBytes(mUnderFileSystemInputStream, bytesToRead);

if (bytesRead <= 0) {
return bytesRead;
Expand Down Expand Up @@ -328,9 +297,9 @@ private void updateUnderFileSystemInputStream(long offset) throws IOException {
UfsInfo ufsInfo = mUfsManager.get(mBlockMeta.getMountId());
UnderFileSystem ufs = ufsInfo.getUfs();
mUfsMountPointUri = ufsInfo.getUfsMountPointUri();
mUnderFileSystemInputStream =
mUfsInstreamManager.acquire(ufs, mBlockMeta.getUnderFileSystemPath(),
OpenOptions.defaults().setOffset(mBlockMeta.getOffset() + offset));
mUnderFileSystemInputStream = mUfsInstreamManager.acquire(ufs,
mBlockMeta.getUnderFileSystemPath(), IdUtils.fileIdFromBlockId(mBlockMeta.getBlockId()),
OpenOptions.defaults().setOffset(mBlockMeta.getOffset() + offset));
mInStreamPos = offset;
}
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
*/
public final class UfsInputStreamManagerTest {
private static final String FILE_NAME = "/test";
private static final long FILE_ID = 1;

private UnderFileSystem mUfs;
private SeekableUnderFileInputStream[] mSeekableInStreams;
Expand Down Expand Up @@ -65,11 +66,13 @@ public void testAcquireAndRelease() throws Exception {
.thenReturn(mockedStrem).thenThrow(new IllegalStateException("Should only be called once"));

// acquire a stream
InputStream instream1 = mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(2));
InputStream instream1 =
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(2));
// release
mManager.release(instream1);
// acquire a stream again
InputStream instream2 = mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(4));
InputStream instream2 =
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(4));

Assert.assertEquals(instream1, instream2);
// ensure the second time the released instream is the same one but repositioned
Expand All @@ -81,9 +84,9 @@ public void testAcquireAndRelease() throws Exception {
*/
@Test
public void testMultipleCheckIn() throws Exception {
mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(2));
mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(4));
mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(6));
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(2));
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(4));
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(6));
// 3 different input streams are acquired
Mockito.verify(mUfs, Mockito.times(3)).open(Mockito.eq(FILE_NAME),
Mockito.any(OpenOptions.class));
Expand All @@ -101,12 +104,13 @@ public void testExpire() throws Exception {
}).toResource()) {
mManager = new UfsInputStreamManager();
// check out a stream
InputStream instream = mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(2));
InputStream instream =
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(2));
// check in back
mManager.release(instream);
Thread.sleep(10);
// check out another stream should trigger the timeout
mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(4));
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(4));
Mockito.verify(mSeekableInStreams[0], Mockito.timeout(2000).times(1)).close();
}
}
Expand All @@ -124,7 +128,8 @@ public void testConcurrency() throws Exception {
for (int j = 0; j < numCheckOutPerThread; j++) {
InputStream instream;
try {
instream = mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(j));
instream =
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(j));
Thread.sleep(10);
mManager.release(instream);
} catch (Exception e) {
Expand Down Expand Up @@ -159,7 +164,8 @@ public void testConcurrencyWithExpiration() throws Exception {
for (int j = 0; j < numCheckOutPerThread; j++) {
InputStream instream;
try {
instream = mManager.acquire(mUfs, FILE_NAME, OpenOptions.defaults().setOffset(j));
instream =
mManager.acquire(mUfs, FILE_NAME, FILE_ID, OpenOptions.defaults().setOffset(j));
mManager.release(instream);
Thread.sleep(200);
} catch (Exception e) {
Expand Down

0 comments on commit dd68152

Please sign in to comment.