Skip to content

Commit

Permalink
add user rdf types cache (#1935)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwinckles committed Sep 16, 2021
1 parent 57f5974 commit 1d9929e
Show file tree
Hide file tree
Showing 29 changed files with 573 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public class FedoraPropsConfig extends BasePropsConfig {
@Value("${fcrepo.cache.db.containment.timeout.minutes:10}")
private long containmentCacheTimeout;

@Value("${fcrepo.cache.types.size.entries:1024}")
private long userTypesCacheSize;

@Value("${fcrepo.cache.types.timeout.minutes:10}")
private long userTypesCacheTimeout;

@Value("${fcrepo.cache.webac.acl.size.entries:1024}")
private long webacCacheSize;

Expand Down Expand Up @@ -343,6 +349,34 @@ public long getContainmentCacheTimeout() {
return containmentCacheTimeout;
}

/**
* @return The number of entries in the user types cache.
*/
public long getUserTypesCacheSize() {
return userTypesCacheSize;
}

/**
* @param userTypesCacheSize user types cache size
*/
public void setUserTypesCacheSize(final long userTypesCacheSize) {
this.userTypesCacheSize = userTypesCacheSize;
}

/**
* @return The number of minutes before items in the user types cache expire.
*/
public long getUserTypesCacheTimeout() {
return userTypesCacheTimeout;
}

/**
* @param userTypesCacheTimeout user types cache timeout
*/
public void setUserTypesCacheTimeout(final long userTypesCacheTimeout) {
this.userTypesCacheTimeout = userTypesCacheTimeout;
}

/**
* @return The number of entries in the WebAC effective ACL cache.
*/
Expand Down
2 changes: 1 addition & 1 deletion fcrepo-kernel-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-configs</artifactId>
<version>6.1.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to DuraSpace under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership.
*
* DuraSpace licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.api.cache;

import java.net.URI;
import java.util.List;
import java.util.function.Supplier;

import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.identifiers.FedoraId;

/**
* Cache of user RDF types. This cache has two levels. Types are cached at the session level as well as globally.
* This is necessary to support long-running transactions that can span multiple requests.
*
* @author pwinckles
*/
public interface UserTypesCache {

/**
* Gets the user RDF types for the specified resource from the cache. First, the session's cache is checked.
* If the types were not found, then the global cache is checked. If not in either cache, then the rdfProvider
* is called to load the resource's RDF from which the types are parsed, cached, and returned.
*
* This method should NOT be called on binary resources.
*
* @param resourceId the id of the resource
* @param sessionId the id of the current session
* @param rdfProvider the provider that is called, if needed, to load the resource's rdf
* @return the resource's user RDF types
*/
List<URI> getUserTypes(final FedoraId resourceId,
final String sessionId,
final Supplier<RdfStream> rdfProvider);

/**
* Extracts the user RDF types from the RDF and caches them in the session level cache.
*
* @param resourceId the id of the resource
* @param rdf the resource's RDF
* @param sessionId the session to cache the types in
*/
void cacheUserTypes(final FedoraId resourceId,
final RdfStream rdf,
final String sessionId);

/**
* Caches the user RDF types in the session level cache.
*
* @param resourceId the id of the resource
* @param userTypes the resource's types
* @param sessionId the session to cache the types in
*/
void cacheUserTypes(final FedoraId resourceId,
final List<URI> userTypes,
final String sessionId);

/**
* Merges the session level cache into the global cache.
*
* @param sessionId the id of the session to merge
*/
void mergeSessionCache(final String sessionId);

/**
* Drops a session level cache without merging it into the global cache.
*
* @param sessionId the id of the session cache to drop
*/
void dropSessionCache(final String sessionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public interface FedoraResource {
*/
FedoraResource getParent() throws PathNotFoundException;

/**
* Get the FedoraId of this resource's parent
*
* @return the parent resource's id
*/
FedoraId getParentId();

/**
* Get the children of this resource
* @return a stream of Fedora resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionState;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
Expand All @@ -36,6 +37,7 @@
import org.fcrepo.kernel.api.services.ReferenceService;
import org.fcrepo.persistence.api.PersistentStorageSession;
import org.fcrepo.search.api.SearchIndex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,6 +162,10 @@ public synchronized void rollback() {
this.getEventAccumulator().clearEvents(this);
});

execQuietly("Failed to clear user rdf types cache in transaction " + id, () -> {
this.getUserTypesCache().dropSessionCache(id);
});

updateState(TransactionState.ROLLEDBACK);

releaseLocks();
Expand Down Expand Up @@ -301,6 +307,7 @@ private void doCommitShortLived() {
// short-lived txs do not write to tx tables and do not need to commit db indexes.
this.getPersistentSession().prepare();
this.getPersistentSession().commit();
this.getUserTypesCache().mergeSessionCache(id);
}

private void doCommitLongRunning() {
Expand All @@ -316,6 +323,7 @@ private void doCommitLongRunning() {
// db's connection timeout may need to be adjusted so that the connection is not closed while
// waiting for the OCFL changes to be committed.
this.getPersistentSession().commit();
this.getUserTypesCache().mergeSessionCache(id);
});
}

Expand Down Expand Up @@ -398,6 +406,10 @@ private ResourceLockManager getResourceLockManger() {
return this.txManager.getResourceLockManager();
}

private UserTypesCache getUserTypesCache() {
return this.txManager.getUserTypesCache();
}

@Override
public String toString() {
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionManager;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionNotFoundException;
import org.fcrepo.kernel.api.lock.ResourceLockManager;
Expand Down Expand Up @@ -85,6 +86,9 @@ public class TransactionManagerImpl implements TransactionManager {
@Inject
private DbTransactionExecutor dbTransactionExecutor;

@Inject
private UserTypesCache userTypesCache;

TransactionManagerImpl() {
transactions = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -188,6 +192,10 @@ protected ResourceLockManager getResourceLockManager() {
return resourceLockManager;
}

protected UserTypesCache getUserTypesCache() {
return userTypesCache;
}

public DbTransactionExecutor getDbTransactionExecutor() {
return dbTransactionExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to DuraSpace under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership.
*
* DuraSpace licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.impl.cache;

import static java.util.stream.Collectors.toList;
import static org.apache.jena.vocabulary.RDF.type;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.fcrepo.config.FedoraPropsConfig;
import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.ReadOnlyTransaction;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.identifiers.FedoraId;

import org.apache.jena.graph.Triple;
import org.springframework.stereotype.Component;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
* Default UserTypesCache implementation
*
* @author pwinckles
*/
@Component
public class UserTypesCacheImpl implements UserTypesCache {

private final Cache<FedoraId, List<URI>> globalCache;
private final Map<String, Cache<FedoraId, List<URI>>> sessionCaches;

public UserTypesCacheImpl(final FedoraPropsConfig config) {
this.globalCache = Caffeine.newBuilder()
.maximumSize(config.getUserTypesCacheSize())
.expireAfterAccess(config.getUserTypesCacheTimeout(), TimeUnit.MINUTES)
.build();
this.sessionCaches = new ConcurrentHashMap<>();
}

/**
* {@inheritDoc}
*/
@Override
public List<URI> getUserTypes(final FedoraId resourceId,
final String sessionId,
final Supplier<RdfStream> rdfProvider) {
if (isNotReadOnlySession(sessionId)) {
final var sessionCache = getSessionCache(sessionId);

return sessionCache.get(resourceId, k -> {
return globalCache.get(resourceId, k2 -> {
return extractRdfTypes(rdfProvider.get());
});
});
} else {
return globalCache.get(resourceId, k -> {
return extractRdfTypes(rdfProvider.get());
});
}
}

/**
* {@inheritDoc}
*/
@Override
public void cacheUserTypes(final FedoraId resourceId,
final RdfStream rdf,
final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
getSessionCache(sessionId).put(resourceId, extractRdfTypes(rdf));
}
}

/**
* {@inheritDoc}
*/
@Override
public void cacheUserTypes(final FedoraId resourceId,
final List<URI> userTypes,
final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
getSessionCache(sessionId).put(resourceId, userTypes);
}
}

/**
* {@inheritDoc}
*/
@Override
public void mergeSessionCache(final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
final var sessionCache = getSessionCache(sessionId);
globalCache.putAll(sessionCache.asMap());
dropSessionCache(sessionId);
}
}

/**
* {@inheritDoc}
*/
@Override
public void dropSessionCache(final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
sessionCaches.remove(sessionId);
}
}

private Cache<FedoraId, List<URI>> getSessionCache(final String sessionId) {
return sessionCaches.computeIfAbsent(sessionId, k -> {
return Caffeine.newBuilder()
.maximumSize(1024)
.build();
});
}

private List<URI> extractRdfTypes(final RdfStream rdf) {
return rdf.filter(t -> t.predicateMatches(type.asNode()))
.map(Triple::getObject)
.map(t -> URI.create(t.toString()))
.collect(toList());
}

private boolean isNotReadOnlySession(final String sessionId) {
return !ReadOnlyTransaction.READ_ONLY_TX_ID.equals(sessionId);
}
}

0 comments on commit 1d9929e

Please sign in to comment.