Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RunAggregationQuery instrumentation #1447

Merged
merged 12 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.datastore.v1.ExplainOptions;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.ReserveIdsRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.TransactionOptions;
import com.google.protobuf.ByteString;
import io.opencensus.common.Scope;
Expand Down Expand Up @@ -240,20 +241,34 @@ public AggregationResults runAggregation(

com.google.datastore.v1.RunQueryResponse runQuery(
final com.google.datastore.v1.RunQueryRequest requestPb) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_RUNQUERY);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.runQuery(requestPb),
retrySettings,
requestPb.getReadOptions().getTransaction().isEmpty()
? EXCEPTION_HANDLER
: TRANSACTION_OPERATION_EXCEPTION_HANDLER,
getOptions().getClock());
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY);
ReadOptions readOptions = requestPb.getReadOptions();
span.setAttribute(
"isTransactional", readOptions.hasTransaction() || readOptions.hasNewTransaction());
span.setAttribute("readConsistency", readOptions.getReadConsistency().toString());

try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
RunQueryResponse response =
RetryHelper.runWithRetries(
() -> datastoreRpc.runQuery(requestPb),
retrySettings,
requestPb.getReadOptions().getTransaction().isEmpty()
? EXCEPTION_HANDLER
: TRANSACTION_OPERATION_EXCEPTION_HANDLER,
getOptions().getClock());
span.addEvent(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed",
new ImmutableMap.Builder<String, Object>()
.put("Received", response.getBatch().getEntityResultsCount())
.put("More results", response.getBatch().getMoreResults().toString())
.build());
return response;
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.datastore;

import static com.google.cloud.BaseService.EXCEPTION_HANDLER;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetrySettings;
Expand All @@ -39,9 +38,6 @@
import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.concurrent.Callable;

