Improve the shadow cache metrics for relatively small working sets
### What changes are proposed in this pull request?

Update the shadow cache metrics in the 'get' method.

Expose shadow cache metrics by calling the incrementCounter method on
CacheContext.

### Why are the changes needed?
Without this change, the shadow cache metrics are only updated in the
'put' method, which can make the reported working-set size inaccurate
(far below the real value), especially when the working set is smaller
than the cache capacity.

Customers can also obtain table-level metrics by overriding the
incrementCounter method, as sketched below.
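
For illustration, a client-side subclass along the following lines could aggregate those counters per table. This is a minimal sketch rather than code from this PR: it assumes CacheContext is subclassable with an accessible constructor, that incrementCounter(String, long) is the hook the cache invokes (matching the calls added in this diff), and that MetricsSystem is an acceptable sink; the table-name wiring is hypothetical.

```java
import alluxio.client.file.CacheContext;
import alluxio.metrics.MetricsSystem;

/**
 * Sketch of a per-table cache context. The cache calls incrementCounter(...) with
 * client-level metric names; this subclass re-registers each count under a
 * table-scoped name so per-table working-set and hit metrics can be observed.
 */
public class TableLevelCacheContext extends CacheContext {
  private final String mTableName;

  public TableLevelCacheContext(String tableName) {
    mTableName = tableName;
  }

  @Override
  public void incrementCounter(String name, long value) {
    // e.g. "orders.Client.CacheShadowCacheBytes" if mTableName is "orders"
    MetricsSystem.counter(mTableName + "." + name).inc(value);
  }
}
```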

pr-link: #13794
change-id: cid-831719572c6d1bbfcf9f9bbf9f9662cf4b702327
beinan committed Jul 23, 2021
1 parent fd8ff2c commit 3422cb6
Showing 11 changed files with 139 additions and 76 deletions.
@@ -11,8 +11,7 @@

package alluxio.client.file.cache;

import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
import alluxio.client.file.CacheContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.metrics.MetricKey;
@@ -151,7 +150,7 @@ private Metrics() {} // prevent instantiation
* @return true if the put was successful, false otherwise
*/
default boolean put(PageId pageId, byte[] page) {
return put(pageId, page, CacheScope.GLOBAL, CacheQuota.UNLIMITED);
return put(pageId, page, CacheContext.defaults());
}

/**
@@ -160,11 +159,10 @@ default boolean put(PageId pageId, byte[] page) {
*
* @param pageId page identifier
* @param page page data
* @param cacheScope scope of this request
* @param cacheQuota cache quota
* @param cacheContext cache related context
* @return true if the put was successful, false otherwise
*/
boolean put(PageId pageId, byte[] page, CacheScope cacheScope, CacheQuota cacheQuota);
boolean put(PageId pageId, byte[] page, CacheContext cacheContext);

/**
* Reads the entire page if the queried page is found in the cache, stores the result in buffer.
@@ -190,7 +188,25 @@ default int get(PageId pageId, int bytesToRead, byte[] buffer, int offsetInBuffe
* @param offsetInBuffer offset in the destination buffer to write
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, int offsetInBuffer);
default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
int offsetInBuffer) {
return get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer, CacheContext.defaults());
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in
* buffer.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param bytesToRead number of bytes to read in this page
* @param buffer destination buffer to write
* @param offsetInBuffer offset in the destination buffer to write
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
*/
int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, int offsetInBuffer,
CacheContext cacheContext);

/**
* Deletes a page from the cache.
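
As a usage sketch (editorial, not part of this change), a caller that already holds a CacheContext would now thread it through both put and get, so the shadow cache sees reads as well as writes; the file id, page index, and buffer sizes below are made-up values.

```java
import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.PageId;

public final class CacheContextUsage {
  /** Writes one page and reads it back through the CacheContext-aware overloads. */
  static int readPageThroughCache(CacheManager cacheManager, byte[] pageData) {
    CacheContext context = CacheContext.defaults();
    // Hypothetical file id and page index; real callers derive these from the file status.
    PageId pageId = new PageId("12345", 0);

    // put(PageId, byte[], CacheContext) replaces the old (CacheScope, CacheQuota) overload.
    cacheManager.put(pageId, pageData, context);

    byte[] buffer = new byte[pageData.length];
    // Passing the context into get() lets shadow-cache and per-table metrics track reads too.
    return cacheManager.get(pageId, 0, pageData.length, buffer, 0, context);
  }
}
```
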
@@ -13,8 +13,7 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
import alluxio.client.file.CacheContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.metrics.MetricKey;
@@ -108,24 +107,27 @@ public void updateWorkingSetSize() {
}

@Override
public boolean put(PageId pageId, byte[] page, CacheScope cacheScope, CacheQuota cacheQuota) {
public boolean put(PageId pageId, byte[] page, CacheContext cacheContext) {
updateBloomFilterAndWorkingSet(pageId, page.length, cacheContext);
return mCacheManager.put(pageId, page, cacheContext);
}

private void updateBloomFilterAndWorkingSet(PageId pageId, int pageLength,
CacheContext cacheContext) {
int filterIndex = mCurrentSegmentFilterIndex;
BloomFilter<PageId> bf = mSegmentBloomFilters.get(filterIndex);
if (!bf.mightContain(pageId)) {
bf.put(pageId);
mObjEachBloomFilter.getAndIncrement(filterIndex);
mByteEachBloomFilter.getAndAdd(filterIndex, page.length);
mByteEachBloomFilter.getAndAdd(filterIndex, pageLength);
mWorkingSetBloomFilter.put(pageId);
updateFalsePositiveRatio();

long oldPages = Metrics.SHADOW_CACHE_PAGES.getCount();
mShadowCachePages = (int) mWorkingSetBloomFilter.approximateElementCount();
Metrics.SHADOW_CACHE_PAGES.inc(mShadowCachePages - oldPages);
long oldBytes = Metrics.SHADOW_CACHE_BYTES.getCount();
mShadowCacheBytes = (long) (mShadowCachePages * mAvgPageSize);
Metrics.SHADOW_CACHE_BYTES.inc(mShadowCacheBytes - oldBytes);
updateWorkingSetSize();
if (cacheContext != null) {
cacheContext
.incrementCounter(MetricKey.CLIENT_CACHE_SHADOW_CACHE_BYTES.getName(), pageLength);
}
}
return mCacheManager.put(pageId, page, cacheScope, cacheQuota);
}

/**
@@ -217,7 +219,7 @@ public long getShadowCacheByteHit() {

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
int offsetInBuffer) {
int offsetInBuffer, CacheContext cacheContext) {
boolean seen = false;
for (int i = 0; i < mSegmentBloomFilters.length(); ++i) {
seen |= mSegmentBloomFilters.get(i).mightContain(pageId);
@@ -227,12 +229,14 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
Metrics.SHADOW_CACHE_BYTES_HIT.inc(bytesToRead);
mShadowCachePageHit.getAndIncrement();
mShadowCacheByteHit.getAndAdd(bytesToRead);
} else {
updateBloomFilterAndWorkingSet(pageId, bytesToRead, cacheContext);
}
Metrics.SHADOW_CACHE_PAGES_READ.inc();
Metrics.SHADOW_CACHE_BYTES_READ.inc(bytesToRead);
mShadowCachePageRead.getAndIncrement();
mShadowCacheByteRead.getAndAdd(bytesToRead);
return mCacheManager.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer);
return mCacheManager.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer, cacheContext);
}

@Override
@@ -14,8 +14,6 @@
import alluxio.client.file.CacheContext;
import alluxio.client.file.FileInStream;
import alluxio.client.file.URIStatus;
import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
@@ -48,10 +46,8 @@ public class LocalCacheFileInStream extends FileInStream {
/** Local store to store pages. */
private final CacheManager mCacheManager;
private final boolean mQuotaEnabled;
/** Scope of the file. */
private final CacheScope mCacheScope;
/** Cache Scope. */
private final CacheQuota mCacheQuota;
/** Cache related context of this file. */
private final CacheContext mCacheContext;
/** File info, fetched from external FS. */
private final URIStatus mStatus;
private final FileInStreamOpener mExternalFileInStreamOpener;
@@ -93,11 +89,9 @@ public LocalCacheFileInStream(URIStatus status, FileInStreamOpener fileOpener,
// Currently quota is only supported when it is set by external systems in status context
mQuotaEnabled = conf.getBoolean(PropertyKey.USER_CLIENT_CACHE_QUOTA_ENABLED);
if (mQuotaEnabled && status.getCacheContext() != null) {
mCacheQuota = status.getCacheContext().getCacheQuota();
mCacheScope = status.getCacheContext().getCacheScope();
mCacheContext = status.getCacheContext();
} else {
mCacheQuota = CacheQuota.UNLIMITED;
mCacheScope = CacheScope.GLOBAL;
mCacheContext = CacheContext.defaults();
}
Metrics.registerGauges();
}
@@ -145,7 +139,8 @@ private int readInternal(byte[] b, int off, int len, ReadType readType, long pos
pageId = new PageId(Long.toString(mStatus.getFileId()), currentPage);
}
int bytesRead =
mCacheManager.get(pageId, currentPageOffset, bytesLeftInPage, b, off + totalBytesRead);
mCacheManager.get(pageId, currentPageOffset, bytesLeftInPage, b, off + totalBytesRead,
mCacheContext);
if (bytesRead > 0) {
totalBytesRead += bytesRead;
currentPosition += bytesRead;
@@ -167,7 +162,7 @@ private int readInternal(byte[] b, int off, int len, ReadType readType, long pos
cacheContext.incrementCounter(
MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL.getMetricName(), bytesLeftInPage);
}
mCacheManager.put(pageId, page, mCacheScope, mCacheQuota);
mCacheManager.put(pageId, page, mCacheContext);
}
}
if (!isPositionedRead) {
@@ -15,6 +15,7 @@
import static alluxio.client.file.cache.CacheManager.State.READ_ONLY;
import static alluxio.client.file.cache.CacheManager.State.READ_WRITE;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
@@ -246,15 +247,15 @@ private CacheScope checkScopeToEvict(int pageSize, CacheScope scope, CacheQuota
}

@Override
public boolean put(PageId pageId, byte[] page, CacheScope scope, CacheQuota quota) {
public boolean put(PageId pageId, byte[] page, CacheContext cacheContext) {
LOG.debug("put({},{} bytes) enters", pageId, page.length);
if (mState.get() != READ_WRITE) {
Metrics.PUT_NOT_READY_ERRORS.inc();
Metrics.PUT_ERRORS.inc();
return false;
}
if (!mAsyncWrite) {
boolean ok = putInternal(pageId, page, scope, quota);
boolean ok = putInternal(pageId, page, cacheContext);
LOG.debug("put({},{} bytes) exits: {}", pageId, page.length, ok);
if (!ok) {
Metrics.PUT_ERRORS.inc();
@@ -268,7 +269,7 @@ public boolean put(PageId pageId, byte[] page, CacheScope scope, CacheQuot
try {
mAsyncCacheExecutor.submit(() -> {
try {
boolean ok = putInternal(pageId, page, scope, quota);
boolean ok = putInternal(pageId, page, cacheContext);
if (!ok) {
Metrics.PUT_ERRORS.inc();
}
@@ -289,11 +290,11 @@ public boolean put(PageId pageId, byte[] page, CacheScope scope, CacheQuot
return true;
}

private boolean putInternal(PageId pageId, byte[] page, CacheScope scope, CacheQuota quota) {
private boolean putInternal(PageId pageId, byte[] page, CacheContext cacheContext) {
PutResult result = PutResult.OK;
boolean forcedToEvict = false;
for (int i = 0; i <= mMaxEvictionRetries; i++) {
result = putAttempt(pageId, page, scope, quota, forcedToEvict);
result = putAttempt(pageId, page, cacheContext, forcedToEvict);
switch (result) {
case OK:
return true;
@@ -326,7 +327,7 @@ private boolean putInternal(PageId pageId, byte[] page, CacheScope scope, CacheQ
return false;
}

private PutResult putAttempt(PageId pageId, byte[] page, CacheScope scope, CacheQuota quota,
private PutResult putAttempt(PageId pageId, byte[] page, CacheContext cacheContext,
boolean forcedToEvict) {
LOG.debug("putInternal({},{} bytes) enters", pageId, page.length);
PageInfo victimPageInfo = null;
@@ -339,9 +340,11 @@ private PutResult putAttempt(PageId pageId, byte[] page, CacheScope scope, Cache
// TODO(binfan): we should return more informative result in the future
return PutResult.OK;
}
scopeToEvict = checkScopeToEvict(page.length, scope, quota, forcedToEvict);
scopeToEvict = checkScopeToEvict(page.length, cacheContext.getCacheScope(),
cacheContext.getCacheQuota(), forcedToEvict);
if (scopeToEvict == null) {
mMetaStore.addPage(pageId, new PageInfo(pageId, page.length, scope));
mMetaStore
.addPage(pageId, new PageInfo(pageId, page.length, cacheContext.getCacheScope()));
} else {
if (mQuotaEnabled) {
victimPageInfo = ((QuotaMetaStore) mMetaStore).evict(scopeToEvict);
@@ -394,9 +397,11 @@ private PutResult putAttempt(PageId pageId, byte[] page, CacheScope scope, Cache
return PutResult.BENIGN_RACING;
}
// Check if we are able to insert page after evicting victim page
scopeToEvict = checkScopeToEvict(page.length, scope, quota, false);
scopeToEvict = checkScopeToEvict(page.length, cacheContext.getCacheScope(),
cacheContext.getCacheQuota(), false);
if (scopeToEvict == null) {
mMetaStore.addPage(pageId, new PageInfo(pageId, page.length, scope));
mMetaStore
.addPage(pageId, new PageInfo(pageId, page.length, cacheContext.getCacheScope()));
}
}
// phase2: remove victim and add new page in pagestore
@@ -449,7 +454,7 @@ private void undoAddPage(PageId pageId) {

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
int offsetInBuffer) {
int offsetInBuffer, CacheContext cacheContext) {
Preconditions.checkArgument(pageOffset <= mPageSize,
"Read exceeds page boundary: offset=%s size=%s", pageOffset, mPageSize);
Preconditions.checkArgument(bytesToRead <= buffer.length - offsetInBuffer,
@@ -11,8 +11,7 @@

package alluxio.client.file.cache;

import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
import alluxio.client.file.CacheContext;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;

@@ -47,11 +46,11 @@ public boolean put(PageId pageId, byte[] page) {
}

@Override
public boolean put(PageId pageId, byte[] page, CacheScope cacheScope, CacheQuota cacheQuota) {
public boolean put(PageId pageId, byte[] page, CacheContext cacheContext) {
try {
return mCacheManager.put(pageId, page, cacheScope, cacheQuota);
return mCacheManager.put(pageId, page, cacheContext);
} catch (Exception e) {
LOG.error("Failed to put page {}, scope {}, quota {}", pageId, cacheScope, cacheQuota, e);
LOG.error("Failed to put page {}, cacheContext {}", pageId, cacheContext, e);
Metrics.PUT_ERRORS.inc();
return false;
}
Expand Down Expand Up @@ -80,6 +79,20 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer,
int offsetInBuffer, CacheContext cacheContext) {
try {
return mCacheManager
.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer, cacheContext);
} catch (Exception e) {
LOG.error("Failed to get page {}, offset {} cacheContext {}", pageId, pageOffset,
cacheContext, e);
Metrics.GET_ERRORS.inc();
return -1;
}
}

@Override
public boolean delete(PageId pageId) {
try {
