From d8361c2cb0beb5ceb8934a7c7cfd51c6b1bba792 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Tue, 12 Nov 2024 11:29:19 -0800 Subject: [PATCH 1/3] feat: Instrument ReadAllBytes and CreateFrom --- .../google/cloud/storage/GrpcStorageImpl.java | 94 ++++++++++++------- .../google/cloud/storage/StorageInternal.java | 4 + 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index cf58235678..d03d15734f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -315,18 +315,30 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) @Override public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) throws IOException { - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - return internalCreateFrom(path, blobInfo, opts); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName()); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + return internalCreateFrom(path, blobInfo, opts, openTelemetryTraceUtil.currentContext()); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override - public Blob internalCreateFrom(Path path, BlobInfo info, Opts opts) + public Blob internalCreateFrom(Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) throws IOException { + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("internalCreateFrom", this.getClass().getName(), ctx); requireNonNull(path, "path must be non null"); if (Files.isDirectory(path)) { throw new StorageException(0, path + " is a directory"); } - + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(info, opts); @@ -347,7 +359,6 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o rw, Hasher.noop()), MoreExecutors.directExecutor()); - try { GrpcResumableSession got = session2.get(); ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path)); Object object = put.getObject(); @@ -358,7 +369,11 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o } return codecs.blobInfo().decode(object).asBlob(this); } catch (InterruptedException | ExecutionException e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); + } finally { + otelSpan.end(); } } @@ -372,37 +387,45 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption.. public Blob createFrom( BlobInfo blobInfo, InputStream in, int bufferSize, BlobWriteOption... options) throws IOException { - requireNonNull(blobInfo, "blobInfo must be non null"); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName()); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + requireNonNull(blobInfo, "blobInfo must be non null"); - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - ApiFuture start = startResumableWrite(grpcCallContext, req, opts); + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .resumable() - .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) - .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) - .setStartAsync(start) - .build(); + BufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel( + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .resumable() + .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) + .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) + .setStartAsync(start) + .build(); - // Specifically not in the try-with, so we don't close the provided stream - ReadableByteChannel src = - Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES))); - try (BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - throw StorageException.coalesce(e); + // Specifically not in the try-with, so we don't close the provided stream + ReadableByteChannel src = + Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES))); + try (BufferedWritableByteChannel dst = session.open()) { + ByteStreams.copy(src, dst); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } + return getBlob(session.getResult()); + } finally { + otelSpan.end(); } - return getBlob(session.getResult()); } @Override @@ -732,14 +755,21 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio @Override public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("readAllBytes", this.getClass().getName()); UnbufferedReadableByteChannelSession session = unbufferedReadSession(blob, options); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (UnbufferedReadableByteChannel r = session.open(); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent(); + UnbufferedReadableByteChannel r = session.open(); WritableByteChannel w = Channels.newChannel(baos)) { ByteStreams.copy(r, w); } catch (ApiException | IOException e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); + } finally { + otelSpan.end(); } return baos.toByteArray(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 403f03e9ee..6f38190031 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -31,6 +31,10 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) + throws IOException { + throw new UnsupportedOperationException("not implemented"); + } default BlobInfo internalDirectUpload( BlobInfo blobInfo, Opts opts, ByteBuffer buf) { From 43dbb2ce92ef2f85b07c6123b38fc09e551c5670 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Tue, 12 Nov 2024 12:19:31 -0800 Subject: [PATCH 2/3] Adding tests --- .../storage/ITGrpcOpenTelemetryTest.java | 50 +++++++++++++++---- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index 98d24ba159..5d426902a5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -31,6 +31,9 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; import java.util.List; import org.junit.Assert; import org.junit.Before; @@ -44,6 +47,8 @@ public class ITGrpcOpenTelemetryTest { private StorageOptions options; private SpanExporter exporter; private Storage storage; + private static final byte[] helloWorldTextBytes = "hello world".getBytes(); + private BlobId blobId; @Inject public Generator generator; @Inject public BucketInfo testBucket; @@ -65,6 +70,8 @@ public void setUp() { .setOpenTelemetrySdk(openTelemetrySdk) .build(); storage = options.getService(); + String objectString = generator.randomObjectName(); + blobId = BlobId.of(testBucket.getName(), objectString); } @Test @@ -72,13 +79,8 @@ public void runCreateBucket() { String bucket = "random-bucket"; storage.create(BucketInfo.of(bucket)); TestExporter testExported = (TestExporter) exporter; - SpanData spanData = testExported.getExportedSpans().get(0); - Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service")); - Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo")); - Assert.assertEquals( - "com.google.cloud.google-cloud-storage", - getAttributeValue(spanData, "gcp.client.artifact")); - Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system")); + List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); } @Test @@ -88,6 +90,36 @@ public void runCreateBlob() { storage.create(BlobInfo.newBuilder(toCreate).build(), content); TestExporter testExported = (TestExporter) exporter; List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create"))); + Assert.assertTrue( + spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload"))); + Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext()); + } + + @Test + public void runReadAllBytes() { + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build(); + storage.create(blobInfo, helloWorldTextBytes); + byte[] read = storage.readAllBytes(blobId); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("readAllBytes"))); + } + + @Test + public void createFrom() throws IOException { + Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath(); + storage.createFrom(BlobInfo.newBuilder(blobId).build(), helloWorldTxtGz); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom"))); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("internalCreateFrom"))); + } + + private void checkCommonAttributes(List spanData) { for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo")); @@ -95,10 +127,6 @@ public void runCreateBlob() { "com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact")); Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system")); } - Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create"))); - Assert.assertTrue( - spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload"))); - Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext()); } private String getAttributeValue(SpanData spanData, String key) { From dbbb26b954180ed84f12ca373d9e2a92e19bd612 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Tue, 12 Nov 2024 12:22:13 -0800 Subject: [PATCH 3/3] linter --- .../google/cloud/storage/GrpcStorageImpl.java | 46 ++++++++++--------- .../google/cloud/storage/StorageInternal.java | 4 +- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index d03d15734f..2c00fdf300 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -330,7 +330,8 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp } @Override - public Blob internalCreateFrom(Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) + public Blob internalCreateFrom( + Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) throws IOException { OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("internalCreateFrom", this.getClass().getName(), ctx); @@ -339,26 +340,26 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o throw new StorageException(0, path + " is a directory"); } try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(info, opts); - - ClientStreamingCallable write = - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); - - ApiFuture start = startResumableWrite(grpcCallContext, req, opts); - ApiFuture session2 = - ApiFutures.transform( - start, - rw -> - ResumableSession.grpc( - getOptions(), - retryAlgorithmManager.idempotent(), - write, - storageClient.queryWriteStatusCallable(), - rw, - Hasher.noop()), - MoreExecutors.directExecutor()); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(info, opts); + + ClientStreamingCallable write = + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); + + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); + ApiFuture session2 = + ApiFutures.transform( + start, + rw -> + ResumableSession.grpc( + getOptions(), + retryAlgorithmManager.idempotent(), + write, + storageClient.queryWriteStatusCallable(), + rw, + Hasher.noop()), + MoreExecutors.directExecutor()); GrpcResumableSession got = session2.get(); ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path)); Object object = put.getObject(); @@ -419,7 +420,8 @@ public Blob createFrom( ByteStreams.copy(src, dst); } catch (Exception e) { otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + otelSpan.setStatus( + io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); } return getBlob(session.getResult()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 6f38190031..8fd230a19c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -31,7 +31,9 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) + + default BlobInfo internalCreateFrom( + Path path, BlobInfo info, Opts opts, OpenTelemetryTraceUtil.Context ctx) throws IOException { throw new UnsupportedOperationException("not implemented"); }