Skip to content

Commit

Permalink
Implement async write into local cache
Browse files Browse the repository at this point in the history
pr-link: #11151
change-id: cid-b946864a1bc17515f3ad7d112a3e4634e8cd40c5
  • Loading branch information
apc999 committed Mar 12, 2020
1 parent 1c8b981 commit ba9756c
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 11 deletions.
Expand Up @@ -12,6 +12,7 @@
package alluxio.client.file.cache;

import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.collections.ConcurrentHashSet;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
Expand All @@ -33,6 +34,11 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
Expand Down Expand Up @@ -67,6 +73,7 @@ public class LocalCacheManager implements CacheManager {
private static final int LOCK_SIZE = 1024;
private final long mPageSize;
private final long mCacheSize;
private final boolean mAsyncWrite;
private final CacheEvictor mEvictor;
/** A readwrite lock pool to guard individual pages based on striping. */
private final ReadWriteLock[] mPageLocks = new ReentrantReadWriteLock[LOCK_SIZE];
Expand All @@ -75,6 +82,9 @@ public class LocalCacheManager implements CacheManager {
private final ReadWriteLock mMetaLock = new ReentrantReadWriteLock();
@GuardedBy("mMetaLock")
private final MetaStore mMetaStore;
/** Executor service for executing the async cache tasks. */
private final ExecutorService mAsyncCacheExecutor;
private final ConcurrentHashSet<PageId> mPendingRequests;

/**
* Restores a page store at the configured location, updating meta store and evictor.
Expand All @@ -85,8 +95,8 @@ public class LocalCacheManager implements CacheManager {
* @param evictor evictor
* @return whether the restore succeeds or not
*/
private static boolean restore(
PageStore pageStore, PageStoreOptions options, MetaStore metaStore, CacheEvictor evictor) {
private static boolean restore(PageStore pageStore, PageStoreOptions options, MetaStore metaStore,
CacheEvictor evictor) {
LOG.info("Restore PageStore with {}", options);
Path rootDir = Paths.get(options.getRootDir());
if (!Files.exists(rootDir)) {
Expand All @@ -104,17 +114,16 @@ private static boolean restore(
metaStore.addPage(pageInfo.getPageId(), pageInfo);
evictor.updateOnPut(pageInfo.getPageId());
if (metaStore.bytes() > pageStore.getCacheSize()) {
LOG.error("Loaded pages exceed cache capacity ({} bytes)",
pageStore.getCacheSize());
LOG.error("Loaded pages exceed cache capacity ({} bytes)", pageStore.getCacheSize());
return false;
}
}
} catch (Exception e) {
LOG.error("Failed to restore PageStore", e);
return false;
}
LOG.info("Restored PageStore with {} existing pages and {} bytes",
metaStore.pages(), metaStore.bytes());
LOG.info("Restored PageStore with {} existing pages and {} bytes", metaStore.pages(),
metaStore.bytes());
return true;
}

Expand Down Expand Up @@ -156,16 +165,24 @@ public static LocalCacheManager create(AlluxioConfiguration conf) throws IOExcep
* @param pageStore the page store manages the cache data
*/
@VisibleForTesting
LocalCacheManager(AlluxioConfiguration conf, MetaStore metaStore,
PageStore pageStore, CacheEvictor evictor) {
LocalCacheManager(AlluxioConfiguration conf, MetaStore metaStore, PageStore pageStore,
CacheEvictor evictor) {
mMetaStore = metaStore;
mPageStore = pageStore;
mEvictor = evictor;
mPageSize = conf.getBytes(PropertyKey.USER_CLIENT_CACHE_PAGE_SIZE);
mAsyncWrite = conf.getBoolean(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED);
mCacheSize = pageStore.getCacheSize();
for (int i = 0; i < LOCK_SIZE; i++) {
mPageLocks[i] = new ReentrantReadWriteLock();
}
mPendingRequests = new ConcurrentHashSet<>();
mAsyncCacheExecutor =
mAsyncWrite
? new ThreadPoolExecutor(conf.getInt(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_THREADS),
conf.getInt(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_THREADS), 60,
TimeUnit.SECONDS, new SynchronousQueue<>())
: null;
Metrics.registerGauges(mCacheSize, mMetaStore);
}

Expand Down Expand Up @@ -209,6 +226,37 @@ private Pair<ReadWriteLock, ReadWriteLock> getPageLockPair(PageId pageId1, PageI
/**
 * Caches a page, either on the calling thread or by handing it to the async write executor.
 *
 * <p>When async write is disabled, the page is written synchronously and the result of that
 * write is returned. When async write is enabled, the write is submitted to
 * {@code mAsyncCacheExecutor} and this method returns as soon as the task has been accepted;
 * a {@code true} result then only means the request was accepted, not that the page has been
 * stored.
 *
 * <p>NOTE(review): in async mode the {@code page} array is not copied before being handed to
 * the worker thread, so callers that reuse the buffer after this call returns may race with the
 * write; confirm against callers.
 *
 * @param pageId page identifier
 * @param page page data
 * @return in sync mode, the result of the write; in async mode, {@code false} if a put for the
 *         same page is already pending or the executor rejected the task, {@code true} otherwise
 */
@Override
public boolean put(PageId pageId, byte[] page) {
  LOG.debug("put({},{} bytes) enters", pageId, page.length);
  if (!mAsyncWrite) {
    boolean inserted = putInternal(pageId, page);
    LOG.debug("put({},{} bytes) exits: {}", pageId, page.length, inserted);
    return inserted;
  }

  // The pending set de-duplicates concurrent async puts of the same page.
  if (!mPendingRequests.add(pageId)) { // already queued
    LOG.debug("put({},{} bytes) exits: already pending", pageId, page.length);
    return false;
  }
  try {
    mAsyncCacheExecutor.submit(() -> {
      try {
        putInternal(pageId, page);
      } catch (RuntimeException e) {
        // submit() stores a thrown exception in the returned Future, which is never read here;
        // log it so that an async write failure does not disappear silently.
        LOG.error("Async put({},{} bytes) failed", pageId, page.length, e);
      } finally {
        // Always clear the pending marker so later puts of this page are accepted again.
        mPendingRequests.remove(pageId);
      }
    });
  } catch (RejectedExecutionException e) {
    // The executor may reject the task under highly concurrent caching workloads, when it
    // cannot accept more work. In that case skip caching this page and report failure.
    mPendingRequests.remove(pageId);
    Metrics.PUT_ERRORS.inc();
    LOG.debug("put({},{} bytes) fails due to full queue", pageId, page.length);
    return false;
  }
  LOG.debug("put({},{} bytes) exits with async write", pageId, page.length);
  return true;
}

private boolean putInternal(PageId pageId, byte[] page) {
LOG.debug("putInternal({},{} bytes) enters", pageId, page.length);
PageId victim = null;
PageInfo victimPageInfo = null;
boolean enoughSpace;
Expand All @@ -229,7 +277,7 @@ public boolean put(PageId pageId, byte[] page) {
}
if (enoughSpace) {
boolean ret = addPage(pageId, page);
LOG.debug("put({},{} bytes) exits without eviction, success: {}", pageId, page.length, ret);
LOG.debug("Add page ({},{} bytes) without eviction: {}", pageId, page.length, ret);
return ret;
}
}
Expand Down Expand Up @@ -264,12 +312,13 @@ public boolean put(PageId pageId, byte[] page) {
}
if (enoughSpace) {
boolean ret = addPage(pageId, page);
LOG.debug("put({},{} bytes) exits after evicting ({}), success: {}", pageId, page.length,
LOG.debug("Add page ({},{} bytes) after evicting ({}), success: {}", pageId, page.length,
victimPageInfo, ret);
return ret;
}
}
LOG.debug("put({},{} bytes) fails after evicting ({})", pageId, page.length, victimPageInfo);
LOG.debug("putInternal({},{} bytes) fails after evicting ({})", pageId, page.length,
victimPageInfo);
return false;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

import alluxio.ConfigurationTestUtils;
import alluxio.Constants;
import alluxio.client.file.cache.store.LocalPageStore;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.client.file.cache.store.PageStoreType;
import alluxio.conf.InstancedConfiguration;
Expand All @@ -38,6 +39,8 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -66,6 +69,7 @@ public void before() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_PAGE_SIZE, PAGE_SIZE_BYTES);
mConf.set(PropertyKey.USER_CLIENT_CACHE_SIZE, CACHE_SIZE_BYTES);
mConf.set(PropertyKey.USER_CLIENT_CACHE_DIR, mTemp.getRoot().getAbsolutePath());
mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED, false);
mMetaStore = MetaStore.create();
mPageStore = PageStore.create(PageStoreOptions.create(mConf), true);
mEvictor = new FIFOEvictor(mMetaStore);
Expand Down Expand Up @@ -296,6 +300,77 @@ public void restoreFailed() throws Exception {
assertNull(mCacheManager.get(pageUuid));
}

/**
 * Verifies async put admission: while every writer is blocked in the page store, exactly
 * {@code threads} puts are accepted and the next one is rejected; after the blocked writes
 * complete, another {@code threads} puts are accepted.
 */
@Test
public void asyncCache() throws Exception {
  final int threads = 16;
  mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED, true);
  mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_THREADS, threads);
  mConf.set(PropertyKey.USER_CLIENT_CACHE_STORE_TYPE, "LOCAL");
  PutDelayedPageStore pageStore = new PutDelayedPageStore();
  mCacheManager = new LocalCacheManager(mConf, mMetaStore, pageStore, mEvictor);
  try {
    for (int i = 0; i < threads; i++) {
      PageId pageId = new PageId("5", i);
      assertTrue(mCacheManager.put(pageId, page(i, PAGE_SIZE_BYTES)));
    }
    // fail due to full queue
    assertFalse(mCacheManager.put(PAGE_ID1, PAGE1));
    pageStore.setHanging(false);
    // NOTE(review): the coarse poll interval presumably also gives the writers time to finish
    // their tasks and become idle before the next round of puts; shortening it may make the
    // assertions below flaky -- confirm before changing.
    while (pageStore.getPuts() < threads) {
      Thread.sleep(1000);
    }
    pageStore.setHanging(true);
    for (int i = 0; i < threads; i++) {
      PageId pageId = new PageId("6", i);
      assertTrue(mCacheManager.put(pageId, page(i, PAGE_SIZE_BYTES)));
    }
  } finally {
    // Release the writers on every exit path; otherwise the puts accepted above stay blocked
    // inside the page store forever, also after a failed assertion.
    pageStore.setHanging(false);
  }
}

/**
 * Verifies de-duplication of async puts: a second put of a page whose first put is still
 * pending is refused, and the same page is accepted again once the first write has completed.
 */
@Test
public void asyncCacheSamePage() throws Exception {
  final int threads = 16;
  mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED, true);
  mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_THREADS, threads);
  mConf.set(PropertyKey.USER_CLIENT_CACHE_STORE_TYPE, "LOCAL");
  PutDelayedPageStore pageStore = new PutDelayedPageStore();
  mCacheManager = new LocalCacheManager(mConf, mMetaStore, pageStore, mEvictor);
  try {
    assertTrue(mCacheManager.put(PAGE_ID1, PAGE1));
    // The first put is still blocked in the page store, so the page is still pending.
    assertFalse(mCacheManager.put(PAGE_ID1, PAGE1));
    pageStore.setHanging(false);
    while (pageStore.getPuts() < 1) {
      Thread.sleep(1000);
    }
    pageStore.setHanging(true);
    assertTrue(mCacheManager.put(PAGE_ID1, PAGE1));
  } finally {
    // Release the writer on every exit path; otherwise the last accepted put stays blocked
    // inside the page store forever, also after a failed assertion.
    pageStore.setHanging(false);
  }
}

