OAK-10494 - Add cache to reduce number of remote blobstore calls. #1155

Open · wants to merge 20 commits into base: trunk · showing changes from 15 commits
@@ -32,6 +32,7 @@
import java.security.MessageDigest;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -44,7 +45,7 @@
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.MultiDataStoreAware;
import org.apache.jackrabbit.guava.common.cache.CacheLoader;
import org.apache.jackrabbit.oak.commons.properties.SystemPropertySupplier;
import org.apache.jackrabbit.oak.plugins.blob.datastore.TypedDataStore;
import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
@@ -58,6 +59,9 @@

import org.apache.jackrabbit.guava.common.base.Function;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.cache.Cache;
import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
import org.apache.jackrabbit.guava.common.cache.CacheLoader;
import org.apache.jackrabbit.guava.common.collect.ImmutableList;
import org.apache.jackrabbit.guava.common.collect.Iterators;
import org.apache.jackrabbit.guava.common.io.Closeables;
@@ -75,6 +79,8 @@
* <param name="{@link #setUploadThreads(int) uploadThreads}" value="10"/>
* <param name="{@link #setStagingPurgeInterval(int) stagingPurgeInterval}" value="300"/>
* <param name="{@link #setStagingRetryInterval(int) stagingRetryInterval} " value="600"/>
* <param name="{@link #setRecordCacheSize(long) recordCacheSize}" value="10000"/>
* <param name="{@link #setRecordCacheExpiration(long) recordCacheExpiration}" value="15"/>
* </DataStore>
* </pre>
*/
@@ -146,6 +152,29 @@ public abstract class AbstractSharedCachingDataStore extends AbstractDataStore

protected ExecutorService executor;

/**
* Optional in-memory cache of DataRecord metadata, keyed by identifier; empty when the cache is disabled
*/
protected Optional<Cache<String, DataRecord>> recordCache = Optional.empty();

/**
* Maximum number of entries in the DataRecord cache; a value of 0 disables the cache
*/
private long recordCacheSize = SystemPropertySupplier
.create("oak.blob.recordcache.size", 10000L)
.loggingTo(LOG)
.formatSetMessage((name, value) -> String.format("%s set to: %s", name, value))
.get();

/**
* DataRecord cache entry expiration (expire-after-access), in minutes
*/
private long recordCacheExpiration = SystemPropertySupplier
.create("oak.blob.recordcache.expiration", 15L)
.loggingTo(LOG)
.formatSetMessage((name, value) -> String.format("%s set to: %s minutes", name, value))
.get();

public void init(String homeDir) throws DataStoreException {
if (path == null) {
path = homeDir + "/repository/datastore";
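
Both defaults above are read from plain JVM system properties via SystemPropertySupplier, so they can be overridden per process. A hypothetical sketch of doing so programmatically (property names taken from the diff; the values must be set before the datastore instance is constructed, because the fields are initialized at construction time):

public class RecordCacheConfigSketch {
    public static void main(String[] args) {
        // Equivalent to passing -Doak.blob.recordcache.size=20000
        // and -Doak.blob.recordcache.expiration=30 to the JVM.
        System.setProperty("oak.blob.recordcache.size", "20000");    // entries; 0 disables the cache
        System.setProperty("oak.blob.recordcache.expiration", "30"); // minutes, expire-after-access
        // ... construct and init() the concrete datastore afterwards so the
        // SystemPropertySupplier fields pick up these values ...
    }
}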
@@ -179,6 +208,13 @@ public void init(String homeDir) throws DataStoreException {
}
}, statisticsProvider, listeningExecutor, schedulerExecutor, executor, stagingPurgeInterval,
stagingRetryInterval);
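// Build the optional in-memory record cache; a non-positive size leaves it disabled.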
if (recordCacheSize > 0) {
this.recordCache = Optional.of(CacheBuilder
.newBuilder()
.maximumSize(recordCacheSize)
.expireAfterAccess(recordCacheExpiration, TimeUnit.MINUTES)
.build());
}
}

protected abstract AbstractSharedBackend createBackend();
@@ -208,7 +244,12 @@ public DataRecord getRecordIfStored(DataIdentifier dataIdentifier)
} else {
// Return the metadata from backend and lazily load the stream
try {
DataRecord rec = backend.getRecord(dataIdentifier);
DataRecord rec;
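// Consult the in-memory record cache first; on a miss (or if the cache is disabled), fetch from the remote backend.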
if (recordCache.isPresent()) {
rec = recordCache.get().get(dataIdentifier.toString(), () -> backend.getRecord(dataIdentifier));
} else {
rec = backend.getRecord(dataIdentifier);
}
return new FileCacheDataRecord(this, backend, dataIdentifier, rec.getLength(),
tmp, rec.getLastModified());
} catch (Exception e) {
@@ -287,6 +328,7 @@ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
@Override
public void deleteRecord(DataIdentifier dataIdentifier) throws DataStoreException {
cache.invalidate(dataIdentifier.toString());
recordCache.ifPresent(c -> c.invalidate(dataIdentifier.toString()));
backend.deleteRecord(dataIdentifier);
}

@@ -416,6 +458,14 @@ public void setStatisticsProvider(StatisticsProvider statisticsProvider) {
this.statisticsProvider = statisticsProvider;
}

public void setRecordCacheSize(long recordCacheSize) {
this.recordCacheSize = recordCacheSize;
}

public void setRecordCacheExpiration(long recordCacheExpiration) {
this.recordCacheExpiration = recordCacheExpiration;
}

/**------------------------ SharedDataStore methods -----------------------------------------**/

@Override
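The core pattern of this change is memoization with a bounded, access-expiring Guava cache, populated on demand via get(key, loader). A minimal, self-contained sketch of that pattern (plain Guava imports here for runnability; Oak itself uses the repackaged org.apache.jackrabbit.guava classes seen in the diff):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class RecordCacheSketch {
    public static void main(String[] args) throws Exception {
        // Bounded cache with access-based expiry, mirroring the builder call in init().
        Cache<String, String> cache = CacheBuilder.newBuilder()
                .maximumSize(10_000L)
                .expireAfterAccess(15, TimeUnit.MINUTES)
                .build();

        // get(key, loader) invokes the loader only on a miss; until the entry is
        // evicted or expires, later lookups for the same key skip the backend.
        String first = cache.get("blob-id", () -> slowBackendFetch("blob-id"));
        String second = cache.get("blob-id", () -> slowBackendFetch("blob-id")); // served from memory

        System.out.println(first.equals(second)); // true; "backend fetch" is printed only once
    }

    // Stand-in for the expensive backend.getRecord(...) call.
    private static String slowBackendFetch(String id) {
        System.out.println("backend fetch for " + id);
        return "record-" + id;
    }
}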
@@ -306,6 +306,7 @@ public TestFutureCallback(CountDownLatch latch) {
static class TestMemoryBackend extends AbstractSharedBackend {
final Map<DataIdentifier, File> _backend = Maps.newHashMap();
private final File root;
private long backendResponseDelay = 0L;

public TestMemoryBackend(File root) {
this.root = root;
@@ -336,6 +337,12 @@ public TestMemoryBackend(File root) {
}

@Override public DataRecord getRecord(DataIdentifier id) throws DataStoreException {
if (backendResponseDelay > 0) {
try {
Thread.sleep(backendResponseDelay);
} catch (InterruptedException e) {
// restore the interrupt status instead of silently swallowing it
Thread.currentThread().interrupt();
}
}
if (_backend.containsKey(id)) {
final File f = _backend.get(id);
return new AbstractDataRecord(this, id) {
@@ -415,6 +422,10 @@ public TestMemoryBackend(File root) {
public String getReferenceFromIdentifier(DataIdentifier identifier) {
return super.getReferenceFromIdentifier(identifier);
}

public void setBackendResponseDelay(long backendResponseDelay) {
this.backendResponseDelay = backendResponseDelay;
}
}

static InputStream randomStream(int seed, int size) {
@@ -99,6 +99,10 @@ public void setup() throws Exception {
}

private void init(int i, int cacheSize, int uploadSplit) throws Exception {
init(i, cacheSize, uploadSplit, 0L, 0L);
}

private void init(int i, int cacheSize, int uploadSplit, long recordCacheSize, long backendResponseDelay) throws Exception {
LOG.info("Starting init");

// create executor
@@ -117,6 +121,7 @@ private void init(int i, int cacheSize, int uploadSplit) throws Exception {

backendRoot = folder.newFolder();
final TestMemoryBackend testBackend = new TestMemoryBackend(backendRoot);
testBackend.setBackendResponseDelay(backendResponseDelay);
this.backend = testBackend;

dataStore = new AbstractSharedCachingDataStore() {
@@ -136,6 +141,7 @@ private void init(int i, int cacheSize, int uploadSplit) throws Exception {
dataStore.executor = newDirectExecutorService();
dsPath = new File(root.getAbsolutePath(), "ds").getAbsolutePath();
dataStore.setPath(dsPath);
dataStore.setRecordCacheSize(recordCacheSize);
dataStore.init(root.getAbsolutePath());

LOG.info("Finished init");
@@ -638,4 +644,41 @@ private void waitFinish() {
e.printStackTrace();
}
}

@Test
public void performanceGetRecordIfStored() throws Exception {
tear();
final int iterations = 100;
final long backendResponseDelay = 4L;

final File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
final String id = getIdForInputStream(f);
final DataIdentifier di = new DataIdentifier(id);

init(1, 64 * 1024 * 1024, 10, 0L, backendResponseDelay);
// we write directly to the backend because that's the situation we have with a shared remote datastore:
// the file was already written by a different datastore instance but is not present in this datastore's local cache.
backend.write(di, f);

long start = System.nanoTime();
for (int i = 0; i < iterations; ++i) {
LOG.trace("" + dataStore.getRecordIfStored(di)); // LOG.trace to avoid the call being optimised away
}
long timeUncached = System.nanoTime() - start;

tear();

init(1, 64 * 1024 * 1024, 10, 10000L, backendResponseDelay);
// we write directly to the backend because that's the situation we have with a shared remote datastore:
// the file was already written by a different datastore instance but is not present in this datastore's local cache.
backend.write(di, f);

start = System.nanoTime();
for (int i = 0; i < iterations; ++i) {
LOG.trace("" + dataStore.getRecordIfStored(di)); // LOG.trace to avoid the call being optimised away
}
long timeCached = System.nanoTime() - start;

Contributor:
Ah! Now I understand why you are adding a delay. The assertion with time can be quite fragile. Can't we just assert on the cache object not being empty and the presence of the record with the identifier?

Contributor (Author):
Yes, I wanted to "prove" the effectiveness of the cache. Although I only check for a 5x improvement, where the difference should be at least 100x for this test case, I agree the timing assertion is still fragile and should be avoided.

I've changed the test to only check that the record is in the cache after the first access, and to ensure that it is loaded from the cache when accessed a second time. Is that what you meant?
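
For illustration, an assertion-based version of the test might look roughly like the following fragment. This is hypothetical: it assumes the test can reach the protected recordCache field, and that .recordStats() is added to the CacheBuilder so that stats() actually counts hits and misses.

dataStore.getRecordIfStored(di);                   // first access: loads from the backend, populates the cache
assertTrue(dataStore.recordCache.isPresent());
Cache<String, DataRecord> c = dataStore.recordCache.get();
assertNotNull(c.getIfPresent(di.toString()));      // record is cached after the first access

long missesBefore = c.stats().missCount();
dataStore.getRecordIfStored(di);                   // second access: must be served from the cache
assertEquals(missesBefore, c.stats().missCount()); // no new miss, hence no backend call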

Contributor (@amit-jain, Nov 2, 2023):
Yes. If you need to add test performance numbers, maybe you can also add them to oak-benchmarks.

assertTrue(String.format("timeCached: %d, timeUncached: %d", timeCached, timeUncached), 5 * timeCached < timeUncached);
}
}