Skip to content

Commit

Permalink
feat: RunAggregationQuery instrumentation (#1447)
Browse files Browse the repository at this point in the history
* feat: RunQuery trace instrumentation

* Formatting

* Formatting

* Refactor: s/RUNQUERY/RUN_QUERY

* feat: RunAggregationQuery Trace Instrumentation

* Build: retiring test assertions for OpenCensus spans - will be replacing this in hermetic integration tests for OpenTelemetry using in-memory span exports (in addition to ITE2ETraceTest.java).

* Formatting

* Fixing @test annotation missed after merge

* Formatting
  • Loading branch information
jimit-j-shah committed Jun 18, 2024
1 parent fcce234 commit 4fd4c16
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.datastore;

import static com.google.cloud.BaseService.EXCEPTION_HANDLER;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetrySettings;
Expand All @@ -39,9 +38,6 @@
import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.concurrent.Callable;

/**
Expand All @@ -52,7 +48,7 @@
public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {

private final DatastoreRpc datastoreRpc;
private final TraceUtil traceUtil;
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
private final RetrySettings retrySettings;
private final DatastoreOptions datastoreOptions;

Expand All @@ -62,9 +58,9 @@ public RetryAndTraceDatastoreRpcDecorator(
RetrySettings retrySettings,
DatastoreOptions datastoreOptions) {
this.datastoreRpc = datastoreRpc;
this.traceUtil = traceUtil;
this.retrySettings = retrySettings;
this.datastoreOptions = datastoreOptions;
this.otelTraceUtil = datastoreOptions.getTraceUtil();
}

@Override
Expand Down Expand Up @@ -106,19 +102,20 @@ public RunQueryResponse runQuery(RunQueryRequest request) {
@Override
public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) {
return invokeRpc(
() -> datastoreRpc.runAggregationQuery(request), SPAN_NAME_RUN_AGGREGATION_QUERY);
() -> datastoreRpc.runAggregationQuery(request),
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
}

public <O> O invokeRpc(Callable<O> block, String startSpan) {
Span span = traceUtil.startSpan(startSpan);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface TraceUtil {
static final String SPAN_NAME_LOOKUP = "Lookup";
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";

/**
* Creates and returns an instance of the TraceUtil class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package com.google.cloud.datastore;

import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static com.google.rpc.Code.UNAVAILABLE;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
Expand All @@ -29,8 +26,6 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.datastore.v1.RunAggregationQueryRequest;
import com.google.datastore.v1.RunAggregationQueryResponse;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -49,15 +44,13 @@ public class RetryAndTraceDatastoreRpcDecoratorTest {
@Before
public void setUp() throws Exception {
mockDatastoreRpc = createStrictMock(DatastoreRpc.class);
mockTraceUtil = createStrictMock(TraceUtil.class);
datastoreRpcDecorator =
new RetryAndTraceDatastoreRpcDecorator(
mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions);
}

@Test
public void testRunAggregationQuery() {
Span mockSpan = createStrictMock(Span.class);
RunAggregationQueryRequest aggregationQueryRequest =
RunAggregationQueryRequest.getDefaultInstance();
RunAggregationQueryResponse aggregationQueryResponse =
Expand All @@ -69,16 +62,13 @@ public void testRunAggregationQuery() {
UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name()))
.times(2)
.andReturn(aggregationQueryResponse);
expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan);
expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class));
mockSpan.end(END_SPAN_OPTIONS);

replay(mockDatastoreRpc, mockTraceUtil, mockSpan);
replay(mockDatastoreRpc);

RunAggregationQueryResponse actualAggregationQueryResponse =
datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest);

assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse);
verify(mockDatastoreRpc, mockTraceUtil, mockSpan);
verify(mockDatastoreRpc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.google.cloud.datastore.it;

import static com.google.cloud.datastore.aggregation.Aggregation.count;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -27,12 +30,16 @@
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.AggregationResult;
import com.google.cloud.datastore.AggregationResults;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.StructuredQuery;
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
Expand Down Expand Up @@ -95,6 +102,7 @@
// 5. Traces are read-back using TraceServiceClient and verified against expected Call Stacks.
@RunWith(TestParameterInjector.class)
public class ITE2ETracingTest {

protected boolean isUsingGlobalOpenTelemetrySDK() {
return useGlobalOpenTelemetrySDK;
}
Expand Down Expand Up @@ -214,6 +222,10 @@ private boolean dfsContainsCallStack(long spanId, List<String> expectedCallStack

private static Key KEY2;

private static Key KEY3;

private static Key KEY4;

// Random int generator for trace ID and span ID
private static Random random;

Expand Down Expand Up @@ -309,10 +321,17 @@ public void before() throws Exception {
.setNamespace(options.getNamespace())
.build();
KEY2 =
Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY3 =
Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY4 =
Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();

// Set up the tracer for custom TraceID injection
rootSpanName =
String.format("%s%d", this.getClass().getSimpleName(), System.currentTimeMillis());
Expand Down Expand Up @@ -658,4 +677,62 @@ public void runQueryTraceTest() throws Exception {

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_QUERY);
}

@Test
public void runAggregationQueryTraceTest() throws Exception {
Entity entity1 =
Entity.newBuilder(KEY1)
.set("pepper_name", "jalapeno")
.set("max_scoville_level", 10000)
.build();
Entity entity2 =
Entity.newBuilder(KEY2)
.set("pepper_name", "serrano")
.set("max_scoville_level", 25000)
.build();
Entity entity3 =
Entity.newBuilder(KEY3)
.set("pepper_name", "habanero")
.set("max_scoville_level", 350000)
.build();
Entity entity4 =
Entity.newBuilder(KEY4)
.set("pepper_name", "ghost")
.set("max_scoville_level", 1500000)
.build();

List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);
entityList.add(entity3);
entityList.add(entity4);

List<Entity> response = datastore.add(entity1, entity2, entity3, entity4);
assertEquals(entityList, response);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000);
StructuredQuery<Entity> mediumSpicyQuery =
Query.newEntityQueryBuilder()
.setKind(KEY1.getKind())
.setFilter(mediumSpicyFilters)
.build();
AggregationQuery countSpicyPeppers =
Query.newAggregationQueryBuilder()
.addAggregation(count().as("count"))
.over(mediumSpicyQuery)
.build();
AggregationResults results = datastore.runAggregation(countSpicyPeppers);
assertThat(results.size()).isEqualTo(1);
AggregationResult result = results.get(0);
assertThat(result.getLong("count")).isEqualTo(2L);
} finally {
rootSpan.end();
}

waitForTracesToComplete();

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_AGGREGATION_QUERY);
}
}

0 comments on commit 4fd4c16

Please sign in to comment.