Skip to content

Commit

Permalink
change log-level
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Dec 15, 2017
1 parent 3bd3423 commit ea0e6a8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 31 deletions.
@@ -1,5 +1,7 @@
package alluxio.worker.block;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.underfs.SeekableUnderFileInputStream;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
Expand Down Expand Up @@ -28,8 +30,8 @@
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class UnderFileInputStreamManager {
private static final Logger LOG = LoggerFactory.getLogger(UnderFileInputStreamManager.class);
public class UfsInputStreamManager {
private static final Logger LOG = LoggerFactory.getLogger(UfsInputStreamManager.class);

private final HashMap<String, UnderFileInputStreamResources> mResources;
private final Cache<Long, SeekableUnderFileInputStream> mUnderFileInputStreamCache;
Expand All @@ -48,7 +50,7 @@ public void onRemoval(RemovalNotification<Long, SeekableUnderFileInputStream> re
resources.removeInUse(removal.getKey());
if (resources.removeAvailable(removal.getKey())) {
// close the resource
LOG.info("Removed the under file input stream resource of {}", removal.getKey());
LOG.debug("Removed the under file input stream resource of {}", removal.getKey());
try {
inputStream.close();
} catch (IOException e) {
Expand All @@ -59,27 +61,25 @@ public void onRemoval(RemovalNotification<Long, SeekableUnderFileInputStream> re
if (resources.isEmpty()) {
// remove the resources entry
mResources.remove(inputStream.getFilePath());
LOG.info("Remove the resource {} of entry {}", removal.getKey(),
inputStream.getFilePath());
}
}
} else{
LOG.warn("Try to remove the resource entry of {} but not exists any more", removal.getKey());
} else {
LOG.warn("Try to remove the resource entry of {} but not exists any more",
removal.getKey());
}
}
}
};

public UnderFileInputStreamManager() {
mResources=new HashMap<>();
public UfsInputStreamManager() {
mResources = new HashMap<>();
mRemovalThreadPool = Executors.newFixedThreadPool(2);

mUnderFileInputStreamCache = CacheBuilder.newBuilder()
// .expireAfterAccess(Configuration.getMs(PropertyKey.WORKER_UFS_INSTREAM_CACHE_EXPIRE_MS),
// TimeUnit.MILLISECONDS)
.expireAfterAccess(4000,
.expireAfterAccess(Configuration.getMs(PropertyKey.WORKER_UFS_INSTREAM_CACHE_EXPIRE_MS),
TimeUnit.MILLISECONDS)
.removalListener(RemovalListeners.asynchronous(mRemovalListener,mRemovalThreadPool)).build();
.removalListener(RemovalListeners.asynchronous(mRemovalListener, mRemovalThreadPool))
.build();
}

public void checkIn(InputStream inputStream) throws IOException {
Expand All @@ -88,19 +88,18 @@ public void checkIn(InputStream inputStream) throws IOException {
return;
}

mUnderFileInputStreamCache.cleanUp();

SeekableUnderFileInputStream seekableInputStream = (SeekableUnderFileInputStream) inputStream;
synchronized (mResources) {
if (!mResources.containsKey(seekableInputStream.getFilePath())) {
LOG.info("The resource {} is already expired", seekableInputStream.getResourceId());
LOG.debug("The resource {} is already expired", seekableInputStream.getResourceId());
// the cache no longer tracks this input stream
seekableInputStream.close();
return;
}
UnderFileInputStreamResources resources = mResources.get(seekableInputStream.getFilePath());
if (!resources.checkIn(seekableInputStream.getResourceId())) {
LOG.info("Close the expired input stream resource of {}", seekableInputStream.getResourceId());
LOG.debug("Close the expired input stream resource of {}",
seekableInputStream.getResourceId());
seekableInputStream.close();
}
}
Expand All @@ -117,8 +116,8 @@ public InputStream checkOut(UnderFileSystem ufs, String path, long offset) throw

UnderFileInputStreamResources resources;
synchronized (mResources) {
if(mResources.containsKey(path)) {
resources=mResources.get(path);
if (mResources.containsKey(path)) {
resources = mResources.get(path);
} else {
resources = new UnderFileInputStreamResources();
mResources.put(path, resources);
Expand All @@ -132,7 +131,7 @@ public InputStream checkOut(UnderFileSystem ufs, String path, long offset) throw
inputStream = mUnderFileInputStreamCache.getIfPresent(id);
if (inputStream != null) {
nextId = id;
LOG.info("Reused the under file input stream resource of {}", nextId);
LOG.debug("Reused the under file input stream resource of {}", nextId);
// for the cached ufs instream, seek to the requested position
inputStream.seek(offset);
break;
Expand All @@ -145,7 +144,7 @@ public InputStream checkOut(UnderFileSystem ufs, String path, long offset) throw
inputStream = mUnderFileInputStreamCache.get(nextId, () -> {
SeekableUnderFileInputStream ufsStream = (SeekableUnderFileInputStream) ufs.open(path,
OpenOptions.defaults().setOffset(offset));
LOG.info("Created the under file input stream resource of {}", newId);
LOG.debug("Created the under file input stream resource of {}", newId);
ufsStream.setResourceId(newId);
ufsStream.setFilePath(path);
return ufsStream;
Expand All @@ -163,13 +162,13 @@ public InputStream checkOut(UnderFileSystem ufs, String path, long offset) throw
}

@ThreadSafe
static class UnderFileInputStreamResources {
private static class UnderFileInputStreamResources {
private final Set<Long> mInUse;
private final Set<Long> mAvailable;

UnderFileInputStreamResources() {
mInUse = new HashSet<>();
mAvailable= new HashSet<>();
mAvailable = new HashSet<>();
}

public Set<Long> availableIds() {
Expand Down Expand Up @@ -204,7 +203,7 @@ public synchronized boolean removeAvailable(long id) {
*/
public synchronized boolean checkIn(long id) {
Preconditions.checkArgument(!mAvailable.contains(id));
if(mInUse.contains(id)) {
if (mInUse.contains(id)) {
mInUse.remove(id);
mAvailable.add(id);
return true;
Expand Down
Expand Up @@ -71,7 +71,7 @@ public final class UnderFileSystemBlockReader implements BlockReader {
/** The manager for different ufs. */
private final UfsManager mUfsManager;
/** The manager for all ufs instream */
private final UnderFileInputStreamManager mUfsInstreamManager;
private final UfsInputStreamManager mUfsInstreamManager;

/**
* The position of mUnderFileSystemInputStream (if not null) is blockStart + mInStreamPos.
Expand All @@ -94,7 +94,7 @@ public final class UnderFileSystemBlockReader implements BlockReader {
* @throws BlockDoesNotExistException if the UFS block does not exist in the UFS block store
*/
public static UnderFileSystemBlockReader create(UnderFileSystemBlockMeta blockMeta, long offset,
BlockStore localBlockStore, UfsManager ufsManager, UnderFileInputStreamManager ufsInstreamManager)
BlockStore localBlockStore, UfsManager ufsManager, UfsInputStreamManager ufsInstreamManager)
throws BlockDoesNotExistException, IOException {
UnderFileSystemBlockReader ufsBlockReader =
new UnderFileSystemBlockReader(blockMeta, localBlockStore, ufsManager, ufsInstreamManager);
Expand All @@ -111,7 +111,7 @@ public static UnderFileSystemBlockReader create(UnderFileSystemBlockMeta blockMe
* @param ufsInstreamManager the manager of ufs instreams
*/
private UnderFileSystemBlockReader(UnderFileSystemBlockMeta blockMeta, BlockStore localBlockStore,
UfsManager ufsManager, UnderFileInputStreamManager ufsInstreamManager) {
UfsManager ufsManager, UfsInputStreamManager ufsInstreamManager) {
mInitialBlockSize = Configuration.getBytes(PropertyKey.WORKER_FILE_BUFFER_SIZE);
mBlockMeta = blockMeta;
mLocalBlockStore = localBlockStore;
Expand Down
Expand Up @@ -75,7 +75,7 @@ public final class UnderFileSystemBlockStore implements SessionCleanable {
private final UfsManager mUfsManager;

/** The manager for all ufs instream */
private final UnderFileInputStreamManager mUfsInstreamManager;
private final UfsInputStreamManager mUfsInstreamManager;

/**
* Creates an instance of {@link UnderFileSystemBlockStore}.
Expand All @@ -86,7 +86,7 @@ public final class UnderFileSystemBlockStore implements SessionCleanable {
public UnderFileSystemBlockStore(BlockStore localBlockStore, UfsManager ufsManager) {
mLocalBlockStore = localBlockStore;
mUfsManager = ufsManager;
mUfsInstreamManager = new UnderFileInputStreamManager();
mUfsInstreamManager = new UfsInputStreamManager();
}

/**
Expand Down
Expand Up @@ -48,7 +48,7 @@ public final class UnderFileSystemBlockReaderTest {
private BlockStore mAlluxioBlockStore;
private UnderFileSystemBlockMeta mUnderFileSystemBlockMeta;
private UfsManager mUfsManager;
private UnderFileInputStreamManager mUfsInstreamManager;
private UfsInputStreamManager mUfsInstreamManager;
private Protocol.OpenUfsBlockOptions mOpenUfsBlockOptions;

/** Rule to create a new temporary folder during each test. */
Expand Down Expand Up @@ -79,7 +79,7 @@ public void before() throws Exception {

mAlluxioBlockStore = new TieredBlockStore();
mUfsManager = Mockito.mock(UfsManager.class);
mUfsInstreamManager = new UnderFileInputStreamManager();
mUfsInstreamManager = new UfsInputStreamManager();
UfsInfo ufsInfo = new UfsInfo(
Suppliers.ofInstance(UnderFileSystem.Factory.create(testFilePath)),
new AlluxioURI(testFilePath));
Expand Down

0 comments on commit ea0e6a8

Please sign in to comment.