Skip to content

Commit

Permalink
feat(schema-cache): ERCache.configureFaultTolerantRefresh Apicurio#3807
Browse files Browse the repository at this point in the history
… (Apicurio#3823)

Adds the notion of a fault tolerance in refresh for production environments where it's better to use a stale cache value than die when a cache entry refresh fails.  See issue Apicurio#3807 for details.

Co-authored-by: Devon Berry <devon.berry@riotgames.com>
  • Loading branch information
gr8routdoors and Devon Berry committed Oct 17, 2023
1 parent 7df609f commit 0ac844a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
schemaCache.configureLifetime(config.getCheckPeriod());
schemaCache.configureRetryBackoff(config.getRetryBackoff());
schemaCache.configureRetryCount(config.getRetryCount());
schemaCache.configureFaultTolerantRefresh(config.getFaultTolerantRefresh());

schemaCache.configureGlobalIdKeyExtractor(SchemaLookupResult::getGlobalId);
schemaCache.configureContentKeyExtractor(schema -> Optional.ofNullable(schema.getParsedSchema().getRawSchema()).map(IoUtil::toString).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.apicurio.registry.resolver.strategy.ArtifactCoordinates;
import io.apicurio.registry.rest.client.exception.RateLimitedClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -36,6 +38,7 @@
*/
public class ERCache<V> {

private final static Logger log = LoggerFactory.getLogger(ERCache.class);
/** Global ID index */
private final Map<Long, WrappedValue<V>> index1 = new ConcurrentHashMap<>();
/** Data content index */
Expand All @@ -56,6 +59,7 @@ public class ERCache<V> {
private Duration lifetime = Duration.ZERO;
private Duration backoff = Duration.ofMillis(200);
private long retries;
private boolean faultTolerantRefresh;

// === Configuration

Expand All @@ -71,6 +75,16 @@ public void configureRetryCount(long retries) {
this.retries = retries;
}

/**
* If set to {@code true}, will log the load error instead of throwing it when an exception occurs trying to refresh
* a cache entry. This will still honor retries before enacting this behavior.
*
* @param faultTolerantRefresh Whether to enable fault tolerant refresh behavior.
*/
public void configureFaultTolerantRefresh(boolean faultTolerantRefresh) {
this.faultTolerantRefresh = faultTolerantRefresh;
}

public void configureGlobalIdKeyExtractor(Function<V, Long> keyExtractor) {
this.keyExtractor1 = keyExtractor;
}
Expand All @@ -91,6 +105,16 @@ public void configureContentHashKeyExtractor(Function<V, String> keyExtractor) {
this.keyExtractor5 = keyExtractor;
}

/**
* Return whether fault tolerant refresh is enabled.
*
* @return {@code true} if it's enabled.
* @see #configureFaultTolerantRefresh(boolean)
*/
public boolean isFaultTolerantRefresh() {
return this.faultTolerantRefresh;
}

public void checkInitialized() {
boolean initialized = keyExtractor1 != null && keyExtractor2 != null &&
keyExtractor3 != null && keyExtractor4 != null && keyExtractor5 != null;
Expand Down Expand Up @@ -161,6 +185,11 @@ private <T> V getValue(WrappedValue<V> value, T key, Function<T, V> loaderFuncti
// Return
result = newValue.ok;
} else {
if (faultTolerantRefresh && value != null) {
log.warn("Error updating cache value. Fault tolerant load using expired value", newValue.error);
return value.value;
}
log.error("Failed to update cache value for key: " + key, newValue.error);
throw newValue.error;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public class SchemaResolverConfig {
public static final String FIND_LATEST_ARTIFACT = "apicurio.registry.find-latest";
public static final boolean FIND_LATEST_ARTIFACT_DEFAULT = false;

/**
* If {@code true}, will log exceptions instead of throwing them when an error occurs trying to refresh a schema
* in the cache. This is useful for production situations where a stale schema is better than completely failing
* schema resolution. Note that this will not impact trying of retries, as retries are attempted before this flag
* is considered.
*/
public static final String FAULT_TOLERANT_REFRESH = "apicurio.registry.fault-tolerant-refresh";
public static final boolean FAULT_TOLERANT_REFRESH_DEFAULT = false;

/**
* Only applicable for serializers
* Optional, set explicitly the groupId used for querying/creating an artifact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DefaultSchemaResolverConfig {
entry(ARTIFACT_RESOLVER_STRATEGY, ARTIFACT_RESOLVER_STRATEGY_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT, AUTO_REGISTER_ARTIFACT_DEFAULT),
entry(AUTO_REGISTER_ARTIFACT_IF_EXISTS, AUTO_REGISTER_ARTIFACT_IF_EXISTS_DEFAULT),
entry(FAULT_TOLERANT_REFRESH, FAULT_TOLERANT_REFRESH_DEFAULT),
entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT),
entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT),
entry(RETRY_COUNT, RETRY_COUNT_DEFAULT),
Expand Down Expand Up @@ -97,6 +98,10 @@ public String autoRegisterArtifactIfExists() {
return getStringOneOf(AUTO_REGISTER_ARTIFACT_IF_EXISTS, "FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE");
}

public boolean getFaultTolerantRefresh() {
return getBoolean(FAULT_TOLERANT_REFRESH);
}

public boolean findLatest() {
// Should be non-null, a default value is defined
return getBoolean(FIND_LATEST_ARTIFACT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/
package io.apicurio.registry.resolver;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import io.apicurio.registry.resolver.data.Record;
import io.apicurio.registry.resolver.strategy.ArtifactReference;

import static org.junit.jupiter.api.Assertions.*;

public class AbstractSchemaResolverTest {
@Test
void testConfigureInitializesSchemaCache() throws Exception {
Expand All @@ -37,6 +38,31 @@ void testConfigureInitializesSchemaCache() throws Exception {
}
}

@Test
void testSupportsFailureTolerantSchemaCache() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");
configs.put(SchemaResolverConfig.FAULT_TOLERANT_REFRESH, true);

try (TestAbstractSchemaResolver<Object, Object> resolver = new TestAbstractSchemaResolver<>()) {
resolver.configure(configs, null);

assertTrue(resolver.schemaCache.isFaultTolerantRefresh());
}
}

@Test
void testDefaultsToFailureTolerantSchemaCacheDisabled() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(SchemaResolverConfig.REGISTRY_URL, "http://localhost");

try (TestAbstractSchemaResolver<Object, Object> resolver = new TestAbstractSchemaResolver<>()) {
resolver.configure(configs, null);

assertFalse(resolver.schemaCache.isFaultTolerantRefresh());
}
}

class TestAbstractSchemaResolver<SCHEMA, DATA> extends AbstractSchemaResolver<SCHEMA, DATA> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,34 @@ void testClearEmptiesContentHashIndex() {
assertFalse(cache.containsByContentHash(contentHashKey));
}

@Test
void testThrowsLoadExceptionsByDefault() {
String contentHashKey = "another key";
ERCache<String> cache = newCache(contentHashKey);
Function<String, String> staticValueLoader = (key) -> {throw new IllegalStateException("load failure");};

assertThrows(IllegalStateException.class, () -> {cache.getByContentHash(contentHashKey, staticValueLoader);});
}

@Test
void testHoldsLoadExceptionsWhenFaultTolerantRefreshEnabled() {
String contentHashKey = "another key";
ERCache<String> cache = newCache(contentHashKey);
cache.configureLifetime(Duration.ZERO);
cache.configureFaultTolerantRefresh(true);

// Seed a value
Function<String, String> workingLoader = (key) -> {return "some value";};
String originalLoadValue = cache.getByContentHash(contentHashKey, workingLoader);

// Refresh with a failing loader
Function<String, String> failingLoader = (key) -> {throw new IllegalStateException("load failure");};
String failingLoadValue = cache.getByContentHash(contentHashKey, failingLoader);

assertEquals("some value", originalLoadValue);
assertEquals("some value", failingLoadValue);
}

private ERCache<String> newCache(String contentHashKey) {
ERCache<String> cache = new ERCache<>();
cache.configureLifetime(Duration.ofDays(30));
Expand Down

0 comments on commit 0ac844a

Please sign in to comment.