diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java index 4bf0addae3..0d4c413e3f 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/AbstractSchemaResolver.java @@ -99,6 +99,7 @@ public void configure(Map configs, SchemaParser schemaParser) { schemaCache.configureLifetime(config.getCheckPeriod()); schemaCache.configureRetryBackoff(config.getRetryBackoff()); schemaCache.configureRetryCount(config.getRetryCount()); + schemaCache.configureFaultTolerantRefresh(config.getFaultTolerantRefresh()); schemaCache.configureGlobalIdKeyExtractor(SchemaLookupResult::getGlobalId); schemaCache.configureContentKeyExtractor(schema -> Optional.ofNullable(schema.getParsedSchema().getRawSchema()).map(IoUtil::toString).orElse(null)); diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java index 9596a2e7d2..accc4a9f20 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/ERCache.java @@ -18,6 +18,8 @@ import io.apicurio.registry.resolver.strategy.ArtifactCoordinates; import io.apicurio.registry.rest.client.exception.RateLimitedClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.Instant; @@ -36,6 +38,7 @@ */ public class ERCache { + private final static Logger log = LoggerFactory.getLogger(ERCache.class); /** Global ID index */ private final Map> index1 = new ConcurrentHashMap<>(); /** Data content index */ @@ -56,6 +59,7 @@ public class ERCache { private Duration lifetime = Duration.ZERO; private Duration backoff = Duration.ofMillis(200); private long retries; + private boolean faultTolerantRefresh; // === Configuration @@ -71,6 +75,16 @@ public void configureRetryCount(long retries) { this.retries = retries; } + /** + * If set to {@code true}, will log the load error instead of throwing it when an exception occurs trying to refresh + * a cache entry. This will still honor retries before enacting this behavior. + * + * @param faultTolerantRefresh Whether to enable fault tolerant refresh behavior. + */ + public void configureFaultTolerantRefresh(boolean faultTolerantRefresh) { + this.faultTolerantRefresh = faultTolerantRefresh; + } + public void configureGlobalIdKeyExtractor(Function keyExtractor) { this.keyExtractor1 = keyExtractor; } @@ -91,6 +105,16 @@ public void configureContentHashKeyExtractor(Function keyExtractor) { this.keyExtractor5 = keyExtractor; } + /** + * Return whether fault tolerant refresh is enabled. + * + * @return {@code true} if it's enabled. + * @see #configureFaultTolerantRefresh(boolean) + */ + public boolean isFaultTolerantRefresh() { + return this.faultTolerantRefresh; + } + public void checkInitialized() { boolean initialized = keyExtractor1 != null && keyExtractor2 != null && keyExtractor3 != null && keyExtractor4 != null && keyExtractor5 != null; @@ -161,6 +185,11 @@ private V getValue(WrappedValue value, T key, Function loaderFuncti // Return result = newValue.ok; } else { + if (faultTolerantRefresh && value != null) { + log.warn("Error updating cache value. Fault tolerant load using expired value", newValue.error); + return value.value; + } + log.error("Failed to update cache value for key: " + key, newValue.error); throw newValue.error; } } diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java index 46489fcbc9..85fd39160c 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java @@ -63,6 +63,15 @@ public class SchemaResolverConfig { public static final String FIND_LATEST_ARTIFACT = "apicurio.registry.find-latest"; public static final boolean FIND_LATEST_ARTIFACT_DEFAULT = false; + /** + * If {@code true}, will log exceptions instead of throwing them when an error occurs trying to refresh a schema + * in the cache. This is useful for production situations where a stale schema is better than completely failing + * schema resolution. Note that this will not impact trying of retries, as retries are attempted before this flag + * is considered. + */ + public static final String FAULT_TOLERANT_REFRESH = "apicurio.registry.fault-tolerant-refresh"; + public static final boolean FAULT_TOLERANT_REFRESH_DEFAULT = false; + /** * Only applicable for serializers * Optional, set explicitly the groupId used for querying/creating an artifact. diff --git a/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java b/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java index 2c6c5a9ecb..5f977337c5 100644 --- a/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java +++ b/schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java @@ -35,6 +35,7 @@ public class DefaultSchemaResolverConfig { entry(ARTIFACT_RESOLVER_STRATEGY, ARTIFACT_RESOLVER_STRATEGY_DEFAULT), entry(AUTO_REGISTER_ARTIFACT, AUTO_REGISTER_ARTIFACT_DEFAULT), entry(AUTO_REGISTER_ARTIFACT_IF_EXISTS, AUTO_REGISTER_ARTIFACT_IF_EXISTS_DEFAULT), + entry(FAULT_TOLERANT_REFRESH, FAULT_TOLERANT_REFRESH_DEFAULT), entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT), entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT), entry(RETRY_COUNT, RETRY_COUNT_DEFAULT), @@ -97,6 +98,10 @@ public String autoRegisterArtifactIfExists() { return getStringOneOf(AUTO_REGISTER_ARTIFACT_IF_EXISTS, "FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE"); } + public boolean getFaultTolerantRefresh() { + return getBoolean(FAULT_TOLERANT_REFRESH); + } + public boolean findLatest() { // Should be non-null, a default value is defined return getBoolean(FIND_LATEST_ARTIFACT); diff --git a/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java b/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java index 039aac25c6..f7f392aebb 100644 --- a/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java +++ b/schema-resolver/src/test/java/io/apicurio/registry/resolver/AbstractSchemaResolverTest.java @@ -15,9 +15,8 @@ */ package io.apicurio.registry.resolver; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.Test; @@ -25,6 +24,8 @@ import io.apicurio.registry.resolver.data.Record; import io.apicurio.registry.resolver.strategy.ArtifactReference; +import static org.junit.jupiter.api.Assertions.*; + public class AbstractSchemaResolverTest { @Test void testConfigureInitializesSchemaCache() throws Exception { @@ -37,6 +38,31 @@ void testConfigureInitializesSchemaCache() throws Exception { } } + @Test + void testSupportsFailureTolerantSchemaCache() throws Exception { + Map configs = new HashMap<>(); + configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost"); + configs.put(SchemaResolverConfig.FAULT_TOLERANT_REFRESH, true); + + try (TestAbstractSchemaResolver resolver = new TestAbstractSchemaResolver<>()) { + resolver.configure(configs, null); + + assertTrue(resolver.schemaCache.isFaultTolerantRefresh()); + } + } + + @Test + void testDefaultsToFailureTolerantSchemaCacheDisabled() throws Exception { + Map configs = new HashMap<>(); + configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost"); + + try (TestAbstractSchemaResolver resolver = new TestAbstractSchemaResolver<>()) { + resolver.configure(configs, null); + + assertFalse(resolver.schemaCache.isFaultTolerantRefresh()); + } + } + class TestAbstractSchemaResolver extends AbstractSchemaResolver { @Override diff --git a/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java b/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java index dee94c0204..a4883e3fe5 100644 --- a/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java +++ b/schema-resolver/src/test/java/io/apicurio/registry/resolver/ERCacheTest.java @@ -106,6 +106,34 @@ void testClearEmptiesContentHashIndex() { assertFalse(cache.containsByContentHash(contentHashKey)); } + @Test + void testThrowsLoadExceptionsByDefault() { + String contentHashKey = "another key"; + ERCache cache = newCache(contentHashKey); + Function staticValueLoader = (key) -> {throw new IllegalStateException("load failure");}; + + assertThrows(IllegalStateException.class, () -> {cache.getByContentHash(contentHashKey, staticValueLoader);}); + } + + @Test + void testHoldsLoadExceptionsWhenFaultTolerantRefreshEnabled() { + String contentHashKey = "another key"; + ERCache cache = newCache(contentHashKey); + cache.configureLifetime(Duration.ZERO); + cache.configureFaultTolerantRefresh(true); + + // Seed a value + Function workingLoader = (key) -> {return "some value";}; + String originalLoadValue = cache.getByContentHash(contentHashKey, workingLoader); + + // Refresh with a failing loader + Function failingLoader = (key) -> {throw new IllegalStateException("load failure");}; + String failingLoadValue = cache.getByContentHash(contentHashKey, failingLoader); + + assertEquals("some value", originalLoadValue); + assertEquals("some value", failingLoadValue); + } + private ERCache newCache(String contentHashKey) { ERCache cache = new ERCache<>(); cache.configureLifetime(Duration.ofDays(30));