diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java index 483683036..cf64e4155 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java @@ -5,6 +5,7 @@ import io.grpc.ManagedChannel; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.concurrent.Executor; import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.StructuredTrace; @@ -44,7 +45,7 @@ public class DefaultClientRegistry implements ClientRegistry { private final TraceAttributeReader attributeReader; private final GrpcChannelRegistry grpcChannelRegistry = new GrpcChannelRegistry(); - public DefaultClientRegistry(Config config) { + public DefaultClientRegistry(Config config, Executor cacheLoaderExecutor) { this.attributeServiceChannel = this.buildChannel( config.getString(ATTRIBUTE_SERVICE_HOST_KEY), @@ -66,9 +67,10 @@ public DefaultClientRegistry(Config config) { this.edsCacheClient = new EdsCacheClient( new EntityDataServiceClient(this.entityServiceChannel), - EntityServiceClientConfig.from(config).getCacheConfig()); + EntityServiceClientConfig.from(config).getCacheConfig(), + cacheLoaderExecutor); this.entityDataClient = EntityDataClient.builder(this.entityServiceChannel).build(); - this.entityCache = new EntityCache(this.edsCacheClient); + this.entityCache = new EntityCache(this.edsCacheClient, cacheLoaderExecutor); this.entityAccessor = new TraceEntityAccessorBuilder( EntityTypeClient.builder(this.entityServiceChannel).build(), diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/cache/EntityCache.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/cache/EntityCache.java index a834bc76a..af582a5dd 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/cache/EntityCache.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/cache/EntityCache.java @@ -7,8 +7,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; import org.apache.commons.lang3.tuple.Pair; import org.hypertrace.core.grpcutils.context.ContextualKey; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; @@ -30,89 +30,87 @@ public class EntityCache { * Cache to cache the service fqn to service Entity mapping so that we don't look it up over and * over. */ - private final LoadingCache, Optional> fqnToServiceEntity = - CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(5, TimeUnit.MINUTES) - .recordStats() - .build( - new CacheLoader<>() { - public Optional load(@Nonnull Pair pair) { - AttributeValue fqnAttribute = - AttributeValue.newBuilder() - .setValue(Value.newBuilder().setString(pair.getRight())) - .build(); - - ByTypeAndIdentifyingAttributes request = - ByTypeAndIdentifyingAttributes.newBuilder() - .setEntityType(EntityType.SERVICE.name()) - .putIdentifyingAttributes( - EntityConstants.getValue(CommonAttribute.COMMON_ATTRIBUTE_FQN), - fqnAttribute) - .build(); - - return Optional.ofNullable( - edsClient.getByTypeAndIdentifyingAttributes(pair.getLeft(), request)); - } - }); - + private final LoadingCache, Optional> fqnToServiceEntityCache; /** * Cache to cache the service name to a list of services mapping so that we don't look it up over * and over. */ - private final LoadingCache, List> nameToServiceEntities = - CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(5, TimeUnit.MINUTES) - .build( - new CacheLoader<>() { - @Override - public List load(@Nonnull Pair pair) { - // Lookup by name first, to see if there are any services with that name. - return edsClient.getEntitiesByName( - pair.getLeft(), EntityType.SERVICE.name(), pair.getRight()); - } - }); + private final LoadingCache, List> nameToServiceEntitiesCache; /** * Cache of K8S namespaces Key: Customer Id, Namespace name Value: List of Namespace entity ids */ - private final LoadingCache, List> namespaceCache = - CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(5, TimeUnit.MINUTES) - .build( - new CacheLoader<>() { - @Override - public List load(@Nonnull Pair key) { - return edsClient.getEntitiesByName( - key.getLeft(), EntityType.K8S_NAMESPACE.name(), key.getRight()); - } - }); + private final LoadingCache, List> nameToNamespaceEntitiesCache; /** * Cache of Backend identifying attributes to Entity Key: Map of identifying attributes Value: * Optional Backend entity */ private final LoadingCache>, Optional> - backendIdAttrsToEntityCache = - CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(5, TimeUnit.MINUTES) - .build(CacheLoader.from(this::loadBackendFromIdentifyingAttributes)); + backendIdAttrsToEntityCache; - public EntityCache(EdsClient edsClient) { + public EntityCache(EdsClient edsClient, Executor asyncCacheLoaderExecutor) { this.edsClient = edsClient; + fqnToServiceEntityCache = + CacheBuilder.newBuilder() + .maximumSize(10000) + .refreshAfterWrite(4, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) + .recordStats() + .build( + CacheLoader.asyncReloading( + CacheLoader.from(this::loadServiceFromFQN), asyncCacheLoaderExecutor)); + + nameToServiceEntitiesCache = + CacheBuilder.newBuilder() + .maximumSize(10000) + .refreshAfterWrite(4, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) + .build( + CacheLoader.asyncReloading( + CacheLoader.from( + tenantIdServiceNamePair -> + edsClient.getEntitiesByName( + tenantIdServiceNamePair.getLeft(), + EntityType.SERVICE.name(), + tenantIdServiceNamePair.getRight())), + asyncCacheLoaderExecutor)); + + nameToNamespaceEntitiesCache = + CacheBuilder.newBuilder() + .maximumSize(10000) + .refreshAfterWrite(4, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) + .build( + CacheLoader.asyncReloading( + CacheLoader.from( + key -> + edsClient.getEntitiesByName( + key.getLeft(), EntityType.K8S_NAMESPACE.name(), key.getRight())), + asyncCacheLoaderExecutor)); + + backendIdAttrsToEntityCache = + CacheBuilder.newBuilder() + .maximumSize(10000) + .refreshAfterWrite(4, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) + .build( + CacheLoader.asyncReloading( + CacheLoader.from(this::loadBackendFromIdentifyingAttributes), + asyncCacheLoaderExecutor)); + PlatformMetricsRegistry.registerCache( - this.getClass().getName() + DOT + "fqnToServiceEntity", - fqnToServiceEntity, + this.getClass().getName() + DOT + "fqnToServiceEntityCache", + fqnToServiceEntityCache, Collections.emptyMap()); PlatformMetricsRegistry.registerCache( - this.getClass().getName() + DOT + "nameToServiceEntities", - nameToServiceEntities, + this.getClass().getName() + DOT + "nameToServiceEntitiesCache", + nameToServiceEntitiesCache, Collections.emptyMap()); PlatformMetricsRegistry.registerCache( - this.getClass().getName() + DOT + "namespaceCache", namespaceCache, Collections.emptyMap()); + this.getClass().getName() + DOT + "nameToNamespaceEntitiesCache", + nameToNamespaceEntitiesCache, + Collections.emptyMap()); PlatformMetricsRegistry.registerCache( this.getClass().getName() + DOT + "backendIdAttrsToEntityCache", backendIdAttrsToEntityCache, @@ -120,15 +118,15 @@ public EntityCache(EdsClient edsClient) { } public LoadingCache, Optional> getFqnToServiceEntityCache() { - return fqnToServiceEntity; + return fqnToServiceEntityCache; } public LoadingCache, List> getNameToServiceEntitiesCache() { - return nameToServiceEntities; + return nameToServiceEntitiesCache; } public LoadingCache, List> getNameToNamespaceEntityIdCache() { - return namespaceCache; + return nameToNamespaceEntitiesCache; } public LoadingCache>, Optional> @@ -147,4 +145,21 @@ protected Optional loadBackendFromIdentifyingAttributes( edsClient.getByTypeAndIdentifyingAttributes( key.getContext().getTenantId().orElseThrow(), request)); } + + protected Optional loadServiceFromFQN(Pair tenantIdFQNPair) { + AttributeValue fqnAttribute = + AttributeValue.newBuilder() + .setValue(Value.newBuilder().setString(tenantIdFQNPair.getRight())) + .build(); + + ByTypeAndIdentifyingAttributes request = + ByTypeAndIdentifyingAttributes.newBuilder() + .setEntityType(EntityType.SERVICE.name()) + .putIdentifyingAttributes( + EntityConstants.getValue(CommonAttribute.COMMON_ATTRIBUTE_FQN), fqnAttribute) + .build(); + + return Optional.ofNullable( + edsClient.getByTypeAndIdentifyingAttributes(tenantIdFQNPair.getLeft(), request)); + } } diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/DefaultServiceEntityEnricherTest.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/DefaultServiceEntityEnricherTest.java index cb1df8c33..42901dbb2 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/DefaultServiceEntityEnricherTest.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/DefaultServiceEntityEnricherTest.java @@ -49,7 +49,7 @@ public class DefaultServiceEntityEnricherTest extends AbstractAttributeEnricherT @BeforeEach public void setup() { enricher = new DefaultServiceEntityEnricher(); - entityCache = new EntityCache(this.edsClient); + entityCache = new EntityCache(this.edsClient, Runnable::run); when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient); when(clientRegistry.getEntityCache()).thenReturn(entityCache); diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/backend/BackendEntityEnricherTest.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/backend/BackendEntityEnricherTest.java index f32e8feb5..520e662e4 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/backend/BackendEntityEnricherTest.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/backend/BackendEntityEnricherTest.java @@ -60,7 +60,7 @@ public class BackendEntityEnricherTest extends AbstractAttributeEnricherTest { @BeforeEach public void setup() { enricher = new BackendEntityEnricher(); - entityCache = new EntityCache(edsClient); + entityCache = new EntityCache(edsClient, Runnable::run); when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient); when(clientRegistry.getEntityCache()).thenReturn(entityCache); enricher.init(getEntityServiceConfig(), clientRegistry); diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/processor/StructuredTracesEnrichmentTest.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/processor/StructuredTracesEnrichmentTest.java index 71c790f68..c852b0ec3 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/processor/StructuredTracesEnrichmentTest.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/processor/StructuredTracesEnrichmentTest.java @@ -69,7 +69,7 @@ public void setup() { // Not passing the Entity Data Service configuration, unless the container id // in the span data in inside EDS when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient); - entityCache = new EntityCache(edsClient); + entityCache = new EntityCache(edsClient, Runnable::run); when(clientRegistry.getEntityCache()).thenReturn(entityCache); enrichmentProcessor = createEnricherProcessor(configs); diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java index 5987cb09b..b3b0c72c1 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java @@ -1,14 +1,18 @@ package org.hypertrace.traceenricher.trace.enricher; +import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.CACHE_LOADER_THREAD_POOL_SIZE; import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_CLIENTS_CONFIG_KEY; import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_CONFIG_TEMPLATE; import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_NAMES_CONFIG_KEY; import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.typesafe.config.Config; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.ProcessorContext; @@ -28,10 +32,18 @@ public void init(ProcessorContext context) { if (processor == null) { synchronized (StructuredTraceEnrichProcessor.class) { if (processor == null) { - Map enricherConfigs = getEnricherConfigs(context.appConfigs()); + Config jobConfig = + (Config) context.appConfigs().get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY); + Map enricherConfigs = getEnricherConfigs(jobConfig); EnrichmentRegistry enrichmentRegistry = new EnrichmentRegistry(); enrichmentRegistry.registerEnrichers(enricherConfigs); - clientRegistry = new DefaultClientRegistry(this.getClientsConfig(context.appConfigs())); + Config clientsConfig = this.getClientsConfig(jobConfig); + clientRegistry = + new DefaultClientRegistry( + clientsConfig, + Executors.newFixedThreadPool( + getCacheLoaderExecutorThreadsPoolSize(clientsConfig), + this.buildThreadFactory())); processor = new EnrichmentProcessor( enrichmentRegistry.getOrderedRegisteredEnrichers(), clientRegistry); @@ -51,8 +63,7 @@ public void close() { // clientRegistry.shutdown(); TODO restore cleanup once shared static instance removed } - private Map getEnricherConfigs(Map properties) { - Config jobConfig = (Config) properties.get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY); + private Map getEnricherConfigs(Config jobConfig) { List enrichers = jobConfig.getStringList(ENRICHER_NAMES_CONFIG_KEY); Map enricherConfigs = new LinkedHashMap<>(); for (String enricher : enrichers) { @@ -62,12 +73,24 @@ private Map getEnricherConfigs(Map properties) { return enricherConfigs; } - private Config getClientsConfig(Map properties) { - Config jobConfig = (Config) properties.get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY); + private Config getClientsConfig(Config jobConfig) { return jobConfig.getConfig(ENRICHER_CLIENTS_CONFIG_KEY); } + private int getCacheLoaderExecutorThreadsPoolSize(Config clientsConfig) { + return clientsConfig.hasPath(CACHE_LOADER_THREAD_POOL_SIZE) + ? clientsConfig.getInt(CACHE_LOADER_THREAD_POOL_SIZE) + : 3; + } + private String getEnricherConfigPath(String enricher) { return String.format(ENRICHER_CONFIG_TEMPLATE, enricher); } + + private ThreadFactory buildThreadFactory() { + return new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("async-cache-loader-%d") + .build(); + } } diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnricherConstants.java b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnricherConstants.java index 39c225f60..adf06305d 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnricherConstants.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnricherConstants.java @@ -7,6 +7,7 @@ public class StructuredTraceEnricherConstants { public static final String ENRICHER_CONFIGS_KEY = "enricher.configs"; public static final String ENRICHER_NAMES_CONFIG_KEY = "enricher.names"; public static final String ENRICHER_CLIENTS_CONFIG_KEY = "enricher.clients"; + public static final String CACHE_LOADER_THREAD_POOL_SIZE = "cache.loader.thread.pool.size"; public static final String ENRICHER_CONFIG_TEMPLATE = "enricher.%s"; public static final String STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY = "structured-traces-enrichment-job";