diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index e139220de2..5a16b6c299 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -201,8 +201,7 @@ public void close() throws Exception { @Override public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create(Bucket, BucketTargetOption"); + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); Opts opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts); GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -248,13 +247,28 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option public Blob create( BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - return internalDirectUpload(blobInfo, opts, ByteBuffer.wrap(content, offset, length)) - .asBlob(this); + // Start the otel span to retain information of the origin of the request + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + return internalDirectUpload( + blobInfo, + opts, + ByteBuffer.wrap(content, offset, length), + openTelemetryTraceUtil.currentContext()) + .asBlob(this); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { - try { + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); + try (OpenTelemetryTraceUtil.Scope ununsed = otelSpan.makeCurrent()) { requireNonNull(blobInfo, "blobInfo must be non null"); InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)); @@ -281,7 +295,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op ApiFuture responseApiFuture = session.getResult(); return this.getBlob(responseApiFuture); } catch (IOException | ApiException e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); + } finally { + otelSpan.end(); } } @@ -800,8 +818,19 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options @Override public BlobInfo internalDirectUpload( BlobInfo blobInfo, Opts opts, ByteBuffer buf) { + return internalDirectUpload(blobInfo, opts, buf, null); + } + + @Override + public BlobInfo internalDirectUpload( + BlobInfo blobInfo, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { requireNonNull(blobInfo, "blobInfo must be non null"); requireNonNull(buf, "content must be non null"); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("internalDirectUpload(BlobInfo)", ctx); Opts optsWithDefaults = opts.prepend(defaultOpts); GrpcCallContext grpcCallContext = optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -809,28 +838,36 @@ public BlobInfo internalDirectUpload( Hasher hasher = Hasher.enabled(); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); RewindableContent content = RewindableContent.of(buf); - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> { - content.rewindTo(0); - UnbufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .setHasher(hasher) - .direct() - .unbuffered() - .setRequest(req) - .build(); - - try (UnbufferedWritableByteChannel c = session.open()) { - content.writeTo(c); - } - return session.getResult(); - }, - this::getBlob); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + return Retrying.run( + getOptions(), + retryAlgorithmManager.getFor(req), + () -> { + content.rewindTo(0); + UnbufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .setHasher(hasher) + .direct() + .unbuffered() + .setRequest(req) + .build(); + + try (UnbufferedWritableByteChannel c = session.open()) { + content.writeTo(c); + } + return session.getResult(); + }, + this::getBlob); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index db807ce6ce..0d08759d1f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -219,6 +219,7 @@ public void close() throws IOException { // We never created any parts // create an empty object try { + // TODO: Add in Otel context when available BlobInfo blobInfo = storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0)); finalObject.set(blobInfo); return; @@ -285,6 +286,7 @@ private void internalFlush(ByteBuffer buf) { ApiFutures.immediateFuture(partInfo), info -> { try { + // TODO: Add in Otel context when available return storage.internalDirectUpload(info, partOpts, buf); } catch (StorageException e) { // a precondition failure usually means the part was created, but we didn't get the diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index ee538bcde8..1726660ca9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -50,6 +50,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; import com.google.common.base.CharMatcher; @@ -1737,7 +1738,11 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + public BlobInfo internalDirectUpload( + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { BlobInfo.Builder builder = info.toBuilder() diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 0d700c46df..403f03e9ee 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -20,6 +20,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; @@ -31,7 +32,16 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + default BlobInfo internalDirectUpload( + BlobInfo blobInfo, Opts opts, ByteBuffer buf) { + throw new UnsupportedOperationException("not implemented"); + } + + default BlobInfo internalDirectUpload( + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { throw new UnsupportedOperationException("not implemented"); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java index 9c71f4842a..d7ce734c07 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java @@ -156,7 +156,6 @@ public Scope makeCurrent() { public OpenTelemetryTraceUtil.Span startSpan(String methodName) { String formatSpanName = String.format("%s.%s/%s", "storage", "client", methodName); SpanBuilder spanBuilder = tracer.spanBuilder(formatSpanName).setSpanKind(SpanKind.CLIENT); - spanBuilder.setAttribute("rpc.system", transport); io.opentelemetry.api.trace.Span span = addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); return new Span(span, formatSpanName); @@ -164,16 +163,17 @@ public OpenTelemetryTraceUtil.Span startSpan(String methodName) { @Override public OpenTelemetryTraceUtil.Span startSpan( - String spanName, OpenTelemetryTraceUtil.Context parent) { + String methodName, OpenTelemetryTraceUtil.Context parent) { assert (parent instanceof OpenTelemetryInstance.Context); + String formatSpanName = String.format("%s.%s/%s", "storage", "client", methodName); SpanBuilder spanBuilder = tracer - .spanBuilder(spanName) + .spanBuilder(formatSpanName) .setSpanKind(SpanKind.CLIENT) .setParent(((OpenTelemetryInstance.Context) parent).context); io.opentelemetry.api.trace.Span span = addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); - return new Span(span, spanName); + return new Span(span, formatSpanName); } @Nonnull @@ -196,6 +196,7 @@ private SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder) .put("gcp.client.version", GaxProperties.getLibraryVersion(this.getClass())) .put("gcp.client.repo", "googleapis/java-storage") .put("gcp.client.artifact", "com.google.cloud.google-cloud-storage") + .put("rpc.system", transport) .build()); return spanBuilder; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java new file mode 100644 index 0000000000..98d24ba159 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.it.runner.registry.TestBench; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.TEST_BENCH) +public class ITGrpcOpenTelemetryTest { + @Inject public TestBench testBench; + private StorageOptions options; + private SpanExporter exporter; + private Storage storage; + @Inject public Generator generator; + @Inject public BucketInfo testBucket; + + @Before + public void setUp() { + exporter = new TestExporter(); + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + options = + StorageOptions.grpc() + .setHost(testBench.getGRPCBaseUri()) + .setProjectId("projectId") + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetrySdk(openTelemetrySdk) + .build(); + storage = options.getService(); + } + + @Test + public void runCreateBucket() { + String bucket = "random-bucket"; + storage.create(BucketInfo.of(bucket)); + TestExporter testExported = (TestExporter) exporter; + SpanData spanData = testExported.getExportedSpans().get(0); + Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service")); + Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo")); + Assert.assertEquals( + "com.google.cloud.google-cloud-storage", + getAttributeValue(spanData, "gcp.client.artifact")); + Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system")); + } + + @Test + public void runCreateBlob() { + byte[] content = "Hello, World!".getBytes(UTF_8); + BlobId toCreate = BlobId.of(testBucket.getName(), generator.randomObjectName()); + storage.create(BlobInfo.newBuilder(toCreate).build(), content); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + for (SpanData span : spanData) { + Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); + Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo")); + Assert.assertEquals( + "com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact")); + Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system")); + } + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create"))); + Assert.assertTrue( + spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload"))); + Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext()); + } + + private String getAttributeValue(SpanData spanData, String key) { + return spanData.getAttributes().get(AttributeKey.stringKey(key)).toString(); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java new file mode 100644 index 0000000000..93ac4b3483 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.otel; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +class TestExporter implements SpanExporter { + + public final List exportedSpans = Collections.synchronizedList(new ArrayList<>()); + + @Override + public CompletableResultCode export(Collection spans) { + exportedSpans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return null; + } + + @Override + public CompletableResultCode shutdown() { + return null; + } + + public List getExportedSpans() { + return exportedSpans; + } +}