Skip to content

Commit

Permalink
fix(schema-resolver): caching of latest artifacts Apicurio#3834
Browse files Browse the repository at this point in the history
Caching of artifacts with no version (e.g. latest) did not work because reindex would use the new key that has the artifact version that was found in the lookup.  This code changes the default behavior to index both the artifact with its version and the latest/null version.  It also exposes a configuration property (`apicurio.registry.cache-latest`) where this behavior can be disabled for use cases where caching of latest is not desired.  See Apicurio#3824 for details.
  • Loading branch information
Devon Berry committed Oct 16, 2023
1 parent 274db0f commit ce5dfda
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
schemaCache.configureLifetime(config.getCheckPeriod());
schemaCache.configureRetryBackoff(config.getRetryBackoff());
schemaCache.configureRetryCount(config.getRetryCount());
schemaCache.configureCacheLatest(config.getCacheLatest());
schemaCache.configureFaultTolerantRefresh(config.getFaultTolerantRefresh());

schemaCache.configureGlobalIdKeyExtractor(SchemaLookupResult::getGlobalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ERCache<V> {
private Duration lifetime = Duration.ZERO;
private Duration backoff = Duration.ofMillis(200);
private long retries;
private boolean cacheLatest;
private boolean faultTolerantRefresh;

// === Configuration
Expand All @@ -75,6 +76,16 @@ public void configureRetryCount(long retries) {
this.retries = retries;
}

/**
* If {@code true}, will cache schema lookups that either have `latest` or no version specified. Setting this to false
* will effectively disable caching for schema lookups that do not specify a version.
*
* @param cacheLatest Whether to enable cache of artifacts without a version specified.
*/
public void configureCacheLatest(boolean cacheLatest) {
this.cacheLatest = cacheLatest;
}

/**
* If set to {@code true}, will log the load error instead of throwing it when an exception occurs trying to refresh
* a cache entry. This will still honor retries before enacting this behavior.
Expand Down Expand Up @@ -105,6 +116,16 @@ public void configureContentHashKeyExtractor(Function<V, String> keyExtractor) {
this.keyExtractor5 = keyExtractor;
}

/**
* Return whether caching of artifact lookups with {@code null} versions is enabled.
*
* @return {@code true} if it's enabled.
* @see #configureCacheLatest(boolean)
*/
public boolean isCacheLatest() {
return this.cacheLatest;
}

/**
* Return whether fault tolerant refresh is enabled.
*
Expand Down Expand Up @@ -181,7 +202,7 @@ private <T> V getValue(WrappedValue<V> value, T key, Function<T, V> loaderFuncti
});
if (newValue.isOk()) {
// Index
reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok));
reindex(new WrappedValue<>(lifetime, Instant.now(), newValue.ok), key);
// Return
result = newValue.ok;
} else {
Expand All @@ -197,11 +218,18 @@ private <T> V getValue(WrappedValue<V> value, T key, Function<T, V> loaderFuncti
return result;
}

private void reindex(WrappedValue<V> newValue) {
private <T> void reindex(WrappedValue<V> newValue, T lookupKey) {
Optional.ofNullable(keyExtractor1.apply(newValue.value)).ifPresent(k -> index1.put(k, newValue));
Optional.ofNullable(keyExtractor2.apply(newValue.value)).ifPresent(k -> index2.put(k, newValue));
Optional.ofNullable(keyExtractor3.apply(newValue.value)).ifPresent(k -> index3.put(k, newValue));
Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> index4.put(k, newValue));
Optional.ofNullable(keyExtractor4.apply(newValue.value)).ifPresent(k -> {
index4.put(k, newValue);
// By storing the lookup key, we ensure that a null/latest lookup gets cached, as the key extractor will
// automatically add the version to the new key
if (this.cacheLatest && k.getClass().equals(lookupKey.getClass())) {
index4.put((ArtifactCoordinates) lookupKey, newValue);
}
});
Optional.ofNullable(keyExtractor5.apply(newValue.value)).ifPresent(k -> index5.put(k, newValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public class SchemaResolverConfig {
public static final String FIND_LATEST_ARTIFACT = "apicurio.registry.find-latest";
public static final boolean FIND_LATEST_ARTIFACT_DEFAULT = false;

/**
* If {@code true}, will cache schema lookups that either have `latest` or no version specified. Setting this to false
* will effectively disable caching for schema lookups that do not specify a version.
*/
public static final String CACHE_LATEST = "apicurio.registry.cache-latest";
public static final boolean CACHE_LATEST_DEFAULT = true;

/**
* If {@code true}, will log exceptions instead of throwing them when an error occurs trying to refresh a schema
* in the cache. This is useful for production situations where a stale schema is better than completely failing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DefaultSchemaResolverConfig {
entry(ARTIFACT_RESOLVER_STRATEGY, ARTIFACT_RESOLVER_STRATEGY_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT, AUTO_REGISTER_ARTIFACT_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT_IF_EXISTS, AUTO_REGISTER_ARTIFACT_IF_EXISTS_DEFAULT),
entry(CACHE_LATEST, CACHE_LATEST_DEFAULT),
entry(FAULT_TOLERANT_REFRESH, FAULT_TOLERANT_REFRESH_DEFAULT),
entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT),
entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT),
Expand Down Expand Up @@ -98,6 +99,10 @@ public String autoRegisterArtifactIfExists() {
return getStringOneOf(AUTO_REGISTER_ARTIFACT_IF_EXISTS, "FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE");
}

public boolean getCacheLatest() {
return getBoolean(CACHE_LATEST);
}

public boolean getFaultTolerantRefresh() {
return getBoolean(FAULT_TOLERANT_REFRESH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ void testDefaultsToFailureTolerantSchemaCacheDisabled() throws Exception {
}
}

@Test
void testDefaultsToCacheLatestEnabled() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");

try (TestAbstractSchemaResolver<Object, Object> resolver = new TestAbstractSchemaResolver<>()) {
resolver.configure(configs, null);

assertTrue(resolver.schemaCache.isCacheLatest());
}
}

class TestAbstractSchemaResolver<SCHEMA, DATA> extends AbstractSchemaResolver<SCHEMA, DATA> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -134,6 +135,56 @@ void testHoldsLoadExceptionsWhenFaultTolerantRefreshEnabled() {
assertEquals("some value", failingLoadValue);
}

@Test
void testCanCacheLatestWhenEnabled() {
ERCache<String> cache = newCache("some key");
cache.configureLifetime(Duration.ofMinutes(10));
cache.configureCacheLatest(true);

ArtifactCoordinates latestKey = new ArtifactCoordinates.ArtifactCoordinatesBuilder()
.artifactId("someArtifactId")
.groupId("someGroupId")
.build();
final AtomicInteger loadCount = new AtomicInteger(0);
Function<ArtifactCoordinates, String> countingLoader = (key) -> {
loadCount.incrementAndGet();
return "some value";
};

// Seed a value
String firstLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader);
// Try the same lookup
String secondLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader);

assertEquals(firstLookupValue, secondLookupValue);
assertEquals(1, loadCount.get());
}

@Test
void doesNotCacheLatestWhenDisabled() {
ERCache<String> cache = newCache("some key");
cache.configureLifetime(Duration.ofMinutes(10));
cache.configureCacheLatest(false);

ArtifactCoordinates latestKey = new ArtifactCoordinates.ArtifactCoordinatesBuilder()
.artifactId("someArtifactId")
.groupId("someGroupId")
.build();
final AtomicInteger loadCount = new AtomicInteger(0);
Function<ArtifactCoordinates, String> countingLoader = (key) -> {
loadCount.incrementAndGet();
return "some value";
};

// Seed a value
String firstLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader);
// Try the same lookup
String secondLookupValue = cache.getByArtifactCoordinates(latestKey, countingLoader);

assertEquals(firstLookupValue, secondLookupValue);
assertEquals(2, loadCount.get());
}

private ERCache<String> newCache(String contentHashKey) {
ERCache<String> cache = new ERCache<>();
cache.configureLifetime(Duration.ofDays(30));
Expand Down

0 comments on commit ce5dfda

Please sign in to comment.