From ce5dfda12a6e554d0c2934e025e03e17f496867f Mon Sep 17 00:00:00 2001 From: Devon Berry Date: Mon, 16 Oct 2023 12:58:04 -0400 Subject: [PATCH] fix(schema-resolver): caching of latest artifacts #3834 Caching of artifacts with no version (e.g. latest) did not work because reindex would use the new key that has the artifact version that was found in the lookup. This code changes the default behavior to index both the artifact with its version and the latest/null version. It also exposes a configuration property (`apicurio.registry.cache-latest`) where this behavior can be disabled for use cases where caching of latest is not desired. See #3824 for details. --- .../resolver/AbstractSchemaResolver.java | 1 + .../apicurio/registry/resolver/ERCache.java | 34 +++++++++++-- .../resolver/SchemaResolverConfig.java | 7 +++ .../config/DefaultSchemaResolverConfig.java | 5 ++ .../resolver/AbstractSchemaResolverTest.java | 12 +++++ .../registry/resolver/ERCacheTest.java | 51 +++++++++++++++++++ 6 files changed, 107 insertions(+), 3 deletions(-) diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java index 0d4c413e3f..d9c9fa4e41 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java @@ -99,6 +99,7 @@ public void configure(Map configs, SchemaParser schemaParser) { schemaCache.configureLifetime(config.getCheckPeriod()); schemaCache.configureRetryBackoff(config.getRetryBackoff()); schemaCache.configureRetryCount(config.getRetryCount()); + schemaCache.configureCacheLatest(config.getCacheLatest()); schemaCache.configureFaultTolerantRefresh(config.getFaultTolerantRefresh()); schemaCache.configureGlobalIdKeyExtractor(SchemaLookupResult::getGlobalId); diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java index accc4a9f20..c6a1e13db0 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java @@ -59,6 +59,7 @@ public class ERCache { private Duration lifetime = Duration.ZERO; private Duration backoff = Duration.ofMillis(200); private long retries; + private boolean cacheLatest; private boolean faultTolerantRefresh; // === Configuration @@ -75,6 +76,16 @@ public void configureRetryCount(long retries) { this.retries = retries; } + /** + * If {@code true}, will cache schema lookups that either have `latest` or no version specified. Setting this to false + * will effectively disable caching for schema lookups that do not specify a version. + * + * @param cacheLatest Whether to enable cache of artifacts without a version specified. + */ + public void configureCacheLatest(boolean cacheLatest) { + this.cacheLatest = cacheLatest; + } + /** * If set to {@code true}, will log the load error instead of throwing it when an exception occurs trying to refresh * a cache entry. This will still honor retries before enacting this behavior. @@ -105,6 +116,16 @@ public void configureContentHashKeyExtractor(Function keyExtractor) { this.keyExtractor5 = keyExtractor; } + /** + * Return whether caching of artifact lookups with {@code null} versions is enabled. + * + * @return {@code true} if it's enabled. + * @see #configureCacheLatest(boolean) + */ + public boolean isCacheLatest() { + return this.cacheLatest; + } + /** * Return whether fault tolerant refresh is enabled. * @@ -181,7 +202,7 @@ private V getValue(WrappedValue value, T key, Function loaderFuncti }); if (newValue.isOk()) { // Index - reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok)); + reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok), key); // Return result = newValue.ok; } else { @@ -197,11 +218,18 @@ private V getValue(WrappedValue value, T key, Function loaderFuncti return result; } - private void reindex(WrappedValue newValue) { + private void reindex(WrappedValue newValue, T lookupKey) { Optional.ofNullable(keyExtractor1.apply(newValue.value)).ifPresent(k -> index1.put(k, newValue)); Optional.ofNullable(keyExtractor2.apply(newValue.value)).ifPresent(k -> index2.put(k, newValue)); Optional.ofNullable(keyExtractor3.apply(newValue.value)).ifPresent(k -> index3.put(k, newValue)); - Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> index4.put(k, newValue)); + Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> { + index4.put(k, newValue); + // By storing the lookup key, we ensure that a null/latest lookup gets cached, as the key extractor will + // automatically add the version to the new key + if (this.cacheLatest && k.getClass().equals(lookupKey.getClass())) { + index4.put((ArtifactCoordinates) lookupKey, newValue); + } + }); Optional.ofNullable(keyExtractor5.apply(newValue.value)).ifPresent(k -> index5.put(k, newValue)); } diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java index 85fd39160c..eae1e76f83 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java @@ -63,6 +63,13 @@ public class SchemaResolverConfig { public static final String FIND_LATEST_ARTIFACT = "apicurio.registry.find-latest"; public static final boolean FIND_LATEST_ARTIFACT_DEFAULT = false; + /** + * If {@code true}, will cache schema lookups that either have `latest` or no version specified. Setting this to false + * will effectively disable caching for schema lookups that do not specify a version. + */ + public static final String CACHE_LATEST = "apicurio.registry.cache-latest"; + public static final boolean CACHE_LATEST_DEFAULT = true; + /** * If {@code true}, will log exceptions instead of throwing them when an error occurs trying to refresh a schema * in the cache. This is useful for production situations where a stale schema is better than completely failing diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java index 5f977337c5..66f3ee5ba3 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java @@ -35,6 +35,7 @@ public class DefaultSchemaResolverConfig { entry(ARTIFACT_RESOLVER_STRATEGY, ARTIFACT_RESOLVER_STRATEGY_DEFAULT), entry(AUTO_REGISTER_ARTIFACT, AUTO_REGISTER_ARTIFACT_DEFAULT), entry(AUTO_REGISTER_ARTIFACT_IF_EXISTS, AUTO_REGISTER_ARTIFACT_IF_EXISTS_DEFAULT), + entry(CACHE_LATEST, CACHE_LATEST_DEFAULT), entry(FAULT_TOLERANT_REFRESH, FAULT_TOLERANT_REFRESH_DEFAULT), entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT), entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT), @@ -98,6 +99,10 @@ public String autoRegisterArtifactIfExists() { return getStringOneOf(AUTO_REGISTER_ARTIFACT_IF_EXISTS, "FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE"); } + public boolean getCacheLatest() { + return getBoolean(CACHE_LATEST); + } + public boolean getFaultTolerantRefresh() { return getBoolean(FAULT_TOLERANT_REFRESH); } diff --git a/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java b/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java index f7f392aebb..7e00eff0d0 100644 --- a/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java +++ b/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java @@ -63,6 +63,18 @@ void testDefaultsToFailureTolerantSchemaCacheDisabled() throws Exception { } } + @Test + void testDefaultsToCacheLatestEnabled() throws Exception { + Map configs = new HashMap<>(); + configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost"); + + try (TestAbstractSchemaResolver resolver = new TestAbstractSchemaResolver<>()) { + resolver.configure(configs, null); + + assertTrue(resolver.schemaCache.isCacheLatest()); + } + } + class TestAbstractSchemaResolver extends AbstractSchemaResolver { @Override diff --git a/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java b/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java index a4883e3fe5..a1754960fa 100644 --- a/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java +++ b/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.junit.jupiter.api.Test; @@ -134,6 +135,56 @@ void testHoldsLoadExceptionsWhenFaultTolerantRefreshEnabled() { assertEquals("some value", failingLoadValue); } + @Test + void testCanCacheLatestWhenEnabled() { + ERCache cache = newCache("some key"); + cache.configureLifetime(Duration.ofMinutes(10)); + cache.configureCacheLatest(true); + + ArtifactCoordinates latestKey = new ArtifactCoordinates.ArtifactCoordinatesBuilder() + .artifactId("someArtifactId") + .groupId("someGroupId") + .build(); + final AtomicInteger loadCount = new AtomicInteger(0); + Function countingLoader = (key) -> { + loadCount.incrementAndGet(); + return "some value"; + }; + + // Seed a value + String firstLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader); + // Try the same lookup + String secondLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader); + + assertEquals(firstLookupValue, secondLookupValue); + assertEquals(1, loadCount.get()); + } + + @Test + void doesNotCacheLatestWhenDisabled() { + ERCache cache = newCache("some key"); + cache.configureLifetime(Duration.ofMinutes(10)); + cache.configureCacheLatest(false); + + ArtifactCoordinates latestKey = new ArtifactCoordinates.ArtifactCoordinatesBuilder() + .artifactId("someArtifactId") + .groupId("someGroupId") + .build(); + final AtomicInteger loadCount = new AtomicInteger(0); + Function countingLoader = (key) -> { + loadCount.incrementAndGet(); + return "some value"; + }; + + // Seed a value + String firstLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader); + // Try the same lookup + String secondLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader); + + assertEquals(firstLookupValue, secondLookupValue); + assertEquals(2, loadCount.get()); + } + private ERCache newCache(String contentHashKey) { ERCache cache = new ERCache<>(); cache.configureLifetime(Duration.ofDays(30));