Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add user rdf types cache #1935

Merged
merged 4 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public class FedoraPropsConfig extends BasePropsConfig {
@Value("${fcrepo.cache.db.containment.timeout.minutes:10}")
private long containmentCacheTimeout;

@Value("${fcrepo.cache.types.size.entries:1024}")
dbernstein marked this conversation as resolved.
Show resolved Hide resolved
private long userTypesCacheSize;

@Value("${fcrepo.cache.types.timeout.minutes:10}")
private long userTypesCacheTimeout;

@Value("${fcrepo.cache.webac.acl.size.entries:1024}")
private long webacCacheSize;

Expand Down Expand Up @@ -343,6 +349,34 @@ public long getContainmentCacheTimeout() {
return containmentCacheTimeout;
}

/**
* @return The number of entries in the user types cache.
*/
public long getUserTypesCacheSize() {
return userTypesCacheSize;
}

/**
* @param userTypesCacheSize user types cache size
*/
public void setUserTypesCacheSize(final long userTypesCacheSize) {
this.userTypesCacheSize = userTypesCacheSize;
}

/**
* @return The number of minutes before items in the user types cache expire.
*/
public long getUserTypesCacheTimeout() {
return userTypesCacheTimeout;
}

/**
* @param userTypesCacheTimeout user types cache timeout
*/
public void setUserTypesCacheTimeout(final long userTypesCacheTimeout) {
this.userTypesCacheTimeout = userTypesCacheTimeout;
}

/**
* @return The number of entries in the WebAC effective ACL cache.
*/
Expand Down
2 changes: 1 addition & 1 deletion fcrepo-kernel-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-configs</artifactId>
<version>6.1.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to DuraSpace under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership.
*
* DuraSpace licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.api.cache;

import java.net.URI;
import java.util.List;
import java.util.function.Supplier;

import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.identifiers.FedoraId;

/**
* Cache of user RDF types. This cache has two levels. Types are cached at the session level as well as globally.
* This is necessary to support long-running transactions that can span multiple requests.
*
* @author pwinckles
*/
public interface UserTypesCache {

/**
* Gets the user RDF types for the specified resource from the cache. First, the session's cache is checked.
* If the types were not found, then the global cache is checked. If not in either cache, then the rdfProvider
* is called to load the resource's RDF from which the types are parsed, cached, and returned.
*
* This method should NOT be called on binary resources.
*
* @param resourceId the id of the resource
* @param sessionId the id of the current session
* @param rdfProvider the provider that is called, if needed, to load the resource's rdf
* @return the resource's user RDF types
*/
List<URI> getUserTypes(final FedoraId resourceId,
final String sessionId,
final Supplier<RdfStream> rdfProvider);

/**
* Extracts the user RDF types from the RDF and caches them in the session level cache.
*
* @param resourceId the id of the resource
* @param rdf the resource's RDF
* @param sessionId the session to cache the types in
*/
void cacheUserTypes(final FedoraId resourceId,
final RdfStream rdf,
final String sessionId);

/**
* Caches the user RDF types in the session level cache.
*
* @param resourceId the id of the resource
* @param userTypes the resource's types
* @param sessionId the session to cache the types in
*/
void cacheUserTypes(final FedoraId resourceId,
final List<URI> userTypes,
final String sessionId);

/**
* Merges the session level cache into the global cache.
*
* @param sessionId the id of the session to merge
*/
void mergeSessionCache(final String sessionId);

/**
* Drops a session level cache without merging it into the global cache.
*
* @param sessionId the id of the session cache to drop
*/
void dropSessionCache(final String sessionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public interface FedoraResource {
*/
FedoraResource getParent() throws PathNotFoundException;

/**
* Get the FedoraId of this resource's parent
*
* @return the parent resource's id
*/
FedoraId getParentId();

/**
* Get the children of this resource
* @return a stream of Fedora resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionState;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
Expand All @@ -36,6 +37,7 @@
import org.fcrepo.kernel.api.services.ReferenceService;
import org.fcrepo.persistence.api.PersistentStorageSession;
import org.fcrepo.search.api.SearchIndex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -161,6 +163,10 @@ public synchronized void rollback() {
this.getEventAccumulator().clearEvents(this);
});

execQuietly("Failed to clear user rdf types cache in transaction " + id, () -> {
this.getUserTypesCache().dropSessionCache(id);
});

updateState(TransactionState.ROLLEDBACK);

releaseLocks();
Expand Down Expand Up @@ -302,6 +308,7 @@ private void doCommitShortLived() {
// short-lived txs do not write to tx tables and do not need to commit db indexes.
this.getPersistentSession().prepare();
this.getPersistentSession().commit();
this.getUserTypesCache().mergeSessionCache(id);
}

private void doCommitLongRunning() {
Expand All @@ -317,6 +324,7 @@ private void doCommitLongRunning() {
// db's connection timeout may need to be adjusted so that the connection is not closed while
// waiting for the OCFL changes to be committed.
this.getPersistentSession().commit();
this.getUserTypesCache().mergeSessionCache(id);
});
}

Expand Down Expand Up @@ -399,6 +407,10 @@ private ResourceLockManager getResourceLockManger() {
return this.txManager.getResourceLockManager();
}

private UserTypesCache getUserTypesCache() {
return this.txManager.getUserTypesCache();
}

@Override
public String toString() {
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionManager;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionNotFoundException;
import org.fcrepo.kernel.api.lock.ResourceLockManager;
Expand Down Expand Up @@ -85,6 +86,9 @@ public class TransactionManagerImpl implements TransactionManager {
@Inject
private DbTransactionExecutor dbTransactionExecutor;

@Inject
private UserTypesCache userTypesCache;

TransactionManagerImpl() {
transactions = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -188,6 +192,10 @@ protected ResourceLockManager getResourceLockManager() {
return resourceLockManager;
}

protected UserTypesCache getUserTypesCache() {
return userTypesCache;
}

public DbTransactionExecutor getDbTransactionExecutor() {
return dbTransactionExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to DuraSpace under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership.
*
* DuraSpace licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.impl.cache;

import static java.util.stream.Collectors.toList;
import static org.apache.jena.vocabulary.RDF.type;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.fcrepo.config.FedoraPropsConfig;
import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.ReadOnlyTransaction;
import org.fcrepo.kernel.api.cache.UserTypesCache;
import org.fcrepo.kernel.api.identifiers.FedoraId;

import org.apache.jena.graph.Triple;
import org.springframework.stereotype.Component;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
* Default UserTypesCache implementation
*
* @author pwinckles
*/
@Component
public class UserTypesCacheImpl implements UserTypesCache {

private final Cache<FedoraId, List<URI>> globalCache;
private final Map<String, Cache<FedoraId, List<URI>>> sessionCaches;

public UserTypesCacheImpl(final FedoraPropsConfig config) {
this.globalCache = Caffeine.newBuilder()
.maximumSize(config.getUserTypesCacheSize())
.expireAfterAccess(config.getUserTypesCacheTimeout(), TimeUnit.MINUTES)
.build();
this.sessionCaches = new ConcurrentHashMap<>();
}

/**
* {@inheritDoc}
*/
@Override
public List<URI> getUserTypes(final FedoraId resourceId,
final String sessionId,
final Supplier<RdfStream> rdfProvider) {
if (isNotReadOnlySession(sessionId)) {
final var sessionCache = getSessionCache(sessionId);

return sessionCache.get(resourceId, k -> {
return globalCache.get(resourceId, k2 -> {
return extractRdfTypes(rdfProvider.get());
});
});
} else {
return globalCache.get(resourceId, k -> {
return extractRdfTypes(rdfProvider.get());
});
}
}

/**
* {@inheritDoc}
*/
@Override
public void cacheUserTypes(final FedoraId resourceId,
final RdfStream rdf,
final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
getSessionCache(sessionId).put(resourceId, extractRdfTypes(rdf));
}
}

/**
* {@inheritDoc}
*/
@Override
public void cacheUserTypes(final FedoraId resourceId,
final List<URI> userTypes,
final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
getSessionCache(sessionId).put(resourceId, userTypes);
}
}

/**
* {@inheritDoc}
*/
@Override
public void mergeSessionCache(final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
final var sessionCache = getSessionCache(sessionId);
globalCache.putAll(sessionCache.asMap());
dropSessionCache(sessionId);
}
}

/**
* {@inheritDoc}
*/
@Override
public void dropSessionCache(final String sessionId) {
if (isNotReadOnlySession(sessionId)) {
sessionCaches.remove(sessionId);
}
}

private Cache<FedoraId, List<URI>> getSessionCache(final String sessionId) {
return sessionCaches.computeIfAbsent(sessionId, k -> {
return Caffeine.newBuilder()
.maximumSize(1024)
.build();
});
}

private List<URI> extractRdfTypes(final RdfStream rdf) {
return rdf.filter(t -> t.predicateMatches(type.asNode()))
.map(Triple::getObject)
.map(t -> URI.create(t.toString()))
.collect(toList());
}

private boolean isNotReadOnlySession(final String sessionId) {
return !ReadOnlyTransaction.READ_ONLY_TX_ID.equals(sessionId);
}
}