Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.grpc.ManagedChannel;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executor;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
Expand Down Expand Up @@ -44,7 +45,7 @@ public class DefaultClientRegistry implements ClientRegistry {
private final TraceAttributeReader<StructuredTrace, Event> attributeReader;
private final GrpcChannelRegistry grpcChannelRegistry = new GrpcChannelRegistry();

public DefaultClientRegistry(Config config) {
public DefaultClientRegistry(Config config, Executor cacheLoaderExecutor) {
this.attributeServiceChannel =
this.buildChannel(
config.getString(ATTRIBUTE_SERVICE_HOST_KEY),
Expand All @@ -66,9 +67,10 @@ public DefaultClientRegistry(Config config) {
this.edsCacheClient =
new EdsCacheClient(
new EntityDataServiceClient(this.entityServiceChannel),
EntityServiceClientConfig.from(config).getCacheConfig());
EntityServiceClientConfig.from(config).getCacheConfig(),
cacheLoaderExecutor);
this.entityDataClient = EntityDataClient.builder(this.entityServiceChannel).build();
this.entityCache = new EntityCache(this.edsCacheClient);
this.entityCache = new EntityCache(this.edsCacheClient, cacheLoaderExecutor);
this.entityAccessor =
new TraceEntityAccessorBuilder(
EntityTypeClient.builder(this.entityServiceChannel).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.hypertrace.core.grpcutils.context.ContextualKey;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
Expand All @@ -30,105 +30,103 @@ public class EntityCache {
* Cache to cache the service fqn to service Entity mapping so that we don't look it up over and
* over.
*/
private final LoadingCache<Pair<String, String>, Optional<Entity>> fqnToServiceEntity =
CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.recordStats()
.build(
new CacheLoader<>() {
public Optional<Entity> load(@Nonnull Pair<String, String> pair) {
AttributeValue fqnAttribute =
AttributeValue.newBuilder()
.setValue(Value.newBuilder().setString(pair.getRight()))
.build();

ByTypeAndIdentifyingAttributes request =
ByTypeAndIdentifyingAttributes.newBuilder()
.setEntityType(EntityType.SERVICE.name())
.putIdentifyingAttributes(
EntityConstants.getValue(CommonAttribute.COMMON_ATTRIBUTE_FQN),
fqnAttribute)
.build();

return Optional.ofNullable(
edsClient.getByTypeAndIdentifyingAttributes(pair.getLeft(), request));
}
});

private final LoadingCache<Pair<String, String>, Optional<Entity>> fqnToServiceEntityCache;
/**
* Cache to cache the service name to a list of services mapping so that we don't look it up over
* and over.
*/
private final LoadingCache<Pair<String, String>, List<Entity>> nameToServiceEntities =
CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(
new CacheLoader<>() {
@Override
public List<Entity> load(@Nonnull Pair<String, String> pair) {
// Lookup by name first, to see if there are any services with that name.
return edsClient.getEntitiesByName(
pair.getLeft(), EntityType.SERVICE.name(), pair.getRight());
}
});
private final LoadingCache<Pair<String, String>, List<Entity>> nameToServiceEntitiesCache;

/**
* Cache of K8S namespaces Key: Customer Id, Namespace name Value: List of Namespace entity ids
*/
private final LoadingCache<Pair<String, String>, List<Entity>> namespaceCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(
new CacheLoader<>() {
@Override
public List<Entity> load(@Nonnull Pair<String, String> key) {
return edsClient.getEntitiesByName(
key.getLeft(), EntityType.K8S_NAMESPACE.name(), key.getRight());
}
});
private final LoadingCache<Pair<String, String>, List<Entity>> nameToNamespaceEntitiesCache;

/**
* Cache of Backend identifying attributes to Entity Key: Map of identifying attributes Value:
* Optional Backend entity
*/
private final LoadingCache<ContextualKey<Map<String, AttributeValue>>, Optional<Entity>>
backendIdAttrsToEntityCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(CacheLoader.from(this::loadBackendFromIdentifyingAttributes));
backendIdAttrsToEntityCache;

public EntityCache(EdsClient edsClient) {
public EntityCache(EdsClient edsClient, Executor asyncCacheLoaderExecutor) {
this.edsClient = edsClient;
fqnToServiceEntityCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ability to configure these caches is an existing problem. Will address them in a different PR.

.refreshAfterWrite(4, TimeUnit.MINUTES)
.expireAfterWrite(5, TimeUnit.MINUTES)
.recordStats()
.build(
CacheLoader.asyncReloading(
CacheLoader.from(this::loadServiceFromFQN), asyncCacheLoaderExecutor));

nameToServiceEntitiesCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
.refreshAfterWrite(4, TimeUnit.MINUTES)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(
CacheLoader.asyncReloading(
CacheLoader.from(
tenantIdServiceNamePair ->
edsClient.getEntitiesByName(
tenantIdServiceNamePair.getLeft(),
EntityType.SERVICE.name(),
tenantIdServiceNamePair.getRight())),
asyncCacheLoaderExecutor));

nameToNamespaceEntitiesCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
.refreshAfterWrite(4, TimeUnit.MINUTES)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(
CacheLoader.asyncReloading(
CacheLoader.from(
key ->
edsClient.getEntitiesByName(
key.getLeft(), EntityType.K8S_NAMESPACE.name(), key.getRight())),
asyncCacheLoaderExecutor));

backendIdAttrsToEntityCache =
CacheBuilder.newBuilder()
.maximumSize(10000)
.refreshAfterWrite(4, TimeUnit.MINUTES)
.expireAfterWrite(5, TimeUnit.MINUTES)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we missing refreshAfterWrite here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Fixed it.

.build(
CacheLoader.asyncReloading(
CacheLoader.from(this::loadBackendFromIdentifyingAttributes),
asyncCacheLoaderExecutor));

PlatformMetricsRegistry.registerCache(
this.getClass().getName() + DOT + "fqnToServiceEntity",
fqnToServiceEntity,
this.getClass().getName() + DOT + "fqnToServiceEntityCache",
fqnToServiceEntityCache,
Collections.emptyMap());
PlatformMetricsRegistry.registerCache(
this.getClass().getName() + DOT + "nameToServiceEntities",
nameToServiceEntities,
this.getClass().getName() + DOT + "nameToServiceEntitiesCache",
nameToServiceEntitiesCache,
Collections.emptyMap());
PlatformMetricsRegistry.registerCache(
this.getClass().getName() + DOT + "namespaceCache", namespaceCache, Collections.emptyMap());
this.getClass().getName() + DOT + "nameToNamespaceEntitiesCache",
nameToNamespaceEntitiesCache,
Collections.emptyMap());
PlatformMetricsRegistry.registerCache(
this.getClass().getName() + DOT + "backendIdAttrsToEntityCache",
backendIdAttrsToEntityCache,
Collections.emptyMap());
}

public LoadingCache<Pair<String, String>, Optional<Entity>> getFqnToServiceEntityCache() {
return fqnToServiceEntity;
return fqnToServiceEntityCache;
}

public LoadingCache<Pair<String, String>, List<Entity>> getNameToServiceEntitiesCache() {
return nameToServiceEntities;
return nameToServiceEntitiesCache;
}

public LoadingCache<Pair<String, String>, List<Entity>> getNameToNamespaceEntityIdCache() {
return namespaceCache;
return nameToNamespaceEntitiesCache;
}

public LoadingCache<ContextualKey<Map<String, AttributeValue>>, Optional<Entity>>
Expand All @@ -147,4 +145,21 @@ protected Optional<Entity> loadBackendFromIdentifyingAttributes(
edsClient.getByTypeAndIdentifyingAttributes(
key.getContext().getTenantId().orElseThrow(), request));
}

protected Optional<Entity> loadServiceFromFQN(Pair<String, String> tenantIdFQNPair) {
AttributeValue fqnAttribute =
AttributeValue.newBuilder()
.setValue(Value.newBuilder().setString(tenantIdFQNPair.getRight()))
.build();

ByTypeAndIdentifyingAttributes request =
ByTypeAndIdentifyingAttributes.newBuilder()
.setEntityType(EntityType.SERVICE.name())
.putIdentifyingAttributes(
EntityConstants.getValue(CommonAttribute.COMMON_ATTRIBUTE_FQN), fqnAttribute)
.build();

return Optional.ofNullable(
edsClient.getByTypeAndIdentifyingAttributes(tenantIdFQNPair.getLeft(), request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DefaultServiceEntityEnricherTest extends AbstractAttributeEnricherT
@BeforeEach
public void setup() {
enricher = new DefaultServiceEntityEnricher();
entityCache = new EntityCache(this.edsClient);
entityCache = new EntityCache(this.edsClient, Runnable::run);
when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient);
when(clientRegistry.getEntityCache()).thenReturn(entityCache);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class BackendEntityEnricherTest extends AbstractAttributeEnricherTest {
@BeforeEach
public void setup() {
enricher = new BackendEntityEnricher();
entityCache = new EntityCache(edsClient);
entityCache = new EntityCache(edsClient, Runnable::run);
when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient);
when(clientRegistry.getEntityCache()).thenReturn(entityCache);
enricher.init(getEntityServiceConfig(), clientRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() {
// Not passing the Entity Data Service configuration, unless the container id
// in the span data in inside EDS
when(clientRegistry.getEdsCacheClient()).thenReturn(edsClient);
entityCache = new EntityCache(edsClient);
entityCache = new EntityCache(edsClient, Runnable::run);
when(clientRegistry.getEntityCache()).thenReturn(entityCache);

enrichmentProcessor = createEnricherProcessor(configs);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package org.hypertrace.traceenricher.trace.enricher;

import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.CACHE_LOADER_THREAD_POOL_SIZE;
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_CLIENTS_CONFIG_KEY;
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_CONFIG_TEMPLATE;
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.ENRICHER_NAMES_CONFIG_KEY;
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand All @@ -28,10 +32,18 @@ public void init(ProcessorContext context) {
if (processor == null) {
synchronized (StructuredTraceEnrichProcessor.class) {
if (processor == null) {
Map<String, Config> enricherConfigs = getEnricherConfigs(context.appConfigs());
Config jobConfig =
(Config) context.appConfigs().get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY);
Map<String, Config> enricherConfigs = getEnricherConfigs(jobConfig);
EnrichmentRegistry enrichmentRegistry = new EnrichmentRegistry();
enrichmentRegistry.registerEnrichers(enricherConfigs);
clientRegistry = new DefaultClientRegistry(this.getClientsConfig(context.appConfigs()));
Config clientsConfig = this.getClientsConfig(jobConfig);
clientRegistry =
new DefaultClientRegistry(
clientsConfig,
Executors.newFixedThreadPool(
getCacheLoaderExecutorThreadsPoolSize(clientsConfig),
this.buildThreadFactory()));
processor =
new EnrichmentProcessor(
enrichmentRegistry.getOrderedRegisteredEnrichers(), clientRegistry);
Expand All @@ -51,8 +63,7 @@ public void close() {
// clientRegistry.shutdown(); TODO restore cleanup once shared static instance removed
}

private Map<String, Config> getEnricherConfigs(Map<String, Object> properties) {
Config jobConfig = (Config) properties.get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY);
private Map<String, Config> getEnricherConfigs(Config jobConfig) {
List<String> enrichers = jobConfig.getStringList(ENRICHER_NAMES_CONFIG_KEY);
Map<String, Config> enricherConfigs = new LinkedHashMap<>();
for (String enricher : enrichers) {
Expand All @@ -62,12 +73,24 @@ private Map<String, Config> getEnricherConfigs(Map<String, Object> properties) {
return enricherConfigs;
}

private Config getClientsConfig(Map<String, Object> properties) {
Config jobConfig = (Config) properties.get(STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY);
private Config getClientsConfig(Config jobConfig) {
return jobConfig.getConfig(ENRICHER_CLIENTS_CONFIG_KEY);
}

private int getCacheLoaderExecutorThreadsPoolSize(Config clientsConfig) {
return clientsConfig.hasPath(CACHE_LOADER_THREAD_POOL_SIZE)
? clientsConfig.getInt(CACHE_LOADER_THREAD_POOL_SIZE)
: 3;
}

private String getEnricherConfigPath(String enricher) {
return String.format(ENRICHER_CONFIG_TEMPLATE, enricher);
}

private ThreadFactory buildThreadFactory() {
return new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("async-cache-loader-%d")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class StructuredTraceEnricherConstants {
public static final String ENRICHER_CONFIGS_KEY = "enricher.configs";
public static final String ENRICHER_NAMES_CONFIG_KEY = "enricher.names";
public static final String ENRICHER_CLIENTS_CONFIG_KEY = "enricher.clients";
public static final String CACHE_LOADER_THREAD_POOL_SIZE = "cache.loader.thread.pool.size";
public static final String ENRICHER_CONFIG_TEMPLATE = "enricher.%s";
public static final String STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY =
"structured-traces-enrichment-job";
Expand Down