diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilder.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilder.java index 46d1d4f1..1e021e53 100644 --- a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilder.java +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilder.java @@ -2,6 +2,7 @@ import static io.reactivex.rxjava3.core.Single.zip; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; import java.util.Collection; import java.util.List; @@ -9,15 +10,18 @@ import java.util.stream.Collectors; import javax.inject.Inject; import lombok.experimental.Accessors; +import org.hypertrace.core.graphql.attributes.AttributeStore; import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString; import org.hypertrace.core.graphql.common.request.AttributeAssociation; import org.hypertrace.core.graphql.common.request.AttributeRequest; +import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder; import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope; import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument; import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType; import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType; import org.hypertrace.core.graphql.common.utils.Converter; +import org.hypertrace.core.graphql.context.GraphQlRequestContext; import org.hypertrace.core.graphql.span.request.SpanRequest; import org.hypertrace.gateway.service.v1.common.Expression; import org.hypertrace.gateway.service.v1.common.Filter; @@ -29,21 +33,28 @@ class SpanLogEventRequestBuilder { private final Converter, Set> attributeConverter; private final Converter>, Filter> filterConverter; private final FilterRequestBuilder filterRequestBuilder; + private final AttributeStore attributeStore; + private final AttributeRequestBuilder attributeRequestBuilder; @Inject SpanLogEventRequestBuilder( Converter, Set> attributeConverter, Converter>, Filter> filterConverter, - FilterRequestBuilder filterRequestBuilder) { + FilterRequestBuilder filterRequestBuilder, + AttributeStore attributeStore, + AttributeRequestBuilder attributeRequestBuilder) { this.attributeConverter = attributeConverter; this.filterConverter = filterConverter; this.filterRequestBuilder = filterRequestBuilder; + this.attributeStore = attributeStore; + this.attributeRequestBuilder = attributeRequestBuilder; } Single buildLogEventsRequest( SpanRequest gqlRequest, SpansResponse spansResponse) { return zip( - this.attributeConverter.convert(gqlRequest.logEventAttributes()), + getRequestAttributes( + gqlRequest.spanEventsRequest().context(), gqlRequest.logEventAttributes()), buildLogEventsQueryFilter(gqlRequest, spansResponse).flatMap(filterConverter::convert), (selections, filter) -> LogEventsRequest.newBuilder() @@ -56,6 +67,20 @@ Single buildLogEventsRequest( .build()); } + private Single> getRequestAttributes( + GraphQlRequestContext requestContext, Collection logEventAttributes) { + return this.attributeStore + .getForeignIdAttribute( + requestContext, + HypertraceCoreAttributeScopeString.LOG_EVENT, + HypertraceCoreAttributeScopeString.SPAN) + .map(attributeRequestBuilder::buildForAttribute) + .toObservable() + .mergeWith(Observable.fromIterable(logEventAttributes)) + .collect(Collectors.toSet()) + .flatMap(attributeConverter::convert); + } + private Single>> buildLogEventsQueryFilter( SpanRequest gqlRequest, SpansResponse spansResponse) { List spanIds = diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverter.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverter.java index 5b9a59ce..938c84df 100644 --- a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverter.java +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverter.java @@ -7,6 +7,7 @@ import java.util.stream.Collectors; import javax.inject.Inject; import lombok.experimental.Accessors; +import org.hypertrace.core.graphql.attributes.AttributeModel; import org.hypertrace.core.graphql.attributes.AttributeStore; import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString; import org.hypertrace.core.graphql.common.request.AttributeRequest; @@ -43,29 +44,44 @@ Single buildResponse( HypertraceCoreAttributeScopeString.LOG_EVENT, HypertraceCoreAttributeScopeString.SPAN) .flatMap( - spanId -> - buildResponse(spanId.key(), attributeRequests, spansResponse, logEventsResponse)); + spanId -> buildResponse(spanId, attributeRequests, spansResponse, logEventsResponse)); } private Single buildResponse( - String foreignIdAttribute, + AttributeModel foreignIdAttribute, Collection attributeRequests, SpansResponse spansResponse, LogEventsResponse logEventsResponse) { return Observable.fromIterable(logEventsResponse.getLogEventsList()) .concatMapSingle( - logEventsResponseVar -> this.convert(attributeRequests, logEventsResponseVar)) - .collect(Collectors.groupingBy(logEvent -> (String) logEvent.attribute(foreignIdAttribute))) + logEventsResponseVar -> + this.convert(foreignIdAttribute, attributeRequests, logEventsResponseVar)) + .collect( + Collectors.groupingBy( + SpanLogEventPair::spanId, + Collectors.mapping(SpanLogEventPair::logEvent, Collectors.toList()))) .map( spanIdVsLogEventsMap -> new SpanLogEventsResponse(spansResponse, spanIdVsLogEventsMap)); } - private Single convert( + private Single convert( + AttributeModel foreignIdAttribute, Collection request, org.hypertrace.gateway.service.v1.log.events.LogEvent logEvent) { return this.attributeMapConverter .convert(request, logEvent.getAttributesMap()) - .map(ConvertedLogEvent::new); + .map( + attributeMap -> + new SpanLogEventPair( + logEvent.getAttributesMap().get(foreignIdAttribute.id()).getString(), + new ConvertedLogEvent(attributeMap))); + } + + @lombok.Value + @Accessors(fluent = true) + private static class SpanLogEventPair { + String spanId; + LogEvent logEvent; } @lombok.Value diff --git a/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilderTest.java b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilderTest.java index 3e1db88a..105b7298 100644 --- a/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilderTest.java +++ b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventRequestBuilderTest.java @@ -7,8 +7,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -25,12 +27,16 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.hypertrace.core.graphql.attributes.AttributeModel; +import org.hypertrace.core.graphql.attributes.AttributeStore; import org.hypertrace.core.graphql.common.request.AttributeAssociation; import org.hypertrace.core.graphql.common.request.AttributeRequest; +import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder; import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; import org.hypertrace.core.graphql.common.request.ResultSetRequest; import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument; import org.hypertrace.core.graphql.common.utils.Converter; +import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultAttributeRequest; import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultResultSetRequest; import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultSpanRequest; import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultTimeRange; @@ -54,6 +60,10 @@ class SpanLogEventRequestBuilderTest { @Mock private FilterRequestBuilder filterRequestBuilder; + @Mock private AttributeStore attributeStore; + + @Mock private AttributeRequestBuilder attributeRequestBuilder; + private SpanLogEventRequestBuilder spanLogEventRequestBuilder; @BeforeEach @@ -82,11 +92,13 @@ protected void configure() { new TypeLiteral, Set>>() {})); spanLogEventRequestBuilder = - new SpanLogEventRequestBuilder(attributeConverter, filterConverter, filterRequestBuilder); - } + new SpanLogEventRequestBuilder( + attributeConverter, + filterConverter, + filterRequestBuilder, + attributeStore, + attributeRequestBuilder); - @Test - void testBuildRequest() { doAnswer( invocation -> { Set filterArguments = invocation.getArgument(2, Set.class); @@ -103,6 +115,21 @@ void testBuildRequest() { .when(filterRequestBuilder) .build(any(), any(), anyCollection()); + when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString())) + .thenReturn(Single.just(spanIdAttribute.attribute())); + + doAnswer( + invocation -> { + AttributeModel attributeModel = invocation.getArgument(0, AttributeModel.class); + return new DefaultAttributeRequest(attributeModel); + }) + .when(attributeRequestBuilder) + .buildForAttribute(any()); + } + + @Test + void testBuildRequest() { + long startTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); @@ -149,7 +176,58 @@ void testBuildRequest() { assertEquals( Set.of("attributes", "traceId", "spanId"), logEventsRequest.getSelectionList().stream() - .map(v -> v.getColumnIdentifier().getColumnName()) + .map(expression -> expression.getColumnIdentifier().getColumnName()) + .collect(Collectors.toSet())); + } + + @Test + void testBuildRequest_addSpanId() { + long startTime = System.currentTimeMillis(); + long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); + + Collection logAttributeRequests = List.of(traceIdAttribute); + ResultSetRequest resultSetRequest = + new DefaultResultSetRequest( + null, + List.of(DaoTestUtil.eventIdAttribute), + new DefaultTimeRange(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)), + DaoTestUtil.eventIdAttribute, + 0, + 0, + List.of(), + Collections.emptyList(), + Optional.empty()); + SpanRequest spanRequest = new DefaultSpanRequest(resultSetRequest, logAttributeRequests); + + LogEventsRequest logEventsRequest = + spanLogEventRequestBuilder.buildLogEventsRequest(spanRequest, spansResponse).blockingGet(); + + assertEquals(Operator.IN, logEventsRequest.getFilter().getChildFilter(0).getOperator()); + assertEquals( + spanIdAttribute.attribute().id(), + logEventsRequest + .getFilter() + .getChildFilter(0) + .getLhs() + .getColumnIdentifier() + .getColumnName()); + assertEquals( + List.of("span1", "span2", "span3"), + logEventsRequest + .getFilter() + .getChildFilter(0) + .getRhs() + .getLiteral() + .getValue() + .getStringArrayList() + .stream() + .collect(Collectors.toList())); + assertEquals(startTime, logEventsRequest.getStartTimeMillis()); + assertEquals(endTime, logEventsRequest.getEndTimeMillis()); + assertEquals( + Set.of("traceId", "spanId"), + logEventsRequest.getSelectionList().stream() + .map(expression -> expression.getColumnIdentifier().getColumnName()) .collect(Collectors.toSet())); } } diff --git a/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverterTest.java b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverterTest.java index c980e441..26de96d1 100644 --- a/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverterTest.java +++ b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/dao/SpanLogEventResponseConverterTest.java @@ -62,7 +62,37 @@ void testBuildResponse() { Map map = invocation.getArgument(1, Map.class); return Single.just( map.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, v -> v.getValue().getString()))); + .collect( + Collectors.toMap( + Entry::getKey, valueEntry -> valueEntry.getValue().getString()))); + }) + .when(attributeMapConverter) + .convert(anyCollection(), anyMap()); + + SpanLogEventsResponse response = + spanLogEventResponseConverter + .buildResponse(requestContext, attributeRequests, spansResponse, logEventsResponse) + .blockingGet(); + + assertEquals(spansResponse, response.spansResponse()); + assertEquals(Set.of("span1", "span2"), response.spanIdToLogEvents().keySet()); + } + + @Test + void testBuildResponse_spanIdNotRequested() { + Collection attributeRequests = List.of(traceIdAttribute, attributesAttribute); + + when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString())) + .thenReturn(Single.just(spanIdAttribute.attribute())); + + doAnswer( + invocation -> { + Map map = invocation.getArgument(1, Map.class); + return Single.just( + map.entrySet().stream() + .collect( + Collectors.toMap( + Entry::getKey, valueEntry -> valueEntry.getValue().getString()))); }) .when(attributeMapConverter) .convert(anyCollection(), anyMap());