/**
 * A PageStore whose put blocks for as long as the hanging flag is set (it is set initially),
 * and which counts the puts that have completed.
 */
public class PutDelayedPageStore extends LocalPageStore {
  /** Interval in milliseconds between checks of the hanging flag. */
  private static final long HANG_POLL_MS = 10;

  private final AtomicBoolean mHanging = new AtomicBoolean(true);
  private final AtomicInteger mPut = new AtomicInteger(0);

  public PutDelayedPageStore() {
    super(PageStoreOptions.create(mConf).toOptions());
  }

  /**
   * Waits until the hanging flag is cleared, then writes the page and bumps the put counter.
   *
   * @param pageId page identifier
   * @param page page data
   * @throws IOException if the underlying store fails, or the wait is interrupted
   */
  @Override
  public void put(PageId pageId, byte[] page) throws IOException {
    // Sleep between checks instead of spinning, so that blocked writers do not each burn a
    // core while the test thread is trying to make progress.
    while (mHanging.get()) {
      try {
        Thread.sleep(HANG_POLL_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while put of page " + pageId + " was held", e);
      }
    }
    super.put(pageId, page);
    // Counted only after the write has gone through.
    mPut.getAndIncrement();
  }

  /**
   * @param value whether subsequent (and currently blocked) puts should block
   */
  void setHanging(boolean value) {
    mHanging.set(value);
  }

  /**
   * @return the number of puts that have completed so far
   */
  int getPuts() {
    return mPut.get();
  }
}

/**
* Implementation of Evictor using FIFO eviction policy for the test.
*/
Expand Down
18 changes: 18 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -3202,6 +3202,20 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
// Client-scoped boolean switch for caching data asynchronously; defaults to true and is
// declared with consistency check level IGNORE.
public static final PropertyKey USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED =
new Builder(Name.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED)
.setDefaultValue(true)
.setDescription("If this is enabled, cache data asynchronously.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.CLIENT)
.build();
// Client-scoped number of threads used to cache data asynchronously; defaults to 16 and is
// declared with consistency check level IGNORE.
public static final PropertyKey USER_CLIENT_CACHE_ASYNC_WRITE_THREADS =
new Builder(Name.USER_CLIENT_CACHE_ASYNC_WRITE_THREADS)
.setDefaultValue(16)
.setDescription("Number of threads to asynchronously cache data.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_CLIENT_CACHE_ENABLED =
new Builder(Name.USER_CLIENT_CACHE_ENABLED)
.setDefaultValue(false)
Expand Down Expand Up @@ -4666,6 +4680,10 @@ public static final class Name {
"alluxio.user.block.worker.client.read.retry";
public static final String USER_BLOCK_WRITE_LOCATION_POLICY =
"alluxio.user.block.write.location.policy.class";
// Property names for the client cache async write switch and its thread count.
public static final String USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED =
"alluxio.user.client.cache.async.write.enabled";
public static final String USER_CLIENT_CACHE_ASYNC_WRITE_THREADS =
"alluxio.user.client.cache.async.write.threads";
public static final String USER_CLIENT_CACHE_ENABLED =
"alluxio.user.client.cache.enabled";
public static final String USER_CLIENT_CACHE_EVICTOR_CLASS =
Expand Down
Expand Up @@ -49,6 +49,7 @@ public final class LocalCacheFileInStreamIntegrationTest extends BaseIntegration
.setProperty(PropertyKey.USER_CLIENT_CACHE_SIZE, CACHE_SIZE_BYTES)
.setProperty(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, Constants.MB)
.setProperty(PropertyKey.USER_CLIENT_CACHE_ENABLED, true)
.setProperty(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED, false)
.build();

@Rule
Expand Down
Expand Up @@ -64,6 +64,7 @@ public void before() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_SIZE, CACHE_SIZE_BYTES);
mConf.set(PropertyKey.USER_CLIENT_CACHE_ENABLED, true);
mConf.set(PropertyKey.USER_CLIENT_CACHE_DIR, mTemp.getRoot().getPath());
mConf.set(PropertyKey.USER_CLIENT_CACHE_ASYNC_WRITE_ENABLED, false);
}

@After
Expand Down

0 comments on commit ba9756c

Please sign in to comment.