Skip to content

Commit

Permalink
feat: add instrumentation for a couple OpenTelemetry metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawal-siddharth committed May 25, 2024
1 parent 05ebe17 commit ff5529a
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 1 deletion.
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
<artifactId>google-auth-library-credentials</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -253,6 +261,23 @@ class ConnectionWorker implements AutoCloseable {
static final Pattern DEFAULT_STREAM_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");

private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
{
add(AttributeKey.stringKey("trace_field_1"));
add(AttributeKey.stringKey("trace_field_2"));
add(AttributeKey.stringKey("trace_field_3"));
}
};
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
return matcher.matches();
Expand All @@ -278,6 +303,88 @@ static String getRoutingHeader(String streamName, String location) {
return project + "locations/" + location;
}

private String getTableName() {
Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
return tableMatcher.find() ? tableMatcher.group(1) : "";
}

private void setTraceIdAttributesPart(
AttributesBuilder builder,
String[] traceIdParts,
int indexPartsToCheck,
int indexTelemetryKeysToUse) {
if ((indexPartsToCheck < traceIdParts.length) && !traceIdParts[indexPartsToCheck].isEmpty()) {
builder.put(
telemetryKeysTraceId.get(indexTelemetryKeysToUse), traceIdParts[indexPartsToCheck]);
}
}

private void setTraceIdAttributes(AttributesBuilder builder) {
if ((this.traceId != null) && !this.traceId.isEmpty()) {
if (this.traceId.toLowerCase().indexOf("dataflow:") >= 0) {
String[] traceIdParts = this.traceId.split(":", 8);
for (int i = 0; i < traceIdParts.length; i++) {
if (traceIdParts[i].toLowerCase().indexOf("dataflow") >= 0) {
setTraceIdAttributesPart(builder, traceIdParts, i + 1, 0);
setTraceIdAttributesPart(builder, traceIdParts, i + 2, 1);
setTraceIdAttributesPart(builder, traceIdParts, i + 3, 2);
break;
}
}
}
}
}

private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder();
String tableName = getTableName();
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
setTraceIdAttributes(builder);
return builder.build();
}

private void refreshOpenTelemetryTableNameAttributes() {
String tableName = getTableName();
if (!tableName.isEmpty()
&& !tableName.equals(getTelemetryAttributes().get(telemetryKeyTableId))) {
AttributesBuilder builder = getTelemetryAttributes().toBuilder();
builder.put(telemetryKeyTableId, tableName);
this.telemetryAttributes = builder.build();
}
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
}

private void registerOpenTelemetryMetrics() {
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
writeMeter =
meterProvider
.meterBuilder("com.google.cloud.bigquery.storage.v1.write")
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.build();
instrumentIncomingRequestSize =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.build();
instrumentIncomingRequestRows =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.build();
}

public ConnectionWorker(
String streamName,
String location,
Expand Down Expand Up @@ -312,6 +419,9 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -507,6 +617,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -783,6 +896,7 @@ private void appendLoop() {
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
refreshOpenTelemetryTableNameAttributes();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForTableOrSchemaSwitch = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.util.logging.Logger;

/** Container for global singleton objects. */
class Singletons {

private static final Logger log = Logger.getLogger(Singletons.class.getName());

// Global OpenTelemetry instance
private static OpenTelemetry openTelemetry = null;

static OpenTelemetry getOpenTelemetry() {
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}
return openTelemetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -754,6 +755,97 @@ public void testLongTimeIdleWontFail() throws Exception {
}
}

private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, String expected)
throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
null,
schema1,
100000,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
null,
null,
client.getSettings(),
retrySettings);

Attributes attributes = connectionWorker.getTelemetryAttributes();
String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId);
assertEquals(expected, attributesTableId);
}

@Test
public void testOpenTelemetryAttributesWithStreamNames() throws Exception {
exerciseOpenTelemetryAttributesWithStreamNames(
"projects/my_project/datasets/my_dataset/tables/my_table/streams/my_stream",
"projects/my_project/datasets/my_dataset/tables/my_table");
exerciseOpenTelemetryAttributesWithStreamNames(
"projects/my_project/datasets/my_dataset/tables/my_table/",
"projects/my_project/datasets/my_dataset/tables/my_table");
exerciseOpenTelemetryAttributesWithStreamNames(
"projects/my_project/datasets/my_dataset/tables/", null);
}

void checkOpenTelemetryTraceIdAttribute(Attributes attributes, int index, String expected) {
String attributesTraceId = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(index));
assertEquals(expected, attributesTraceId);
}

void exerciseOpenTelemetryAttributesWithTraceId(
String traceId, String expectedField1, String expectedField2, String expectedField3)
throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
null,
schema1,
100000,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
traceId,
null,
client.getSettings(),
retrySettings);

Attributes attributes = connectionWorker.getTelemetryAttributes();
checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);
checkOpenTelemetryTraceIdAttribute(attributes, 1, expectedField2);
checkOpenTelemetryTraceIdAttribute(attributes, 2, expectedField3);
}

@Test
public void testOpenTelemetryAttributesWithTraceId() throws Exception {
exerciseOpenTelemetryAttributesWithTraceId(null, null, null, null);
exerciseOpenTelemetryAttributesWithTraceId("a:b:c", null, null, null);
exerciseOpenTelemetryAttributesWithTraceId(
"java-streamwriter:HEAD+20240508-1544 Dataflow:monorail-c-multi:2024-05-08_11_44_34-6968230696879535523:1972585693681960752",
"monorail-c-multi",
"2024-05-08_11_44_34-6968230696879535523",
"1972585693681960752");
exerciseOpenTelemetryAttributesWithTraceId(
"Dataflow:2024-04-26_23_19_08-12221961051154168466",
"2024-04-26_23_19_08-12221961051154168466",
null,
null);
exerciseOpenTelemetryAttributesWithTraceId(
"Dataflow:writeapi3:2024-04-03_03_49_33-845412829237675723:63737042897365355",
"writeapi3",
"2024-04-03_03_49_33-845412829237675723",
"63737042897365355");
exerciseOpenTelemetryAttributesWithTraceId(
"java-streamwriter Dataflow:pubsub-to-bq-staging-tongruil-1024-static:2024-05-14_15_13_14-5530509399715326669:4531186922674871499",
"pubsub-to-bq-staging-tongruil-1024-static",
"2024-05-14_15_13_14-5530509399715326669",
"4531186922674871499");
exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow :c", null, null, null);
exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null);
}

@Test
public void testLocationName() throws Exception {
assertEquals(
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.38.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down

0 comments on commit ff5529a

Please sign in to comment.