Skip to content

Commit

Permalink
Address calvin's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Sep 23, 2016
1 parent d9ebc74 commit d8a6dbf
Showing 1 changed file with 40 additions and 42 deletions.
82 changes: 40 additions & 42 deletions core/common/src/main/java/alluxio/resource/DynamicResourcePool.java
Expand Up @@ -13,6 +13,7 @@

import alluxio.Constants;

import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,7 @@
* A dynamic pool that manages the resources. It clears old resources.
* It accepts a min and max capacity.
*
* When acquiring resources, the most recently used
* When acquiring resources, the most recently used resource is returned.
*
* @param <T> the type of the resource
*/
Expand All @@ -59,17 +60,13 @@ protected static class ResourceInternal<T> {
private long mLastAccessTimeMs;

/**
* Sets the lastAccessTimeInSecs.
*
* @param lastAccessTimeMs the last access time in ms
*/
public void setLastAccessTimeMs(long lastAccessTimeMs) {
mLastAccessTimeMs = lastAccessTimeMs;
}

/**
* Gets the lastAccessTimeMs.
*
* @return the last access time in ms
*/
public long getLastAccessTimeMs() {
Expand All @@ -88,7 +85,7 @@ public ResourceInternal(T resource) {
}

/**
* Options to initialize a Dynaminic resource pool.
* Options to initialize a Dynamic resource pool.
*/
public static class Options {
private int mMaxCapacity = 1024;
Expand Down Expand Up @@ -126,7 +123,7 @@ public int getGcIntervalMs() {

/**
* @param maxCapacity the max capacity
* @return the current object
* @return the updated object
*/
public Options setMaxCapacity(int maxCapacity) {
mMaxCapacity = maxCapacity;
Expand All @@ -135,7 +132,7 @@ public Options setMaxCapacity(int maxCapacity) {

/**
* @param minCapacity the min capacity
* @return the current object
* @return the updated object
*/
public Options setMinCapacity(int minCapacity) {
mMinCapacity = minCapacity;
Expand All @@ -144,7 +141,7 @@ public Options setMinCapacity(int minCapacity) {

/**
* @param initialDelayMs the initial delay
* @return the current object
* @return the updated object
*/
public Options setInitialDelayMs(int initialDelayMs) {
mInitialDelayMs = initialDelayMs;
Expand All @@ -153,18 +150,14 @@ public Options setInitialDelayMs(int initialDelayMs) {

/**
* @param gcIntervalMs the gc interval
* @return the current object
* @return the updated object
*/
public Options setGcIntervalMs(int gcIntervalMs) {
mGcIntervalMs = gcIntervalMs;
return this;
}

/**
* Creates an option instance.
*/
public Options() {
}
private Options() {} // prevents instantiation

/**
* @return the default option
Expand Down Expand Up @@ -238,7 +231,7 @@ public void run() {
}

for (T resource : resourcesToGc) {
LOG.info("Resource {} is garbage collected.", resource.toString());
LOG.info("Resource {} is garbage collected.", resource);
closeResource(resource);
}
}
Expand All @@ -256,8 +249,8 @@ public T acquire() throws IOException {
try {
return acquire(100 /* no timeout */, TimeUnit.DAYS);
} catch (TimeoutException e) {
LOG.error("Never should timeout in acquire().");
throw new RuntimeException(e);
// Never should timeout in acquire().
throw Throwables.propagate(e);
}
}

Expand All @@ -267,8 +260,8 @@ public T acquire() throws IOException {
* This method is like {@link #acquire()}, but it will time out if an object cannot be
* acquired before the specified amount of time.
*
* @param time an amount of time to wait, null to wait indefinitely
* @param unit the unit to use for time, null to wait indefinitely
* @param time an amount of time to wait
* @param unit the unit to use for time
* @return a resource taken from the pool
* @throws IOException if it fails to acquire because of the failure to create a new resource
* @throws TimeoutException if it fails to acquire because of time out
Expand All @@ -283,7 +276,7 @@ public T acquire(long time, TimeUnit unit) throws IOException, TimeoutException
if (isHealthy(resource.mResource)) {
return resource.mResource;
} else {
LOG.info("Clearing unhealthy resource {}.", resource.mResource.toString());
LOG.info("Clearing unhealthy resource {}.", resource.mResource);
closeResource(resource.mResource);
remove(resource.mResource);
return acquire(time, unit);
Expand All @@ -294,32 +287,31 @@ public T acquire(long time, TimeUnit unit) throws IOException, TimeoutException
// If the resource pool is empty but capacity is not yet full, create a new resource.
T newResource = createNewResource();
ResourceInternal<T> resourceInternal = new ResourceInternal<>(newResource);
// It is possible that we have more resources than mMaxCapacity here, just insert it
// instead of waiting.
add(resourceInternal);
return newResource;
if (add(resourceInternal)) {
return newResource;
} else {
closeResource(newResource);
}
}

// Otherwise, try to take a resource from the pool, blocking if none are available.
try {
mLock.lockInterruptibly();
try {
while (true) {
resource = poll();
if (resource != null) {
break;
}
long currTimeMs = System.currentTimeMillis();
if (currTimeMs >= endTimeMs || !mNotEmpty
.await(endTimeMs - currTimeMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Acquire resource times out.");
}
mLock.lock();
while (true) {
resource = poll();
if (resource != null) {
break;
}
long currTimeMs = System.currentTimeMillis();
if (currTimeMs >= endTimeMs || !mNotEmpty
.await(endTimeMs - currTimeMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Acquire resource times out.");
}
} finally {
mLock.unlock();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
throw Throwables.propagate(e);
} finally {
mLock.unlock();
}

if (isHealthy(resource.mResource)) {
Expand Down Expand Up @@ -401,11 +393,17 @@ private boolean isFull() {
* Adds a newly created resource to the pool. The resource is not available when it is added.
*
* @param resource
* @return true if the resource is successfully added
*/
private void add(ResourceInternal<T> resource) {
private boolean add(ResourceInternal<T> resource) {
try {
mLock.lock();
mResources.put(resource.mResource, resource);
if (mResources.size() >= mMaxCapacity) {
return false;
} else {
mResources.put(resource.mResource, resource);
return true;
}
} finally {
mLock.unlock();
}
Expand Down

0 comments on commit d8a6dbf

Please sign in to comment.