Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@

import static io.reactivex.rxjava3.core.Single.zip;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.experimental.Accessors;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString;
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder;
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType;
import org.hypertrace.core.graphql.common.utils.Converter;
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
import org.hypertrace.core.graphql.span.request.SpanRequest;
import org.hypertrace.gateway.service.v1.common.Expression;
import org.hypertrace.gateway.service.v1.common.Filter;
Expand All @@ -29,21 +33,28 @@ class SpanLogEventRequestBuilder {
private final Converter<Collection<AttributeRequest>, Set<Expression>> attributeConverter;
private final Converter<Collection<AttributeAssociation<FilterArgument>>, Filter> filterConverter;
private final FilterRequestBuilder filterRequestBuilder;
private final AttributeStore attributeStore;
private final AttributeRequestBuilder attributeRequestBuilder;

@Inject
SpanLogEventRequestBuilder(
Converter<Collection<AttributeRequest>, Set<Expression>> attributeConverter,
Converter<Collection<AttributeAssociation<FilterArgument>>, Filter> filterConverter,
FilterRequestBuilder filterRequestBuilder) {
FilterRequestBuilder filterRequestBuilder,
AttributeStore attributeStore,
AttributeRequestBuilder attributeRequestBuilder) {
this.attributeConverter = attributeConverter;
this.filterConverter = filterConverter;
this.filterRequestBuilder = filterRequestBuilder;
this.attributeStore = attributeStore;
this.attributeRequestBuilder = attributeRequestBuilder;
}

Single<LogEventsRequest> buildLogEventsRequest(
SpanRequest gqlRequest, SpansResponse spansResponse) {
return zip(
this.attributeConverter.convert(gqlRequest.logEventAttributes()),
getRequestAttributes(
gqlRequest.spanEventsRequest().context(), gqlRequest.logEventAttributes()),
buildLogEventsQueryFilter(gqlRequest, spansResponse).flatMap(filterConverter::convert),
(selections, filter) ->
LogEventsRequest.newBuilder()
Expand All @@ -56,6 +67,20 @@ Single<LogEventsRequest> buildLogEventsRequest(
.build());
}

private Single<Set<Expression>> getRequestAttributes(
GraphQlRequestContext requestContext, Collection<AttributeRequest> logEventAttributes) {
return this.attributeStore
.getForeignIdAttribute(
requestContext,
HypertraceCoreAttributeScopeString.LOG_EVENT,
HypertraceCoreAttributeScopeString.SPAN)
.map(attributeRequestBuilder::buildForAttribute)
.toObservable()
.mergeWith(Observable.fromIterable(logEventAttributes))
.collect(Collectors.toSet())
.flatMap(attributeConverter::convert);
}

private Single<List<AttributeAssociation<FilterArgument>>> buildLogEventsQueryFilter(
SpanRequest gqlRequest, SpansResponse spansResponse) {
List<String> spanIds =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.experimental.Accessors;
import org.hypertrace.core.graphql.attributes.AttributeModel;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
Expand Down Expand Up @@ -43,29 +44,44 @@ Single<SpanLogEventsResponse> buildResponse(
HypertraceCoreAttributeScopeString.LOG_EVENT,
HypertraceCoreAttributeScopeString.SPAN)
.flatMap(
spanId ->
buildResponse(spanId.key(), attributeRequests, spansResponse, logEventsResponse));
spanId -> buildResponse(spanId, attributeRequests, spansResponse, logEventsResponse));
}

private Single<SpanLogEventsResponse> buildResponse(
String foreignIdAttribute,
AttributeModel foreignIdAttribute,
Collection<AttributeRequest> attributeRequests,
SpansResponse spansResponse,
LogEventsResponse logEventsResponse) {
return Observable.fromIterable(logEventsResponse.getLogEventsList())
.concatMapSingle(
logEventsResponseVar -> this.convert(attributeRequests, logEventsResponseVar))
.collect(Collectors.groupingBy(logEvent -> (String) logEvent.attribute(foreignIdAttribute)))
logEventsResponseVar ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for the change here? Since we were assuming the attribute existed in the response map and now we're guaranteeing it, the old logic would have also worked, right?

Copy link
Author

@findingrish findingrish Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the original request for logs didn't have the spanId attribute we shouldn't return it back either, I guess. So, this part needed changes to filter out the spanId returned if it wasn't requested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on the way the response object is designed, it's safe to return extra data (because it's not a programmatic access but instead via the schema, we already know at request time what values of the map will be read by the serializer - that's how we generated the requested attributes)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah right spanId filtering would be automatically done, the problem was building the mapping of spanId -> List<LogEvent>. Currently the code was first converting LogEvent (gateway) -> LogEvent (gql) and then mapping spanId to logEvents, but now it is possible that spanId attribute would be lost after conversion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it - because the attribute map conversion is based on the requested rather than returned data, and we don't save the extra attribute into the request. makes sense.

this.convert(foreignIdAttribute, attributeRequests, logEventsResponseVar))
.collect(
Collectors.groupingBy(
SpanLogEventPair::spanId,
Collectors.mapping(SpanLogEventPair::logEvent, Collectors.toList())))
.map(
spanIdVsLogEventsMap -> new SpanLogEventsResponse(spansResponse, spanIdVsLogEventsMap));
}