/**
Expand All @@ -52,7 +48,7 @@
public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {

private final DatastoreRpc datastoreRpc;
private final TraceUtil traceUtil;
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
private final RetrySettings retrySettings;
private final DatastoreOptions datastoreOptions;

Expand All @@ -62,9 +58,9 @@ public RetryAndTraceDatastoreRpcDecorator(
RetrySettings retrySettings,
DatastoreOptions datastoreOptions) {
this.datastoreRpc = datastoreRpc;
this.traceUtil = traceUtil;
this.retrySettings = retrySettings;
this.datastoreOptions = datastoreOptions;
this.otelTraceUtil = datastoreOptions.getTraceUtil();
}

@Override
Expand Down Expand Up @@ -106,19 +102,20 @@ public RunQueryResponse runQuery(RunQueryRequest request) {
@Override
public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) {
return invokeRpc(
() -> datastoreRpc.runAggregationQuery(request), SPAN_NAME_RUN_AGGREGATION_QUERY);
() -> datastoreRpc.runAggregationQuery(request),
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
}

public <O> O invokeRpc(Callable<O> block, String startSpan) {
Span span = traceUtil.startSpan(startSpan);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public interface TraceUtil {
static final String ATTRIBUTE_SERVICE_PREFIX = "gcp.datastore.";
static final String ENABLE_TRACING_ENV_VAR = "DATASTORE_ENABLE_TRACING";
static final String LIBRARY_NAME = "com.google.cloud.datastore";

static final String SPAN_NAME_LOOKUP = "Lookup";

static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";

/**
* Creates and returns an instance of the TraceUtil class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package com.google.cloud.datastore;

import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS;
import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static com.google.rpc.Code.UNAVAILABLE;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
Expand All @@ -29,8 +26,6 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.datastore.v1.RunAggregationQueryRequest;
import com.google.datastore.v1.RunAggregationQueryResponse;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -49,15 +44,13 @@ public class RetryAndTraceDatastoreRpcDecoratorTest {
@Before
public void setUp() throws Exception {
mockDatastoreRpc = createStrictMock(DatastoreRpc.class);
mockTraceUtil = createStrictMock(TraceUtil.class);
datastoreRpcDecorator =
new RetryAndTraceDatastoreRpcDecorator(
mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions);
}

@Test
public void testRunAggregationQuery() {
Span mockSpan = createStrictMock(Span.class);
RunAggregationQueryRequest aggregationQueryRequest =
RunAggregationQueryRequest.getDefaultInstance();
RunAggregationQueryResponse aggregationQueryResponse =
Expand All @@ -69,16 +62,13 @@ public void testRunAggregationQuery() {
UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name()))
.times(2)
.andReturn(aggregationQueryResponse);
expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan);
expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class));
mockSpan.end(END_SPAN_OPTIONS);

replay(mockDatastoreRpc, mockTraceUtil, mockSpan);
replay(mockDatastoreRpc);

RunAggregationQueryResponse actualAggregationQueryResponse =
datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest);

assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse);
verify(mockDatastoreRpc, mockTraceUtil, mockSpan);
verify(mockDatastoreRpc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package com.google.cloud.datastore.it;

import static com.google.cloud.datastore.aggregation.Aggregation.count;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -26,10 +30,17 @@
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.AggregationResult;
import com.google.cloud.datastore.AggregationResults;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.StructuredQuery;
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
Expand Down Expand Up @@ -91,6 +102,7 @@
// 5. Traces are read-back using TraceServiceClient and verified against expected Call Stacks.
@RunWith(TestParameterInjector.class)
public class ITE2ETracingTest {

protected boolean isUsingGlobalOpenTelemetrySDK() {
return useGlobalOpenTelemetrySDK;
}
Expand Down Expand Up @@ -210,6 +222,10 @@ private boolean dfsContainsCallStack(long spanId, List<String> expectedCallStack

private static Key KEY2;

private static Key KEY3;

private static Key KEY4;

// Random int generator for trace ID and span ID
private static Random random;

Expand Down Expand Up @@ -301,14 +317,21 @@ public void before() throws Exception {
String projectId = options.getProjectId();
String kind1 = "kind1";
KEY1 =
Key.newBuilder(projectId, kind1, "name1", options.getDatabaseId())
Key.newBuilder(projectId, kind1, "key1", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY2 =
Key.newBuilder(projectId, kind1, "name2", options.getDatabaseId())
Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY3 =
Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();
KEY4 =
Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId())
.setNamespace(options.getNamespace())
.build();

// Set up the tracer for custom TraceID injection
rootSpanName =
String.format("%s%d", this.getClass().getSimpleName(), System.currentTimeMillis());
Expand Down Expand Up @@ -594,7 +617,6 @@ public void updateTraceTest() throws Exception {
assertEquals(entityList, response);

Span rootSpan = getNewRootSpanWithContext();

try (Scope ignored = rootSpan.makeCurrent()) {
Entity entity1_update =
Entity.newBuilder(entity1).set("test_field", "new_test_value1").build();
Expand Down Expand Up @@ -625,7 +647,92 @@ public void deleteTraceTest() throws Exception {
rootSpan.end();
}
waitForTracesToComplete();

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_COMMIT);
}

@Test
public void runQueryTraceTest() throws Exception {
Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build();
Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build();
List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);

List<Entity> response = datastore.add(entity1, entity2);
assertEquals(entityList, response);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field"));
Query<Entity> query =
Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build();
QueryResults<Entity> queryResults = datastore.run(query);
assertTrue(queryResults.hasNext());
assertEquals(entity1, queryResults.next());
assertFalse(queryResults.hasNext());
} finally {
rootSpan.end();
}
waitForTracesToComplete();

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_QUERY);
}

@Test
public void runAggregationQueryTraceTest() throws Exception {
Entity entity1 =
Entity.newBuilder(KEY1)
.set("pepper_name", "jalapeno")
.set("max_scoville_level", 10000)
.build();
Entity entity2 =
Entity.newBuilder(KEY2)
.set("pepper_name", "serrano")
.set("max_scoville_level", 25000)
.build();
Entity entity3 =
Entity.newBuilder(KEY3)
.set("pepper_name", "habanero")
.set("max_scoville_level", 350000)
.build();
Entity entity4 =
Entity.newBuilder(KEY4)
.set("pepper_name", "ghost")
.set("max_scoville_level", 1500000)
.build();

List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);
entityList.add(entity3);
entityList.add(entity4);

List<Entity> response = datastore.add(entity1, entity2, entity3, entity4);
assertEquals(entityList, response);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000);
StructuredQuery<Entity> mediumSpicyQuery =
Query.newEntityQueryBuilder()
.setKind(KEY1.getKind())
.setFilter(mediumSpicyFilters)
.build();
AggregationQuery countSpicyPeppers =
Query.newAggregationQueryBuilder()
.addAggregation(count().as("count"))
.over(mediumSpicyQuery)
.build();
AggregationResults results = datastore.runAggregation(countSpicyPeppers);
assertThat(results.size()).isEqualTo(1);
AggregationResult result = results.get(0);
assertThat(result.getLong("count")).isEqualTo(2L);
} finally {
rootSpan.end();
}

waitForTracesToComplete();

fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_AGGREGATION_QUERY);
}
}
Loading