Skip to content

Commit

Permalink
feat: add instrumentation for a couple OpenTelemetry metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawal-siddharth committed May 16, 2024
1 parent cc9fdfd commit c99bd10
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 1 deletion.
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
<artifactId>google-auth-library-credentials</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -250,6 +258,23 @@ class ConnectionWorker implements AutoCloseable {
private static String projectMatching = "projects/[^/]+/";
private static Pattern streamPatternProject = Pattern.compile(projectMatching);

private Meter writeMeter;
static AttributeKey<String> telemetryKeyStreamId = AttributeKey.stringKey("streamId");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writerId");
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
{
add(AttributeKey.stringKey("traceField0"));
add(AttributeKey.stringKey("traceField1"));
add(AttributeKey.stringKey("traceField2"));
add(AttributeKey.stringKey("traceField3"));
add(AttributeKey.stringKey("traceField4"));
}
};
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;

static final Pattern DEFAULT_STREAM_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");

Expand Down Expand Up @@ -278,6 +303,83 @@ static String getRoutingHeader(String streamName, String location) {
return project + "locations/" + location;
}

private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder().put(telemetryKeyStreamId, this.streamName);
builder.put(telemetryKeyWriterId, this.writerId);
if ((this.traceId != null) && !this.traceId.isEmpty()) {
String[] traceIdParts = this.traceId.split(":", 5);
for (int i = 0; i < traceIdParts.length; i++) {
builder.put(telemetryKeysTraceId.get(i), traceIdParts[i]);
}
}
return builder.build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
// streamName can change due to multiplexing. If it has changed, update the attributes.
String originalStreamNameAttribute = telemetryAttributes.get(telemetryKeyStreamId);
if ((originalStreamNameAttribute != null)
&& !originalStreamNameAttribute.equals(this.streamName)) {
AttributesBuilder builder = telemetryAttributes.toBuilder();
builder.put("streamId", this.streamName);
telemetryAttributes = builder.build();
}
return telemetryAttributes;
}

private void registerOpenTelemetryMetrics() {
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
writeMeter =
meterProvider
.meterBuilder("com.google.cloud.bigquery.storage.v1.write")
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
writeMeter
.counterBuilder("incoming-request-count")
.setDescription("Counts number of incoming requests")
.build();
instrumentIncomingRequestSize =
writeMeter
.counterBuilder("incoming-request-size")
.setDescription("Counts byte size of incoming requests")
.build();
writeMeter
.gaugeBuilder("waiting-queue-length")
.ofLongs()
.setDescription(
"Reports length of waiting queue. This queue contains requests buffered in the client and not yet sent to the server.")
.buildWithCallback(
result -> {
long waitQueueSize = 0;
this.lock.lock();
try {
waitQueueSize = this.waitingRequestQueue.size();
} finally {
this.lock.unlock();
}
result.record(waitQueueSize, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight-queue-length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from server.")
.buildWithCallback(
result -> {
long inflightQueueSize = 0;
this.lock.lock();
try {
inflightQueueSize = this.inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
result.record(inflightQueueSize, getTelemetryAttributes());
});
}

public ConnectionWorker(
String streamName,
String location,
Expand Down Expand Up @@ -312,6 +414,9 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -506,6 +611,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
this.lock.lock();
try {
if (userClosed) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.util.logging.Logger;

/** Container for global singleton objects. */
class Singletons {

private static final Logger log = Logger.getLogger(Singletons.class.getName());

// Global OpenTelemetry instance
private static OpenTelemetry openTelemetry = null;

static OpenTelemetry getOpenTelemetry() {
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}
return openTelemetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -741,6 +742,63 @@ public void testLongTimeIdleWontFail() throws Exception {
}
}

@Test
public void testOpenTelemetryAttributesWithTraceId() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
null,
createProtoSchema("foo"),
100000,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"A:B:C",
null,
client.getSettings(),
retrySettings);

Attributes attributes = connectionWorker.getTelemetryAttributes();
String attributeStreamId = attributes.get(ConnectionWorker.telemetryKeyStreamId);
assertEquals(attributeStreamId, TEST_STREAM_1);
String attributesWriterId = attributes.get(ConnectionWorker.telemetryKeyWriterId);
assertEquals(attributesWriterId, connectionWorker.getWriterId());
String attributesTraceId0 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(0));
assertEquals(attributesTraceId0, "A");
String attributesTraceId1 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(1));
assertEquals(attributesTraceId1, "B");
String attributesTraceId2 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(2));
assertEquals(attributesTraceId2, "C");
String attributesTraceId3 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(3));
assertEquals(attributesTraceId3, null);
}

@Test
public void testOpenTelemetryAttributesWithoutTraceId() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
null,
createProtoSchema("foo"),
100000,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
null,
null,
client.getSettings(),
retrySettings);
Attributes attributes = connectionWorker.getTelemetryAttributes();
String attributesTraceId0 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(0));
assertEquals(attributesTraceId0, null);
}

@Test
public void testLocationName() throws Exception {
assertEquals(
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.38.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down

0 comments on commit c99bd10

Please sign in to comment.