private Single<LogEvent> convert(
private Single<SpanLogEventPair> convert(
AttributeModel foreignIdAttribute,
Collection<AttributeRequest> request,
org.hypertrace.gateway.service.v1.log.events.LogEvent logEvent) {
return this.attributeMapConverter
.convert(request, logEvent.getAttributesMap())
.map(ConvertedLogEvent::new);
.map(
attributeMap ->
new SpanLogEventPair(
logEvent.getAttributesMap().get(foreignIdAttribute.id()).getString(),
new ConvertedLogEvent(attributeMap)));
}

@lombok.Value
@Accessors(fluent = true)
private static class SpanLogEventPair {
String spanId;
LogEvent logEvent;
}

@lombok.Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
Expand All @@ -25,12 +27,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.hypertrace.core.graphql.attributes.AttributeModel;
import org.hypertrace.core.graphql.attributes.AttributeStore;
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
import org.hypertrace.core.graphql.common.request.AttributeRequestBuilder;
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
import org.hypertrace.core.graphql.common.request.ResultSetRequest;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.core.graphql.common.utils.Converter;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultAttributeRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultResultSetRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultSpanRequest;
import org.hypertrace.core.graphql.span.dao.DaoTestUtil.DefaultTimeRange;
Expand All @@ -54,6 +60,10 @@ class SpanLogEventRequestBuilderTest {

@Mock private FilterRequestBuilder filterRequestBuilder;

@Mock private AttributeStore attributeStore;

@Mock private AttributeRequestBuilder attributeRequestBuilder;

private SpanLogEventRequestBuilder spanLogEventRequestBuilder;

@BeforeEach
Expand Down Expand Up @@ -82,11 +92,13 @@ protected void configure() {
new TypeLiteral<Converter<Collection<AttributeRequest>, Set<Expression>>>() {}));

spanLogEventRequestBuilder =
new SpanLogEventRequestBuilder(attributeConverter, filterConverter, filterRequestBuilder);
}
new SpanLogEventRequestBuilder(
attributeConverter,
filterConverter,
filterRequestBuilder,
attributeStore,
attributeRequestBuilder);

@Test
void testBuildRequest() {
doAnswer(
invocation -> {
Set<FilterArgument> filterArguments = invocation.getArgument(2, Set.class);
Expand All @@ -103,6 +115,21 @@ void testBuildRequest() {
.when(filterRequestBuilder)
.build(any(), any(), anyCollection());

when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString()))
.thenReturn(Single.just(spanIdAttribute.attribute()));

doAnswer(
invocation -> {
AttributeModel attributeModel = invocation.getArgument(0, AttributeModel.class);
return new DefaultAttributeRequest(attributeModel);
})
.when(attributeRequestBuilder)
.buildForAttribute(any());
}

@Test
void testBuildRequest() {

long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis();

Expand Down Expand Up @@ -149,7 +176,58 @@ void testBuildRequest() {
assertEquals(
Set.of("attributes", "traceId", "spanId"),
logEventsRequest.getSelectionList().stream()
.map(v -> v.getColumnIdentifier().getColumnName())
.map(expression -> expression.getColumnIdentifier().getColumnName())
.collect(Collectors.toSet()));
}

@Test
void testBuildRequest_addSpanId() {
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis() + Duration.ofHours(1).toMillis();

Collection<AttributeRequest> logAttributeRequests = List.of(traceIdAttribute);
ResultSetRequest resultSetRequest =
new DefaultResultSetRequest(
null,
List.of(DaoTestUtil.eventIdAttribute),
new DefaultTimeRange(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)),
DaoTestUtil.eventIdAttribute,
0,
0,
List.of(),
Collections.emptyList(),
Optional.empty());
SpanRequest spanRequest = new DefaultSpanRequest(resultSetRequest, logAttributeRequests);

LogEventsRequest logEventsRequest =
spanLogEventRequestBuilder.buildLogEventsRequest(spanRequest, spansResponse).blockingGet();

assertEquals(Operator.IN, logEventsRequest.getFilter().getChildFilter(0).getOperator());
assertEquals(
spanIdAttribute.attribute().id(),
logEventsRequest
.getFilter()
.getChildFilter(0)
.getLhs()
.getColumnIdentifier()
.getColumnName());
assertEquals(
List.of("span1", "span2", "span3"),
logEventsRequest
.getFilter()
.getChildFilter(0)
.getRhs()
.getLiteral()
.getValue()
.getStringArrayList()
.stream()
.collect(Collectors.toList()));
assertEquals(startTime, logEventsRequest.getStartTimeMillis());
assertEquals(endTime, logEventsRequest.getEndTimeMillis());
assertEquals(
Set.of("traceId", "spanId"),
logEventsRequest.getSelectionList().stream()
.map(expression -> expression.getColumnIdentifier().getColumnName())
.collect(Collectors.toSet()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,37 @@ void testBuildResponse() {
Map<String, Value> map = invocation.getArgument(1, Map.class);
return Single.just(
map.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().getString())));
.collect(
Collectors.toMap(
Entry::getKey, valueEntry -> valueEntry.getValue().getString())));
})
.when(attributeMapConverter)
.convert(anyCollection(), anyMap());

SpanLogEventsResponse response =
spanLogEventResponseConverter
.buildResponse(requestContext, attributeRequests, spansResponse, logEventsResponse)
.blockingGet();

assertEquals(spansResponse, response.spansResponse());
assertEquals(Set.of("span1", "span2"), response.spanIdToLogEvents().keySet());
}

@Test
void testBuildResponse_spanIdNotRequested() {
Collection<AttributeRequest> attributeRequests = List.of(traceIdAttribute, attributesAttribute);

when(attributeStore.getForeignIdAttribute(any(), anyString(), anyString()))
.thenReturn(Single.just(spanIdAttribute.attribute()));

doAnswer(
invocation -> {
Map<String, Value> map = invocation.getArgument(1, Map.class);
return Single.just(
map.entrySet().stream()
.collect(
Collectors.toMap(
Entry::getKey, valueEntry -> valueEntry.getValue().getString())));
})
.when(attributeMapConverter)
.convert(anyCollection(), anyMap());
Expand Down