Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ dependencies {
implementation(project(":span-normalizer:raw-span-constants"))
implementation(project(":span-normalizer:span-normalizer-constants"))
implementation(project(":semantic-convention-utils"))
implementation("org.hypertrace.entity.service:entity-service-api:0.6.10")
implementation("org.hypertrace.entity.service:entity-service-api:0.8.0")

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
implementation(project(":hypertrace-trace-enricher:trace-reader"))

implementation("org.hypertrace.core.datamodel:data-model:0.1.18")
implementation("org.hypertrace.entity.service:entity-service-client:0.6.10")
implementation("org.hypertrace.entity.service:entity-service-client:0.8.0")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.28")
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.5.2")
implementation("org.hypertrace.config.service:spaces-config-service-api:0.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.entity.data.service.client.EdsCacheClient;
import org.hypertrace.trace.accessor.entities.TraceEntityAccessor;
import org.hypertrace.trace.reader.attributes.TraceAttributeReader;
import org.hypertrace.trace.reader.entities.TraceEntityReader;
import org.hypertrace.traceenricher.enrichment.enrichers.cache.EntityCache;

public interface ClientRegistry {
Expand All @@ -17,7 +17,7 @@ public interface ClientRegistry {

Channel getConfigServiceChannel();

TraceEntityReader<StructuredTrace, Event> getEntityReader();
TraceEntityAccessor getTraceEntityAccessor();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is both reading and writing, tried my hand at a rename... happy to take suggestions though!


TraceAttributeReader<StructuredTrace, Event> getAttributeReader();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.hypertrace.entity.data.service.rxclient.EntityDataClient;
import org.hypertrace.entity.service.client.config.EntityServiceClientConfig;
import org.hypertrace.entity.type.service.rxclient.EntityTypeClient;
import org.hypertrace.trace.accessor.entities.TraceEntityAccessor;
import org.hypertrace.trace.accessor.entities.TraceEntityAccessorBuilder;
import org.hypertrace.trace.reader.attributes.TraceAttributeReader;
import org.hypertrace.trace.reader.attributes.TraceAttributeReaderFactory;
import org.hypertrace.trace.reader.entities.TraceEntityReader;
import org.hypertrace.trace.reader.entities.TraceEntityReaderBuilder;
import org.hypertrace.traceenricher.enrichment.enrichers.cache.EntityCache;

public class DefaultClientRegistry implements ClientRegistry {
Expand All @@ -36,7 +36,7 @@ public class DefaultClientRegistry implements ClientRegistry {
private final EdsCacheClient edsCacheClient;
private final CachingAttributeClient cachingAttributeClient;
private final EntityCache entityCache;
private final TraceEntityReader<StructuredTrace, Event> entityReader;
private final TraceEntityAccessor entityAccessor;
private final TraceAttributeReader<StructuredTrace, Event> attributeReader;
private final GrpcChannelRegistry grpcChannelRegistry = new GrpcChannelRegistry();

Expand Down Expand Up @@ -64,8 +64,8 @@ public DefaultClientRegistry(Config config) {
new EntityDataServiceClient(this.entityServiceChannel),
EntityServiceClientConfig.from(config).getCacheConfig());
this.entityCache = new EntityCache(this.edsCacheClient);
this.entityReader =
new TraceEntityReaderBuilder(
this.entityAccessor =
new TraceEntityAccessorBuilder(
EntityTypeClient.builder(this.entityServiceChannel).build(),
EntityDataClient.builder(this.entityServiceChannel).build(),
this.cachingAttributeClient)
Expand All @@ -92,8 +92,8 @@ public Channel getConfigServiceChannel() {
}

@Override
public TraceEntityReader<StructuredTrace, Event> getEntityReader() {
return this.entityReader;
public TraceEntityAccessor getTraceEntityAccessor() {
return this.entityAccessor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,27 @@
import com.typesafe.config.Config;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.trace.reader.entities.TraceEntityReader;
import org.hypertrace.trace.accessor.entities.TraceEntityAccessor;
import org.hypertrace.traceenricher.enrichment.AbstractTraceEnricher;
import org.hypertrace.traceenricher.enrichment.clients.ClientRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntitySpanEnricher extends AbstractTraceEnricher {
private static final Logger LOG = LoggerFactory.getLogger(EntitySpanEnricher.class);
private TraceEntityReader<StructuredTrace, Event> entityReader;
private TraceEntityAccessor entityAccessor;

@Override
public void enrichEvent(StructuredTrace trace, Event event) {
// Don't block, this is just meant to eventually write the entities
this.entityReader
.getAssociatedEntitiesForSpan(trace, event)
.doOnError(error -> LOG.error("Failed to enrich entities on span", error))
.onErrorComplete()
.subscribe();
try {
this.entityAccessor.writeAssociatedEntitiesForSpanEventually(trace, event);
} catch (Exception exception) {
LOG.error("Failed to enrich entities on span", exception);
}
}

@Override
public void init(Config enricherConfig, ClientRegistry clientRegistry) {
this.entityReader = clientRegistry.getEntityReader();
this.entityAccessor = clientRegistry.getTraceEntityAccessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies {
implementation("org.hypertrace.core.datamodel:data-model:0.1.18")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
implementation("org.hypertrace.entity.service:entity-service-client:0.6.10")
implementation("org.hypertrace.entity.service:entity-service-client:0.8.0")

implementation("com.typesafe:config:1.4.1")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
Expand Down
4 changes: 2 additions & 2 deletions hypertrace-trace-enricher/trace-reader/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ plugins {
dependencies {
api("org.hypertrace.core.attribute.service:attribute-service-api:0.12.3")
api("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3")
api("org.hypertrace.entity.service:entity-type-service-rx-client:0.6.10")
api("org.hypertrace.entity.service:entity-data-service-rx-client:0.6.10")
api("org.hypertrace.entity.service:entity-type-service-rx-client:0.8.0")
api("org.hypertrace.entity.service:entity-data-service-rx-client:0.8.0")
api("org.hypertrace.core.datamodel:data-model:0.1.17")
implementation("org.hypertrace.core.attribute.service:attribute-projection-registry:0.12.3")
implementation("org.hypertrace.core.grpcutils:grpc-client-rx-utils:0.5.2")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hypertrace.trace.reader.entities;
package org.hypertrace.trace.accessor.entities;

import io.reactivex.rxjava3.core.Maybe;
import org.hypertrace.core.attribute.service.v1.LiteralValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hypertrace.trace.reader.entities;
package org.hypertrace.trace.accessor.entities;

import static java.util.Objects.nonNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package org.hypertrace.trace.reader.entities;
package org.hypertrace.trace.accessor.entities;

import static io.reactivex.rxjava3.core.Maybe.zip;
import static java.util.function.Predicate.not;

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.attribute.service.v1.AttributeMetadata;
import org.hypertrace.core.attribute.service.v1.AttributeSource;
import org.hypertrace.core.attribute.service.v1.LiteralValue;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.grpcutils.client.rx.GrpcRxExecutionContext;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.entity.data.service.rxclient.EntityDataClient;
import org.hypertrace.entity.data.service.v1.AttributeValue;
import org.hypertrace.entity.data.service.v1.AttributeValue.TypeCase;
Expand All @@ -32,19 +32,18 @@
import org.hypertrace.trace.reader.attributes.TraceAttributeReader;

@Slf4j
class DefaultTraceEntityReader<T extends GenericRecord, S extends GenericRecord>
implements TraceEntityReader<T, S> {
class DefaultTraceEntityAccessor implements TraceEntityAccessor {
private final EntityTypeClient entityTypeClient;
private final EntityDataClient entityDataClient;
private final CachingAttributeClient attributeClient;
private final TraceAttributeReader<T, S> traceAttributeReader;
private final TraceAttributeReader<StructuredTrace, Event> traceAttributeReader;
private final Duration writeThrottleDuration;

DefaultTraceEntityReader(
DefaultTraceEntityAccessor(
EntityTypeClient entityTypeClient,
EntityDataClient entityDataClient,
CachingAttributeClient attributeClient,
TraceAttributeReader<T, S> traceAttributeReader,
TraceAttributeReader<StructuredTrace, Event> traceAttributeReader,
Duration writeThrottleDuration) {
this.entityTypeClient = entityTypeClient;
this.entityDataClient = entityDataClient;
Expand All @@ -54,41 +53,32 @@ class DefaultTraceEntityReader<T extends GenericRecord, S extends GenericRecord>
}

@Override
public Maybe<Entity> getAssociatedEntityForSpan(String entityType, T trace, S span) {
return spanTenantContext(span)
.wrapSingle(() -> this.entityTypeClient.get(entityType))
.flatMapMaybe(
entityTypeDefinition -> this.getAndWriteEntity(entityTypeDefinition, trace, span));
public void writeAssociatedEntitiesForSpanEventually(StructuredTrace trace, Event span) {
this.spanTenantContext(span)
.wrapSingle(() -> this.entityTypeClient.getAll().toList())
.blockingGet()
.forEach(entityType -> this.writeEntityIfExists(entityType, trace, span));
}

@Override
public Single<Map<String, Entity>> getAssociatedEntitiesForSpan(T trace, S span) {
return spanTenantContext(span)
.wrapSingle(
() ->
this.entityTypeClient
.getAll()
.flatMapMaybe(entityType -> this.getAndWriteEntity(entityType, trace, span))
.toMap(Entity::getEntityType)
.map(Collections::unmodifiableMap));
}

private Maybe<Entity> getAndWriteEntity(EntityType entityType, T trace, S span) {
return this.buildEntity(entityType, trace, span)
.flatMapSingle(
entity ->
this.buildUpsertCondition(entityType, trace, span)
.defaultIfEmpty(UpsertCondition.getDefaultInstance())
.flatMap(
condition ->
spanTenantContext(span)
.wrapSingle(
() ->
this.entityDataClient.createOrUpdateEntityEventually(
entity, condition, this.writeThrottleDuration))));
private void writeEntityIfExists(EntityType entityType, StructuredTrace trace, Event span) {
Entity entity = this.buildEntity(entityType, trace, span).blockingGet();
if (entity == null) {
return;
}
UpsertCondition upsertCondition =
this.buildUpsertCondition(entityType, trace, span)
.defaultIfEmpty(UpsertCondition.getDefaultInstance())
.blockingGet();

this.entityDataClient.createOrUpdateEntityEventually(
RequestContext.forTenantId(this.traceAttributeReader.getTenantId(span)),
entity,
upsertCondition,
this.writeThrottleDuration);
}

private Maybe<UpsertCondition> buildUpsertCondition(EntityType entityType, T trace, S span) {
private Maybe<UpsertCondition> buildUpsertCondition(
EntityType entityType, StructuredTrace trace, Event span) {
if (entityType.getTimestampAttributeKey().isEmpty()) {
return Maybe.empty();
}
Expand All @@ -106,7 +96,7 @@ private Maybe<UpsertCondition> buildUpsertCondition(EntityType entityType, T tra
}

private Maybe<UpsertCondition> buildUpsertCondition(
AttributeMetadata attribute, PredicateOperator operator, T trace, S span) {
AttributeMetadata attribute, PredicateOperator operator, StructuredTrace trace, Event span) {

return this.traceAttributeReader
.getSpanValue(trace, span, attribute.getScopeString(), attribute.getKey())
Expand All @@ -128,7 +118,7 @@ private Maybe<UpsertCondition> buildUpsertCondition(
.build());
}

private Maybe<Entity> buildEntity(EntityType entityType, T trace, S span) {
private Maybe<Entity> buildEntity(EntityType entityType, StructuredTrace trace, Event span) {
Maybe<Map<String, AttributeValue>> attributes =
this.resolveAllAttributes(entityType.getAttributeScope(), trace, span).cache();

Expand Down Expand Up @@ -171,7 +161,8 @@ private boolean passesFormationCondition(Entity entity, EntityFormationCondition
}
}

private Maybe<Map<String, AttributeValue>> resolveAllAttributes(String scope, T trace, S span) {
private Maybe<Map<String, AttributeValue>> resolveAllAttributes(
String scope, StructuredTrace trace, Event span) {
return spanTenantContext(span)
.wrapSingle(() -> this.attributeClient.getAllInScope(scope))
.flattenAsObservable(list -> list)
Expand All @@ -182,7 +173,7 @@ private Maybe<Map<String, AttributeValue>> resolveAllAttributes(String scope, T
}

private Maybe<Entry<String, AttributeValue>> resolveAttribute(
AttributeMetadata attributeMetadata, T trace, S span) {
AttributeMetadata attributeMetadata, StructuredTrace trace, Event span) {
return this.traceAttributeReader
.getSpanValue(trace, span, attributeMetadata.getScopeString(), attributeMetadata.getKey())
.onErrorComplete()
Expand All @@ -200,7 +191,7 @@ private Optional<String> extractNonEmptyString(
.filter(not(String::isEmpty));
}

private GrpcRxExecutionContext spanTenantContext(S span) {
private GrpcRxExecutionContext spanTenantContext(Event span) {
return GrpcRxExecutionContext.forTenantContext(traceAttributeReader.getTenantId(span));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.hypertrace.trace.accessor.entities;

import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;

public interface TraceEntityAccessor {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the previously parameterized version was always used with the same parameters, so removed them.


void writeAssociatedEntitiesForSpanEventually(StructuredTrace trace, Event span);
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package org.hypertrace.trace.reader.entities;
package org.hypertrace.trace.accessor.entities;

import java.time.Duration;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.entity.data.service.rxclient.EntityDataClient;
import org.hypertrace.entity.type.service.rxclient.EntityTypeClient;
import org.hypertrace.trace.reader.attributes.TraceAttributeReaderFactory;

public class TraceEntityReaderBuilder {
public class TraceEntityAccessorBuilder {
private final EntityTypeClient entityTypeClient;
private final EntityDataClient entityDataClient;
private final CachingAttributeClient attributeClient;
private Duration entityWriteThrottleDuration = Duration.ofSeconds(15);

public TraceEntityReaderBuilder(
public TraceEntityAccessorBuilder(
EntityTypeClient entityTypeClient,
EntityDataClient entityDataClient,
CachingAttributeClient attributeClient) {
Expand All @@ -23,13 +21,13 @@ public TraceEntityReaderBuilder(
this.attributeClient = attributeClient;
}

public TraceEntityReaderBuilder withEntityWriteThrottleDuration(Duration duration) {
public TraceEntityAccessorBuilder withEntityWriteThrottleDuration(Duration duration) {
this.entityWriteThrottleDuration = duration;
return this;
}

public TraceEntityReader<StructuredTrace, Event> build() {
return new DefaultTraceEntityReader<>(
public TraceEntityAccessor build() {
return new DefaultTraceEntityAccessor(
this.entityTypeClient,
this.entityDataClient,
this.attributeClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.hypertrace.trace.reader.entities;
package org.hypertrace.trace.accessor.entities;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this package, but I left the attribute reader and the project named trace-reader to keep the diff from getting too crazy. I'll rename those packages/classes and the project itself later (probably when we start supporting write for attributes, unless there's a desire to do it sooner).


import io.grpc.Channel;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
Expand Down

This file was deleted.

Loading