diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java index d25392daa2f..251cfa4cb8d 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java @@ -13,6 +13,7 @@ import java.util.Optional; import org.eclipse.rdf4j.federated.cache.SourceSelectionCache; +import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory; import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper; @@ -57,6 +58,8 @@ public class FedXConfig { private String sourceSelectionCacheSpec = null; + private SourceSelectionCacheFactory sourceSelectionCacheFactory = null; + private TaskWrapper taskWrapper = null; private String prefixDeclarations = null; @@ -251,6 +254,18 @@ public FedXConfig withSourceSelectionCacheSpec(String cacheSpec) { return this; } + /** + * The {@link SourceSelectionCacheFactory} to be used. If not set explicitly, the default in memory implementation + * is used with the configured {@link #getSourceSelectionCacheSpec()}. + * + * @param factory the {@link SourceSelectionCacheFactory} + * @return the current config + */ + public FedXConfig withSourceSelectionCacheFactory(SourceSelectionCacheFactory factory) { + this.sourceSelectionCacheFactory = factory; + return this; + } + /** * Sets a {@link TaskWrapper} which may be used for wrapping any background {@link Runnable}s. If no such wrapper is * explicitly configured, the unmodified task is returned. See {@link TaskWrapper} for more information. @@ -398,12 +413,24 @@ public String getPrefixDeclarations() { * Returns the configured {@link CacheBuilderSpec} (if any) for the {@link SourceSelectionMemoryCache}. 
If not * defined, the {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used. * + * If {@link #getSourceSelectionCacheFactory()} is configured, this setting is ignored. + * * @return the {@link CacheBuilderSpec} or null */ public String getSourceSelectionCacheSpec() { return this.sourceSelectionCacheSpec; } + /** + * Returns the {@link SourceSelectionCacheFactory} (if any). If not defined, the {@link SourceSelectionCache} is + * instantiated using the default implementation and respects {@link #getSourceSelectionCacheSpec()}. + * + * @return the {@link SourceSelectionCacheFactory} or null + */ + public SourceSelectionCacheFactory getSourceSelectionCacheFactory() { + return this.sourceSelectionCacheFactory; + } + /** * The debug mode for query plan. If enabled, the query execution plan is printed to stdout * diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationContext.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationContext.java index a29cadc3029..025d0e0fa37 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationContext.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationContext.java @@ -97,12 +97,17 @@ public FederationEvalStrategy createStrategy(Dataset dataset) { } /** - * Create the {@link SourceSelectionCache} + * Create the {@link SourceSelectionCache}. 
* * @return the {@link SourceSelectionCache} * @see FedXConfig#getSourceSelectionCacheSpec() + * @see FedXConfig#getSourceSelectionCacheFactory() */ private SourceSelectionCache createSourceSelectionCache() { + var factory = getConfig().getSourceSelectionCacheFactory(); + if (factory != null) { + return factory.create(); + } String cacheSpec = getConfig().getSourceSelectionCacheSpec(); return new SourceSelectionMemoryCache(cacheSpec); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCache.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCache.java index 75dcaf0de73..91c740860b5 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCache.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCache.java @@ -66,4 +66,10 @@ enum StatementSourceAssurance { * @param hasStatements */ void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasStatements); + + /** + * Invalidate the underlying cache + */ + void invalidate(); + } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCacheFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCacheFactory.java new file mode 100644 index 00000000000..b9ddedae922 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCacheFactory.java @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.cache; + +/** + * A factory for {@link SourceSelectionCache}. + * + * @author Andreas Schwarte + */ +public interface SourceSelectionCacheFactory { + + /** + * Create the {@link SourceSelectionCache} + * + * @return the {@link SourceSelectionCache} + */ + SourceSelectionCache create(); +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCache.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCache.java index b46b446c38b..9457c6f9273 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCache.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCache.java @@ -83,6 +83,11 @@ public void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasS updateInferredInformation(subQuery, endpoint, hasStatements); } + @Override + public void invalidate() { + cache.invalidateAll(); // invalidate the entire cache + } + private void updateCacheEntry(SubQuery subQuery, Endpoint endpoint, boolean hasStatements) { Entry entry; try { diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCacheTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCacheTest.java index 77cb703bda8..62d373da0e5 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCacheTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCacheTest.java @@ -148,6 +148,19 @@ public void testCache_Integration() throws Exception { // source selection is cached, only from fetching data Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(0))); Assertions.assertEquals(0, requestsForEndpoint(endpoints.get(1))); + + // 
invalidate source selection cache + federationContext().getSourceSelectionCache().invalidate(); + + // re-run requests + monitoring().resetMonitoringInformation(); + try (TupleQueryResult tqr = federationContext().getQueryManager().prepareTupleQuery(query).evaluate()) { + Assertions.assertEquals(2, Iterations.asList(tqr).size()); + } + + // source selection is no longer cached, one additional request per endpoint + Assertions.assertEquals(2, requestsForEndpoint(endpoints.get(0))); + Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(1))); } @Test