Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-4962: invalidation support for FedX source selection cache + factory #4966

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Optional;

import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class FedXConfig {

private String sourceSelectionCacheSpec = null;

private SourceSelectionCacheFactory sourceSelectionCacheFactory = null;

private TaskWrapper taskWrapper = null;

private String prefixDeclarations = null;
Expand Down Expand Up @@ -251,6 +254,18 @@ public FedXConfig withSourceSelectionCacheSpec(String cacheSpec) {
return this;
}

/**
* The {@link SourceSelectionCacheFactory} to be used. If not set explicitly, the default in memory implementation
* is used with the configued {@link #getSourceSelectionCacheSpec()}.
*
* @param factory the {@link SourceSelectionCacheFactory}
* @return the current config
*/
public FedXConfig withSourceSelectionCacheFactory(SourceSelectionCacheFactory factory) {
this.sourceSelectionCacheFactory = factory;
return this;
}

/**
* Sets a {@link TaskWrapper} which may be used for wrapping any background {@link Runnable}s. If no such wrapper is
* explicitly configured, the unmodified task is returned. See {@link TaskWrapper} for more information.
Expand Down Expand Up @@ -398,12 +413,24 @@ public String getPrefixDeclarations() {
* Returns the configured {@link CacheBuilderSpec} (if any) for the {@link SourceSelectionMemoryCache}. If not
* defined, the {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used.
*
* If {@link #getSourceSelectionCacheFactory()} is configured, this setting is ignored.
*
* @return the {@link CacheBuilderSpec} or <code>null</code>
*/
public String getSourceSelectionCacheSpec() {
return this.sourceSelectionCacheSpec;
}

/**
* Returns the {@link SourceSelectionCacheFactory} (if any). If not defined, the {@link SourceSelectionCache} is
* instantiated using the default implementation and respects {@link #getSourceSelectionCacheSpec()}.
*
* @return {@link SourceSelectionCacheFactory}
*/
public SourceSelectionCacheFactory getSourceSelectionCacheFactory() {
return this.sourceSelectionCacheFactory;
}

/**
* The debug mode for query plan. If enabled, the query execution plan is printed to stdout
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,17 @@ public FederationEvalStrategy createStrategy(Dataset dataset) {
}

/**
* Create the {@link SourceSelectionCache}
* Create the {@link SourceSelectionCache}.
*
* @return the {@link SourceSelectionCache}
* @see FedXConfig#getSourceSelectionCacheSpec()
* @see FedXConfig#getSourceSelectionCacheFactory()
*/
private SourceSelectionCache createSourceSelectionCache() {
var factory = getConfig().getSourceSelectionCacheFactory();
if (factory != null) {
return factory.create();
}
String cacheSpec = getConfig().getSourceSelectionCacheSpec();
return new SourceSelectionMemoryCache(cacheSpec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,10 @@ enum StatementSourceAssurance {
* @param hasStatements
*/
void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasStatements);

/**
* Invalidate the underlying cache
*/
void invalidate();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.cache;

/**
* A factory for {@link SourceSelectionCache}.
*
* @author Andreas Schwarte
*/
public interface SourceSelectionCacheFactory {

/**
* Create the {@link SourceSelectionCache}
*
* @return
*/
SourceSelectionCache create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasS
updateInferredInformation(subQuery, endpoint, hasStatements);
}

@Override
public void invalidate() {
cache.invalidateAll(); // invalidate the entire cache
}

private void updateCacheEntry(SubQuery subQuery, Endpoint endpoint, boolean hasStatements) {
Entry entry;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ public void testCache_Integration() throws Exception {
// source selection is cached, only from fetching data
Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(0)));
Assertions.assertEquals(0, requestsForEndpoint(endpoints.get(1)));

// invalidate source selection cache
federationContext().getSourceSelectionCache().invalidate();

// re-run requests
monitoring().resetMonitoringInformation();
try (TupleQueryResult tqr = federationContext().getQueryManager().prepareTupleQuery(query).evaluate()) {
Assertions.assertEquals(2, Iterations.asList(tqr).size());
}

// source selection is non longer cached,
Assertions.assertEquals(2, requestsForEndpoint(endpoints.get(0)));
Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(1)));
}

@Test
Expand Down