Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RunAggregationQuery instrumentation #1447

Merged
merged 12 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.datastore;

import static com.google.cloud.BaseService.EXCEPTION_HANDLER;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetrySettings;
Expand All @@ -39,9 +38,6 @@
import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.concurrent.Callable;

/**
Expand All @@ -52,7 +48,7 @@
public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {

private final DatastoreRpc datastoreRpc;
private final TraceUtil traceUtil;
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
private final RetrySettings retrySettings;
private final DatastoreOptions datastoreOptions;

Expand All @@ -62,9 +58,9 @@ public RetryAndTraceDatastoreRpcDecorator(
RetrySettings retrySettings,
DatastoreOptions datastoreOptions) {
this.datastoreRpc = datastoreRpc;
this.traceUtil = traceUtil;
this.retrySettings = retrySettings;
this.datastoreOptions = datastoreOptions;
this.otelTraceUtil = datastoreOptions.getTraceUtil();
}

@Override
Expand Down Expand Up @@ -106,19 +102,20 @@ public RunQueryResponse runQuery(RunQueryRequest request) {
@Override
public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) {
return invokeRpc(
() -> datastoreRpc.runAggregationQuery(request), SPAN_NAME_RUN_AGGREGATION_QUERY);
() -> datastoreRpc.runAggregationQuery(request),
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
}

public <O> O invokeRpc(Callable<O> block, String startSpan) {
Span span = traceUtil.startSpan(startSpan);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface TraceUtil {
static final String SPAN_NAME_LOOKUP = "Lookup";
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";

/**
* Creates and returns an instance of the TraceUtil class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package com.google.cloud.datastore;

import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static com.google.rpc.Code.UNAVAILABLE;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
Expand All @@ -29,8 +26,6 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.datastore.v1.RunAggregationQueryRequest;
import com.google.datastore.v1.RunAggregationQueryResponse;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -49,15 +44,13 @@ public class RetryAndTraceDatastoreRpcDecoratorTest {
@Before
public void setUp() throws Exception {
mockDatastoreRpc = createStrictMock(DatastoreRpc.class);
mockTraceUtil = createStrictMock(TraceUtil.class);
datastoreRpcDecorator =
new RetryAndTraceDatastoreRpcDecorator(
mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions);
}

@Test
public void testRunAggregationQuery() {
Span mockSpan = createStrictMock(Span.class);
RunAggregationQueryRequest aggregationQueryRequest =
RunAggregationQueryRequest.getDefaultInstance();
RunAggregationQueryResponse aggregationQueryResponse =
Expand All @@ -69,16 +62,13 @@ public void testRunAggregationQuery() {
UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name()))
.times(2)
.andReturn(aggregationQueryResponse);
expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan);
expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class));
mockSpan.end(END_SPAN_OPTIONS);

replay(mockDatastoreRpc, mockTraceUtil, mockSpan);
replay(mockDatastoreRpc);

RunAggregationQueryResponse actualAggregationQueryResponse =
datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest);

assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse);
verify(mockDatastoreRpc, mockTraceUtil, mockSpan);
verify(mockDatastoreRpc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.google.cloud.datastore.it;

import static com.google.cloud.datastore.aggregation.Aggregation.count;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -27,12 +30,16 @@
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.AggregationResult;
import com.google.cloud.datastore.AggregationResults;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.StructuredQuery;
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
Expand Down Expand Up @@ -95,6 +102,7 @@
// 5. Traces are read-back using TraceServiceClient and verified against expected Call Stacks.
@RunWith(TestParameterInjector.class)
public class ITE2ETracingTest {

protected boolean isUsingGlobalOpenTelemetrySDK() {
return useGlobalOpenTelemetrySDK;
}
Expand Down Expand Up @@ -214,6 +222,10 @@ private boolean dfsContainsCallStack(long spanId, List<String> expectedCallStack

private static Key KEY2;

private static Key KEY3;

private static Key KEY4;

// Random int generator for trace ID and span ID
private static Random random;

Expand Down Expand Up @@ -309,10 +321,17 @@ public void before() throws Exception {
.setNamespace(options.getNamespace())
.build();
KEY2 =
Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY3 =
Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY4 =
Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();

// Set up the tracer for custom TraceID injection
rootSpanName =
String.format("%s%d", this.getClass().getSimpleName(), System.currentTimeMillis());
Expand Down Expand Up @@ -658,4 +677,62 @@ public void runQueryTraceTest() throws Exception {

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_QUERY);
}

@Test
public void runAggregationQueryTraceTest() throws Exception {
Entity entity1 =
Entity.newBuilder(KEY1)
.set("pepper_name", "jalapeno")
.set("max_scoville_level", 10000)
.build();
Entity entity2 =
Entity.newBuilder(KEY2)
.set("pepper_name", "serrano")
.set("max_scoville_level", 25000)
.build();
Entity entity3 =
Entity.newBuilder(KEY3)
.set("pepper_name", "habanero")
.set("max_scoville_level", 350000)
.build();
Entity entity4 =
Entity.newBuilder(KEY4)
.set("pepper_name", "ghost")
.set("max_scoville_level", 1500000)
.build();

List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);
entityList.add(entity3);
entityList.add(entity4);

List<Entity> response = datastore.add(entity1, entity2, entity3, entity4);
assertEquals(entityList, response);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000);
StructuredQuery<Entity> mediumSpicyQuery =
Query.newEntityQueryBuilder()
.setKind(KEY1.getKind())
.setFilter(mediumSpicyFilters)
.build();
AggregationQuery countSpicyPeppers =
Query.newAggregationQueryBuilder()
.addAggregation(count().as("count"))
.over(mediumSpicyQuery)
.build();
AggregationResults results = datastore.runAggregation(countSpicyPeppers);
assertThat(results.size()).isEqualTo(1);
AggregationResult result = results.get(0);
assertThat(result.getLong("count")).isEqualTo(2L);
} finally {
rootSpan.end();
}

waitForTracesToComplete();

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_AGGREGATION_QUERY);
}
}
Loading