diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index 03e37c174e..0b19230fdd 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -99,13 +99,13 @@ 7013 com/google/cloud/storage/StorageOptions$Builder - com.google.cloud.storage.StorageOptions$Builder setOpenTelemetrySdk(io.opentelemetry.sdk.OpenTelemetrySdk) + com.google.cloud.storage.StorageOptions$Builder setOpenTelemetry(io.opentelemetry.api.OpenTelemetry) 7013 com/google/cloud/storage/StorageOptions - io.opentelemetry.sdk.OpenTelemetrySdk getOpenTelemetrySdk() + io.opentelemetry.api.OpenTelemetry getOpenTelemetry() diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 41f08713af..d7472f5ff9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -26,7 +26,6 @@ import static com.google.cloud.storage.StorageV2ProtoUtils.objectAclEntityOrAltEq; import static com.google.cloud.storage.Utils.bucketNameCodec; import static com.google.cloud.storage.Utils.ifNonNull; -import static com.google.cloud.storage.otel.OpenTelemetryTraceUtil.MODULE_STORAGE; import static com.google.common.base.MoreObjects.firstNonNull; import static java.util.Objects.requireNonNull; @@ -69,9 +68,6 @@ import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.UnifiedOpts.ProjectId; import com.google.cloud.storage.UnifiedOpts.UserProject; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -171,7 +167,6 @@ final class GrpcStorageImpl extends BaseService // workaround for https://github.com/googleapis/java-storage/issues/1736 private final Opts defaultOpts; @Deprecated private final ProjectId defaultProjectId; - private final OpenTelemetryTraceUtil openTelemetryTraceUtil; GrpcStorageImpl( GrpcStorageOptions options, @@ -188,7 +183,6 @@ final class GrpcStorageImpl extends BaseService this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.syntaxDecoders = new SyntaxDecoders(); this.defaultProjectId = UnifiedOpts.projectId(options.getProjectId()); - this.openTelemetryTraceUtil = OpenTelemetryTraceUtil.getInstance(options); } @Override @@ -202,8 +196,6 @@ public void close() throws Exception { @Override public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); Opts opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts); GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -218,20 +210,11 @@ public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { .setParent("projects/_"); CreateBucketRequest req = opts.createBucketsRequest().apply(builder).build(); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> storageClient.createBucketCallable().call(req, merge), - syntaxDecoders.bucket); - } catch (Exception ex) { - otelSpan.recordException(ex); - otelSpan.setStatus( - io.opentelemetry.api.trace.StatusCode.ERROR, ex.getClass().getSimpleName()); - throw StorageException.coalesce(ex); - } finally { - otelSpan.end(); - } + return Retrying.run( + getOptions(), + retryAlgorithmManager.getFor(req), + () -> storageClient.createBucketCallable().call(req, merge), + syntaxDecoders.bucket); } @Override @@ -249,26 +232,13 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option public Blob create( BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - // Start the otel span to retain information of the origin of the request - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - return internalDirectUpload(blobInfo, opts, ByteBuffer.wrap(content, offset, length)) - .asBlob(this); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } finally { - otelSpan.end(); - } + return internalDirectUpload(blobInfo, opts, ByteBuffer.wrap(content, offset, length)) + .asBlob(this); } @Override public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { + try { requireNonNull(blobInfo, "blobInfo must be non null"); InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)); @@ -295,11 +265,7 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op ApiFuture responseApiFuture = session.getResult(); return this.getBlob(responseApiFuture); } catch (IOException | ApiException e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); - } finally { - otelSpan.end(); } } @@ -312,50 +278,39 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) @Override public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) throws IOException { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("createFrom", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - return internalCreateFrom(path, blobInfo, opts); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + return internalCreateFrom(path, blobInfo, opts); } @Override public Blob internalCreateFrom(Path path, BlobInfo info, Opts opts) throws IOException { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("internalCreateFrom", MODULE_STORAGE); requireNonNull(path, "path must be non null"); if (Files.isDirectory(path)) { throw new StorageException(0, path + " is a directory"); } - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(info, opts); - - ClientStreamingCallable write = - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); - - ApiFuture start = startResumableWrite(grpcCallContext, req, opts); - ApiFuture session2 = - ApiFutures.transform( - start, - rw -> - ResumableSession.grpc( - getOptions(), - retryAlgorithmManager.idempotent(), - write, - storageClient.queryWriteStatusCallable(), - rw, - Hasher.noop()), - MoreExecutors.directExecutor()); + + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(info, opts); + + ClientStreamingCallable write = + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); + + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); + ApiFuture session2 = + ApiFutures.transform( + start, + rw -> + ResumableSession.grpc( + getOptions(), + retryAlgorithmManager.idempotent(), + write, + storageClient.queryWriteStatusCallable(), + rw, + Hasher.noop()), + MoreExecutors.directExecutor()); + try { GrpcResumableSession got = session2.get(); ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path)); Object object = put.getObject(); @@ -366,11 +321,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o } return codecs.blobInfo().decode(object).asBlob(this); } catch (InterruptedException | ExecutionException e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); - } finally { - otelSpan.end(); } } @@ -384,46 +335,37 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption.. public Blob createFrom( BlobInfo blobInfo, InputStream in, int bufferSize, BlobWriteOption... options) throws IOException { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("createFrom", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - requireNonNull(blobInfo, "blobInfo must be non null"); - - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - - ApiFuture start = startResumableWrite(grpcCallContext, req, opts); + requireNonNull(blobInfo, "blobInfo must be non null"); - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .resumable() - .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) - .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) - .setStartAsync(start) - .build(); + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + + ApiFuture start = startResumableWrite(grpcCallContext, req, opts); + + BufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel( + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .resumable() + .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) + .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) + .setStartAsync(start) + .build(); - // Specifically not in the try-with, so we don't close the provided stream - ReadableByteChannel src = - Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES))); - try (BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus( - io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } - return getBlob(session.getResult()); - } finally { - otelSpan.end(); + // Specifically not in the try-with, so we don't close the provided stream + ReadableByteChannel src = + Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES))); + try (BufferedWritableByteChannel dst = session.open()) { + ByteStreams.copy(src, dst); + } catch (Exception e) { + throw StorageException.coalesce(e); } + return getBlob(session.getResult()); } @Override @@ -694,66 +636,56 @@ public Blob compose(ComposeRequest composeRequest) { @Override public CopyWriter copy(CopyRequest copyRequest) { - Span otelSpan = openTelemetryTraceUtil.startSpan("copy", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - BlobId src = copyRequest.getSource(); - BlobInfo dst = copyRequest.getTarget(); - Opts srcOpts = - Opts.unwrap(copyRequest.getSourceOptions()) - .projectAsSource() - .resolveFrom(src) - .prepend(defaultOpts); - Opts dstOpts = - Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(dst).prepend(defaultOpts); - - Mapper mapper = - srcOpts.rewriteObjectsRequest().andThen(dstOpts.rewriteObjectsRequest()); - - Object srcProto = codecs.blobId().encode(src); - Object dstProto = codecs.blobInfo().encode(dst); - - RewriteObjectRequest.Builder b = - RewriteObjectRequest.newBuilder() - .setDestinationName(dstProto.getName()) - .setDestinationBucket(dstProto.getBucket()) - // destination_kms_key comes from dstOpts - // according to the docs in the protos, it is illegal to populate the following - // fields, - // clear them out if they are set - // destination_predefined_acl comes from dstOpts - // if_*_match come from srcOpts and dstOpts - // copy_source_encryption_* come from srcOpts - // common_object_request_params come from dstOpts - .setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build()) - .setSourceBucket(srcProto.getBucket()) - .setSourceObject(srcProto.getName()); - - if (src.getGeneration() != null) { - b.setSourceGeneration(src.getGeneration()); - } - - if (copyRequest.getMegabytesCopiedPerChunk() != null) { - b.setMaxBytesRewrittenPerCall(copyRequest.getMegabytesCopiedPerChunk() * _1MiB); - } + BlobId src = copyRequest.getSource(); + BlobInfo dst = copyRequest.getTarget(); + Opts srcOpts = + Opts.unwrap(copyRequest.getSourceOptions()) + .projectAsSource() + .resolveFrom(src) + .prepend(defaultOpts); + Opts dstOpts = + Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(dst).prepend(defaultOpts); + + Mapper mapper = + srcOpts.rewriteObjectsRequest().andThen(dstOpts.rewriteObjectsRequest()); + + Object srcProto = codecs.blobId().encode(src); + Object dstProto = codecs.blobInfo().encode(dst); + + RewriteObjectRequest.Builder b = + RewriteObjectRequest.newBuilder() + .setDestinationName(dstProto.getName()) + .setDestinationBucket(dstProto.getBucket()) + // destination_kms_key comes from dstOpts + // according to the docs in the protos, it is illegal to populate the following fields, + // clear them out if they are set + // destination_predefined_acl comes from dstOpts + // if_*_match come from srcOpts and dstOpts + // copy_source_encryption_* come from srcOpts + // common_object_request_params come from dstOpts + .setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build()) + .setSourceBucket(srcProto.getBucket()) + .setSourceObject(srcProto.getName()); + + if (src.getGeneration() != null) { + b.setSourceGeneration(src.getGeneration()); + } - RewriteObjectRequest req = mapper.apply(b).build(); - GrpcCallContext grpcCallContext = - srcOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - UnaryCallable callable = - storageClient.rewriteObjectCallable().withDefaultCallContext(grpcCallContext); - GrpcCallContext retryContext = Retrying.newCallContext(); - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> callable.call(req, retryContext), - (resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp)); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } finally { - otelSpan.end(); + if (copyRequest.getMegabytesCopiedPerChunk() != null) { + b.setMaxBytesRewrittenPerCall(copyRequest.getMegabytesCopiedPerChunk() * _1MiB); } + + RewriteObjectRequest req = mapper.apply(b).build(); + GrpcCallContext grpcCallContext = + srcOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + UnaryCallable callable = + storageClient.rewriteObjectCallable().withDefaultCallContext(grpcCallContext); + GrpcCallContext retryContext = Retrying.newCallContext(); + return Retrying.run( + getOptions(), + retryAlgorithmManager.getFor(req), + () -> callable.call(req, retryContext), + (resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp)); } @Override @@ -763,21 +695,14 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio @Override public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("readAllBytes", MODULE_STORAGE); UnbufferedReadableByteChannelSession session = unbufferedReadSession(blob, options); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent(); - UnbufferedReadableByteChannel r = session.open(); + try (UnbufferedReadableByteChannel r = session.open(); WritableByteChannel w = Channels.newChannel(baos)) { ByteStreams.copy(r, w); } catch (ApiException | IOException e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); - } finally { - otelSpan.end(); } return baos.toByteArray(); } @@ -794,97 +719,66 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption.. @Override public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("reader", MODULE_STORAGE); - try (Scope ignore = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); - ReadObjectRequest request = getReadObjectRequest(blob, opts); - GrpcCallContext grpcCallContext = Retrying.newCallContext(); - - return new GrpcBlobReadChannel( - storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), - getOptions(), - retryAlgorithmManager.getFor(request), - responseContentLifecycleManager, - request, - !opts.autoGzipDecompression()); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); + ReadObjectRequest request = getReadObjectRequest(blob, opts); + GrpcCallContext grpcCallContext = Retrying.newCallContext(); + + return new GrpcBlobReadChannel( + storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + getOptions(), + retryAlgorithmManager.getFor(request), + responseContentLifecycleManager, + request, + !opts.autoGzipDecompression()); } @Override public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("downloadTo", MODULE_STORAGE); UnbufferedReadableByteChannelSession session = unbufferedReadSession(blob, options); - try (Scope ignored = otelSpan.makeCurrent(); - UnbufferedReadableByteChannel r = session.open(); + try (UnbufferedReadableByteChannel r = session.open(); WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) { ByteStreams.copy(r, w); } catch (ApiException | IOException e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); - } finally { - otelSpan.end(); } } @Override public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("downloadTo", MODULE_STORAGE); UnbufferedReadableByteChannelSession session = unbufferedReadSession(blob, options); - try (Scope ignored = otelSpan.makeCurrent(); - UnbufferedReadableByteChannel r = session.open(); + try (UnbufferedReadableByteChannel r = session.open(); WritableByteChannel w = Channels.newChannel(outputStream)) { ByteStreams.copy(r, w); } catch (ApiException | IOException e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); - } finally { - otelSpan.end(); } } @Override public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("writer", MODULE_STORAGE); - try (Scope ignore = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - Hasher hasher = Hasher.noop(); - // in JSON, the starting of the resumable session happens before the invocation of write can - // happen. Emulate the same thing here. - // 1. create the future - ApiFuture startResumableWrite = - startResumableWrite(grpcCallContext, req, opts); - // 2. await the result of the future - ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite); - // 3. wrap the result in another future container before constructing the BlobWriteChannel - ApiFuture wrapped = ApiFutures.immediateFuture(resumableWrite); - return new GrpcBlobWriteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext), - getOptions(), - retryAlgorithmManager.idempotent(), - () -> wrapped, - hasher); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + Hasher hasher = Hasher.noop(); + // in JSON, the starting of the resumable session happens before the invocation of write can + // happen. Emulate the same thing here. + // 1. create the future + ApiFuture startResumableWrite = startResumableWrite(grpcCallContext, req, opts); + // 2. await the result of the future + ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite); + // 3. wrap the result in another future container before constructing the BlobWriteChannel + ApiFuture wrapped = ApiFutures.immediateFuture(resumableWrite); + return new GrpcBlobWriteChannel( + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext), + getOptions(), + retryAlgorithmManager.idempotent(), + () -> wrapped, + hasher); } @Override @@ -892,8 +786,6 @@ public BlobInfo internalDirectUpload( BlobInfo blobInfo, Opts opts, ByteBuffer buf) { requireNonNull(blobInfo, "blobInfo must be non null"); requireNonNull(buf, "content must be non null"); - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("internalDirectUpload", MODULE_STORAGE); Opts optsWithDefaults = opts.prepend(defaultOpts); GrpcCallContext grpcCallContext = optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -901,36 +793,28 @@ public BlobInfo internalDirectUpload( Hasher hasher = Hasher.enabled(); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); RewindableContent content = RewindableContent.of(buf); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> { - content.rewindTo(0); - UnbufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .setHasher(hasher) - .direct() - .unbuffered() - .setRequest(req) - .build(); - - try (UnbufferedWritableByteChannel c = session.open()) { - content.writeTo(c); - } - return session.getResult(); - }, - this::getBlob); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } finally { - otelSpan.end(); - } + return Retrying.run( + getOptions(), + retryAlgorithmManager.getFor(req), + () -> { + content.rewindTo(0); + UnbufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .setHasher(hasher) + .direct() + .unbuffered() + .setRequest(req) + .build(); + + try (UnbufferedWritableByteChannel c = session.open()) { + content.writeTo(c); + } + return session.getResult(); + }, + this::getBlob); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 0e2a75f922..279418c5f3 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -81,10 +81,11 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.protobuf.ProtoUtils; -import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.api.OpenTelemetry; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.net.URI; import java.nio.ByteBuffer; @@ -120,7 +121,7 @@ public final class GrpcStorageOptions extends StorageOptions private final boolean grpcClientMetricsManuallyEnabled; private final GrpcInterceptorProvider grpcInterceptorProvider; private final BlobWriteSessionConfig blobWriteSessionConfig; - private final OpenTelemetrySdk openTelemetrySdk; + private transient OpenTelemetry openTelemetry; private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) { super(builder, serviceDefaults); @@ -137,7 +138,7 @@ private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) this.grpcClientMetricsManuallyEnabled = builder.grpcMetricsManuallyEnabled; this.grpcInterceptorProvider = builder.grpcInterceptorProvider; this.blobWriteSessionConfig = builder.blobWriteSessionConfig; - this.openTelemetrySdk = builder.openTelemetrySdk; + this.openTelemetry = builder.openTelemetry; } @Override @@ -160,6 +161,11 @@ StorageSettings getStorageSettings() throws IOException { return resolveSettingsAndOpts().x(); } + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.openTelemetry = HttpStorageOptions.getDefaultInstance().getOpenTelemetry(); + } + /** * We have to perform several introspections and detections to cross-wire/support several features * that are either gapic primitives, ServiceOption primitives or GCS semantic requirements. @@ -352,9 +358,11 @@ private Tuple> resolveSettingsAndOpts() throw return Tuple.of(builder.build(), defaultOpts); } + /** @since 2.47.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi @Override - public OpenTelemetrySdk getOpenTelemetrySdk() { - return openTelemetrySdk; + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; } /** @since 2.14.0 */ @@ -372,6 +380,7 @@ public int hashCode() { enableGrpcClientMetrics, grpcInterceptorProvider, blobWriteSessionConfig, + openTelemetry, baseHashCode()); } @@ -390,6 +399,7 @@ public boolean equals(Object o) { && Objects.equals(terminationAwaitDuration, that.terminationAwaitDuration) && Objects.equals(grpcInterceptorProvider, that.grpcInterceptorProvider) && Objects.equals(blobWriteSessionConfig, that.blobWriteSessionConfig) + && Objects.equals(openTelemetry, that.openTelemetry) && this.baseEquals(that); } @@ -431,11 +441,10 @@ public static final class Builder extends StorageOptions.Builder { GrpcStorageDefaults.INSTANCE.grpcInterceptorProvider(); private BlobWriteSessionConfig blobWriteSessionConfig = GrpcStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); + private OpenTelemetry openTelemetry = GrpcStorageDefaults.INSTANCE.getDefaultOpenTelemetry(); private boolean grpcMetricsManuallyEnabled = false; - private OpenTelemetrySdk openTelemetrySdk; - Builder() {} Builder(StorageOptions options) { @@ -447,7 +456,7 @@ public static final class Builder extends StorageOptions.Builder { this.enableGrpcClientMetrics = gso.enableGrpcClientMetrics; this.grpcInterceptorProvider = gso.grpcInterceptorProvider; this.blobWriteSessionConfig = gso.blobWriteSessionConfig; - this.openTelemetrySdk = gso.openTelemetrySdk; + this.openTelemetry = gso.openTelemetry; } /** @@ -633,13 +642,13 @@ public GrpcStorageOptions.Builder setBlobWriteSessionConfig( /** * Enable OpenTelemetry Tracing and provide an instance for the client to use. * - * @param openTelemetrySdk User defined instance of OpenTelemetry SDK to be used by the library - * @since 2.46.1 This new api is in preview and is subject to breaking changes. + * @param openTelemetry User defined instance of OpenTelemetry to be used by the library + * @since 2.47.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - public GrpcStorageOptions.Builder setOpenTelemetrySdk(OpenTelemetrySdk openTelemetrySdk) { - requireNonNull(openTelemetrySdk, "openTelemetry must be non null"); - this.openTelemetrySdk = openTelemetrySdk; + public GrpcStorageOptions.Builder setOpenTelemetry(OpenTelemetry openTelemetry) { + requireNonNull(openTelemetry, "openTelemetry must be non null"); + this.openTelemetry = openTelemetry; return this; } @@ -719,6 +728,12 @@ public GrpcInterceptorProvider grpcInterceptorProvider() { public BlobWriteSessionConfig getDefaultStorageWriterConfig() { return BlobWriteSessionConfigs.getDefault(); } + + /** @since 2.47.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi + public OpenTelemetry getDefaultOpenTelemetry() { + return OpenTelemetry.noop(); + } } /** @@ -775,22 +790,27 @@ public Storage create(StorageOptions options) { new InternalZeroCopyGrpcStorageStub( stubSettings, clientContext, grpcStorageCallableFactory); StorageClient client = new InternalStorageClient(stub); - return new GrpcStorageImpl( - grpcStorageOptions, - client, - stub.getObjectMediaResponseMarshaller, - grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), - defaultOpts); + GrpcStorageImpl grpcStorage = + new GrpcStorageImpl( + grpcStorageOptions, + client, + stub.getObjectMediaResponseMarshaller, + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + return OtelStorageDecorator.decorate( + grpcStorage, options.getOpenTelemetry(), Transport.GRPC); } else { StorageClient client = StorageClient.create(storageSettings); - return new GrpcStorageImpl( - grpcStorageOptions, - client, - ResponseContentLifecycleManager.noop(), - grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), - defaultOpts); + GrpcStorageImpl grpcStorage = + new GrpcStorageImpl( + grpcStorageOptions, + client, + ResponseContentLifecycleManager.noop(), + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + return OtelStorageDecorator.decorate( + grpcStorage, options.getOpenTelemetry(), Transport.GRPC); } - } catch (IOException e) { throw new IllegalStateException( "Unable to instantiate gRPC com.google.cloud.storage.Storage client.", e); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java index 69fe141a67..e440765fe0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java @@ -38,7 +38,7 @@ import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; -import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; @@ -60,7 +60,7 @@ public class HttpStorageOptions extends StorageOptions { private transient RetryDependenciesAdapter retryDepsAdapter; private final BlobWriteSessionConfig blobWriteSessionConfig; - private final OpenTelemetrySdk openTelemetrySdk; + private transient OpenTelemetry openTelemetry; private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { super(builder, serviceDefaults); @@ -70,7 +70,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { builder.storageRetryStrategy, defaults().getStorageRetryStrategy())); retryDepsAdapter = new RetryDependenciesAdapter(); blobWriteSessionConfig = builder.blobWriteSessionConfig; - openTelemetrySdk = builder.openTelemetrySdk; + openTelemetry = builder.openTelemetry; } @Override @@ -88,9 +88,11 @@ StorageRpc getStorageRpcV1() { return (StorageRpc) getRpc(); } + /** @since 2.47.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi @Override - public OpenTelemetrySdk getOpenTelemetrySdk() { - return openTelemetrySdk; + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; } @Override @@ -100,7 +102,8 @@ public HttpStorageOptions.Builder toBuilder() { @Override public int hashCode() { - return Objects.hash(retryAlgorithmManager, blobWriteSessionConfig, baseHashCode()); + return Objects.hash( + retryAlgorithmManager, blobWriteSessionConfig, openTelemetry, baseHashCode()); } @Override @@ -114,12 +117,14 @@ public boolean equals(Object o) { HttpStorageOptions that = (HttpStorageOptions) o; return Objects.equals(retryAlgorithmManager, that.retryAlgorithmManager) && Objects.equals(blobWriteSessionConfig, that.blobWriteSessionConfig) + && Objects.equals(openTelemetry, that.openTelemetry) && this.baseEquals(that); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); this.retryDepsAdapter = new RetryDependenciesAdapter(); + this.openTelemetry = HttpStorageOptions.getDefaultInstance().getOpenTelemetry(); } public static HttpStorageOptions.Builder newBuilder() { @@ -144,7 +149,7 @@ public static class Builder extends StorageOptions.Builder { private StorageRetryStrategy storageRetryStrategy; private BlobWriteSessionConfig blobWriteSessionConfig = HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); - private OpenTelemetrySdk openTelemetrySdk; + private OpenTelemetry openTelemetry = HttpStorageDefaults.INSTANCE.getDefaultOpenTelemetry(); Builder() {} @@ -153,7 +158,7 @@ public static class Builder extends StorageOptions.Builder { HttpStorageOptions hso = (HttpStorageOptions) options; this.storageRetryStrategy = hso.retryAlgorithmManager.retryStrategy; this.blobWriteSessionConfig = hso.blobWriteSessionConfig; - this.openTelemetrySdk = hso.getOpenTelemetrySdk(); + this.openTelemetry = hso.getOpenTelemetry(); } @Override @@ -283,13 +288,13 @@ public HttpStorageOptions build() { /** * Enable OpenTelemetry Tracing and provide an instance for the client to use. * - * @param openTelemetrySdk - * @since 2.46.1 This new api is in preview and is subject to breaking changes. + * @param openTelemetry User defined instance of OpenTelemetry to be used by the library + * @since 2.47.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - public HttpStorageOptions.Builder setOpenTelemetrySdk(OpenTelemetrySdk openTelemetrySdk) { - requireNonNull(openTelemetrySdk, "openTelemetry must be non null"); - this.openTelemetrySdk = openTelemetrySdk; + public HttpStorageOptions.Builder setOpenTelemetry(OpenTelemetry openTelemetry) { + requireNonNull(openTelemetry, "openTelemetry must be non null"); + this.openTelemetry = openTelemetry; return this; } } @@ -325,6 +330,12 @@ public StorageRetryStrategy getStorageRetryStrategy() { public BlobWriteSessionConfig getDefaultStorageWriterConfig() { return BlobWriteSessionConfigs.getDefault(); } + + /** @since 2.47.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi + public OpenTelemetry getDefaultOpenTelemetry() { + return OpenTelemetry.noop(); + } } /** @@ -365,8 +376,12 @@ public Storage create(StorageOptions options) { HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options; Clock clock = Clock.systemUTC(); try { - return new StorageImpl( - httpStorageOptions, httpStorageOptions.blobWriteSessionConfig.createFactory(clock)); + StorageImpl storage = + new StorageImpl( + httpStorageOptions, + httpStorageOptions.blobWriteSessionConfig.createFactory(clock)); + return OtelStorageDecorator.decorate( + storage, httpStorageOptions.getOpenTelemetry(), Transport.HTTP); } catch (IOException e) { throw new IllegalStateException( "Unable to instantiate HTTP com.google.cloud.storage.Storage client.", e); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java new file mode 100644 index 0000000000..a1813a98c5 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -0,0 +1,1733 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static java.util.Objects.requireNonNull; + +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; +import com.google.api.gax.paging.Page; +import com.google.cloud.Policy; +import com.google.cloud.ReadChannel; +import com.google.cloud.RestorableState; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Acl.Entity; +import com.google.cloud.storage.HmacKey.HmacKeyMetadata; +import com.google.cloud.storage.HmacKey.HmacKeyState; +import com.google.cloud.storage.PostPolicyV4.PostConditionsV4; +import com.google.cloud.storage.PostPolicyV4.PostFieldsV4; +import com.google.cloud.storage.TransportCompatibility.Transport; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings("DuplicatedCode") +final class OtelStorageDecorator implements Storage { + + /** Becomes the {@code otel.scope.name} attribute in a span */ + private static final String OTEL_SCOPE_NAME = "cloud.google.com/java/storage"; + + private final Storage delegate; + private final OpenTelemetry otel; + private final Attributes baseAttributes; + private final Tracer tracer; + + private OtelStorageDecorator(Storage delegate, OpenTelemetry otel, Attributes baseAttributes) { + this.delegate = delegate; + this.otel = otel; + this.baseAttributes = baseAttributes; + this.tracer = + TracerDecorator.decorate(null, otel, baseAttributes, Storage.class.getName() + "/"); + } + + @Override + public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { + Span span = + tracer + .spanBuilder("create") + .setAttribute("gsutil.uri", fmtBucket(bucketInfo.getName())) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.create(bucketInfo, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob create(BlobInfo blobInfo, BlobTargetOption... options) { + Span span = + tracer + .spanBuilder("create") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.create(blobInfo, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options) { + Span span = + tracer + .spanBuilder("create") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.create(blobInfo, content, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob create( + BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { + Span span = + tracer + .spanBuilder("create") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.create(blobInfo, content, offset, length, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + @Deprecated + public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { + Span span = + tracer + .spanBuilder("create") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.create(blobInfo, content, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) + throws IOException { + Span span = + tracer + .spanBuilder("createFrom") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createFrom(blobInfo, path, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) + throws IOException { + Span span = + tracer + .spanBuilder("createFrom") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createFrom(blobInfo, path, bufferSize, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) + throws IOException { + Span span = + tracer + .spanBuilder("createFrom") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createFrom(blobInfo, content, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob createFrom( + BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options) + throws IOException { + Span span = + tracer + .spanBuilder("createFrom") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createFrom(blobInfo, content, bufferSize, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Bucket get(String bucket, BucketGetOption... options) { + Span span = tracer.spanBuilder("get").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Bucket lockRetentionPolicy(BucketInfo bucket, BucketTargetOption... options) { + Span span = + tracer + .spanBuilder("lockRetentionPolicy") + .setAttribute("gsutil.uri", fmtBucket(bucket.getName())) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.lockRetentionPolicy(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob get(String bucket, String blob, BlobGetOption... options) { + Span span = + tracer + .spanBuilder("get") + .setAttribute("gsutil.uri", String.format("gs://%s/%s", bucket, blob)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(bucket, blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob get(BlobId blob, BlobGetOption... options) { + Span span = + tracer + .spanBuilder("get") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob get(BlobId blob) { + Span span = + tracer + .spanBuilder("get") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(blob); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob restore(BlobId blob, BlobRestoreOption... options) { + Span span = + tracer + .spanBuilder("restore") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.restore(blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Page list(BucketListOption... options) { + Span span = tracer.spanBuilder("list").setAttribute("gsutil.uri", "gs://").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.list(options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Page list(String bucket, BlobListOption... options) { + Span span = + tracer.spanBuilder("list").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.list(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) { + Span span = + tracer + .spanBuilder("update") + .setAttribute("gsutil.uri", fmtBucket(bucketInfo.getName())) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.update(bucketInfo, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob update(BlobInfo blobInfo, BlobTargetOption... options) { + Span span = + tracer + .spanBuilder("update") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.update(blobInfo, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob update(BlobInfo blobInfo) { + Span span = + tracer + .spanBuilder("update") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.update(blobInfo); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean delete(String bucket, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("delete").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean delete(String bucket, String blob, BlobSourceOption... options) { + Span span = + tracer.spanBuilder("delete").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(bucket, blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean delete(BlobId blob, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("delete") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean delete(BlobId blob) { + Span span = + tracer + .spanBuilder("delete") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(blob); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Blob compose(ComposeRequest composeRequest) { + Span span = + tracer + .spanBuilder("compose") + .setAttribute("gsutil.uri", composeRequest.getTarget().getBlobId().toGsUtilUri()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.compose(composeRequest); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public CopyWriter copy(CopyRequest copyRequest) { + Span span = + tracer + .spanBuilder("copy") + .setAttribute("gsutil.uri", copyRequest.getTarget().getBlobId().toGsUtilUri()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + CopyWriter copyWriter = delegate.copy(copyRequest); + return new OtelDecoratedCopyWriter(copyWriter, span); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + span.end(); + throw t; + } + } + + @Override + public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("readAllBytes") + .setAttribute("gsutil.uri", BlobId.of(bucket, blob).toGsUtilUri()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.readAllBytes(bucket, blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("readAllBytes") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.readAllBytes(blob, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public StorageBatch batch() { + return delegate.batch(); + } + + @Override + public ReadChannel reader(String bucket, String blob, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("reader") + .setAttribute("gsutil.uri", BlobId.of(bucket, blob).toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + ReadChannel reader = delegate.reader(bucket, blob, options); + return new OtelDecoratedReadChannel(reader, span); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + span.end(); + throw t; + } + } + + @Override + public ReadChannel reader(BlobId blob, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("reader") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + ReadChannel reader = delegate.reader(blob, options); + return new OtelDecoratedReadChannel(reader, span); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + span.end(); + throw t; + } + } + + @Override + public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("downloadTo") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + delegate.downloadTo(blob, path, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) { + Span span = + tracer + .spanBuilder("downloadTo") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + delegate.downloadTo(blob, outputStream, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { + Span sessionSpan = + tracer + .spanBuilder("writer") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = sessionSpan.makeCurrent()) { + WriteChannel writer = delegate.writer(blobInfo, options); + return new OtelDecoratedWriteChannel(writer, sessionSpan); + } catch (Throwable t) { + sessionSpan.recordException(t); + sessionSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + sessionSpan.end(); + throw t; + } + } + + @Override + public WriteChannel writer(URL signedURL) { + Span sessionSpan = tracer.spanBuilder("writer").startSpan(); + try (Scope ignore = sessionSpan.makeCurrent()) { + WriteChannel writer = delegate.writer(signedURL); + return new OtelDecoratedWriteChannel(writer, sessionSpan); + } catch (Throwable t) { + sessionSpan.recordException(t); + sessionSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + sessionSpan.end(); + throw t; + } + } + + @Override + public URL signUrl(BlobInfo blobInfo, long duration, TimeUnit unit, SignUrlOption... options) { + Span span = + tracer + .spanBuilder("signUrl") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.signUrl(blobInfo, duration, unit, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public PostPolicyV4 generateSignedPostPolicyV4( + BlobInfo blobInfo, + long duration, + TimeUnit unit, + PostFieldsV4 fields, + PostConditionsV4 conditions, + PostPolicyV4Option... options) { + Span span = + tracer + .spanBuilder("generateSignedPostPolicyV4") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.generateSignedPostPolicyV4( + blobInfo, duration, unit, fields, conditions, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public PostPolicyV4 generateSignedPostPolicyV4( + BlobInfo blobInfo, + long duration, + TimeUnit unit, + PostFieldsV4 fields, + PostPolicyV4Option... options) { + Span span = + tracer + .spanBuilder("generateSignedPostPolicyV4") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.generateSignedPostPolicyV4(blobInfo, duration, unit, fields, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public PostPolicyV4 generateSignedPostPolicyV4( + BlobInfo blobInfo, + long duration, + TimeUnit unit, + PostConditionsV4 conditions, + PostPolicyV4Option... options) { + Span span = + tracer + .spanBuilder("generateSignedPostPolicyV4") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.generateSignedPostPolicyV4(blobInfo, duration, unit, conditions, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public PostPolicyV4 generateSignedPostPolicyV4( + BlobInfo blobInfo, long duration, TimeUnit unit, PostPolicyV4Option... options) { + Span span = + tracer + .spanBuilder("generateSignedPostPolicyV4") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.generateSignedPostPolicyV4(blobInfo, duration, unit, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List get(BlobId... blobIds) { + Span span = tracer.spanBuilder("get").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(blobIds); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List get(Iterable blobIds) { + Span span = tracer.spanBuilder("get").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.get(blobIds); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List update(BlobInfo... blobInfos) { + Span span = tracer.spanBuilder("update").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.update(blobInfos); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List update(Iterable blobInfos) { + Span span = tracer.spanBuilder("update").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.update(blobInfos); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List delete(BlobId... blobIds) { + Span span = tracer.spanBuilder("delete").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(blobIds); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List delete(Iterable blobIds) { + Span span = tracer.spanBuilder("delete").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.delete(blobIds); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl getAcl(String bucket, Entity entity, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("getAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getAcl(bucket, entity, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl getAcl(String bucket, Entity entity) { + Span span = + tracer.spanBuilder("getAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getAcl(bucket, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean deleteAcl(String bucket, Entity entity, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("deleteAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.deleteAcl(bucket, entity, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean deleteAcl(String bucket, Entity entity) { + Span span = + tracer.spanBuilder("deleteAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.deleteAcl(bucket, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl createAcl(String bucket, Acl acl, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("createAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createAcl(bucket, acl, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl createAcl(String bucket, Acl acl) { + Span span = + tracer.spanBuilder("createAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createAcl(bucket, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl updateAcl(String bucket, Acl acl, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("updateAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.updateAcl(bucket, acl, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl updateAcl(String bucket, Acl acl) { + Span span = + tracer.spanBuilder("updateAcl").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.updateAcl(bucket, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List listAcls(String bucket, BucketSourceOption... options) { + Span span = + tracer.spanBuilder("listAcls").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listAcls(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List listAcls(String bucket) { + Span span = + tracer.spanBuilder("listAcls").setAttribute("gsutil.uri", fmtBucket(bucket)).startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listAcls(bucket); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl getDefaultAcl(String bucket, Entity entity) { + Span span = + tracer + .spanBuilder("getDefaultAcl") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getDefaultAcl(bucket, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean deleteDefaultAcl(String bucket, Entity entity) { + Span span = + tracer + .spanBuilder("deleteDefaultAcl") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.deleteDefaultAcl(bucket, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl createDefaultAcl(String bucket, Acl acl) { + Span span = + tracer + .spanBuilder("createDefaultAcl") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createDefaultAcl(bucket, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl updateDefaultAcl(String bucket, Acl acl) { + Span span = + tracer + .spanBuilder("updateDefaultAcl") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.updateDefaultAcl(bucket, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List listDefaultAcls(String bucket) { + Span span = + tracer + .spanBuilder("listDefaultAcls") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listDefaultAcls(bucket); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl getAcl(BlobId blob, Entity entity) { + Span span = + tracer + .spanBuilder("getAcl") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getAcl(blob, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean deleteAcl(BlobId blob, Entity entity) { + Span span = + tracer + .spanBuilder("deleteAcl") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.deleteAcl(blob, entity); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl createAcl(BlobId blob, Acl acl) { + Span span = + tracer + .spanBuilder("createAcl") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createAcl(blob, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Acl updateAcl(BlobId blob, Acl acl) { + Span span = + tracer + .spanBuilder("updateAcl") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.updateAcl(blob, acl); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List listAcls(BlobId blob) { + Span span = + tracer + .spanBuilder("listAcls") + .setAttribute("gsutil.uri", blob.toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listAcls(blob); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public HmacKey createHmacKey(ServiceAccount serviceAccount, CreateHmacKeyOption... options) { + Span span = tracer.spanBuilder("createHmacKey").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createHmacKey(serviceAccount, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Page listHmacKeys(ListHmacKeysOption... options) { + Span span = tracer.spanBuilder("listHmacKeys").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listHmacKeys(options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public HmacKeyMetadata getHmacKey(String accessId, GetHmacKeyOption... options) { + Span span = tracer.spanBuilder("getHmacKey").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getHmacKey(accessId, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public void deleteHmacKey(HmacKeyMetadata hmacKeyMetadata, DeleteHmacKeyOption... options) { + Span span = tracer.spanBuilder("deleteHmacKey").startSpan(); + try (Scope ignore = span.makeCurrent()) { + delegate.deleteHmacKey(hmacKeyMetadata, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public HmacKeyMetadata updateHmacKeyState( + HmacKeyMetadata hmacKeyMetadata, HmacKeyState state, UpdateHmacKeyOption... options) { + Span span = tracer.spanBuilder("updateHmacKeyState").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.updateHmacKeyState(hmacKeyMetadata, state, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Policy getIamPolicy(String bucket, BucketSourceOption... options) { + Span span = + tracer + .spanBuilder("getIamPolicy") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getIamPolicy(bucket, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Policy setIamPolicy(String bucket, Policy policy, BucketSourceOption... options) { + Span span = + tracer + .spanBuilder("setIamPolicy") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.setIamPolicy(bucket, policy, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List testIamPermissions( + String bucket, List permissions, BucketSourceOption... options) { + Span span = + tracer + .spanBuilder("testIamPermissions") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.testIamPermissions(bucket, permissions, options); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public ServiceAccount getServiceAccount(String projectId) { + Span span = tracer.spanBuilder("getServiceAccount").startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getServiceAccount(projectId); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Notification createNotification(String bucket, NotificationInfo notificationInfo) { + Span span = + tracer + .spanBuilder("createNotification") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.createNotification(bucket, notificationInfo); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public Notification getNotification(String bucket, String notificationId) { + Span span = + tracer + .spanBuilder("getNotification") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.getNotification(bucket, notificationId); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public List listNotifications(String bucket) { + Span span = + tracer + .spanBuilder("listNotifications") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.listNotifications(bucket); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public boolean deleteNotification(String bucket, String notificationId) { + Span span = + tracer + .spanBuilder("deleteNotification") + .setAttribute("gsutil.uri", fmtBucket(bucket)) + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + return delegate.deleteNotification(bucket, notificationId); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + @Override + @BetaApi + public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) { + Span sessionSpan = + tracer + .spanBuilder("blobWriteSession") + .setAttribute("gsutil.uri", blobInfo.getBlobId().toGsUtilUriWithGeneration()) + .startSpan(); + try (Scope ignore = sessionSpan.makeCurrent()) { + BlobWriteSession session = delegate.blobWriteSession(blobInfo, options); + return new OtelDecoratedBlobWriteSession(session); + } catch (Throwable t) { + sessionSpan.recordException(t); + sessionSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + sessionSpan.end(); + } + } + + @Override + public StorageOptions getOptions() { + return delegate.getOptions(); + } + + static Storage decorate(Storage delegate, OpenTelemetry otel, Transport transport) { + requireNonNull(delegate, "delegate must be non null"); + requireNonNull(otel, "otel must be non null"); + if (otel == OpenTelemetry.noop()) { + return delegate; + } + Attributes baseAttributes = + Attributes.builder() + .put("gcp.client.service", "Storage") + .put("gcp.client.version", StorageOptions.getDefaultInstance().getLibraryVersion()) + .put("gcp.client.repo", "googleapis/java-storage") + .put("gcp.client.artifact", "com.google.cloud:google-cloud-storage") + .put("rpc.system", transport.toString().toLowerCase(Locale.ROOT)) + .put("service.name", "storage.googleapis.com") + .build(); + return new OtelStorageDecorator(delegate, otel, baseAttributes); + } + + private static @NonNull String fmtBucket(String bucket) { + return String.format("gs://%s/", bucket); + } + + private static final class TracerDecorator implements Tracer { + @Nullable private final Context parentContextOverride; + private final Tracer delegate; + private final Attributes baseAttributes; + private final String spanNamePrefix; + + private TracerDecorator( + @Nullable Context parentContextOverride, + Tracer delegate, + Attributes baseAttributes, + String spanNamePrefix) { + this.parentContextOverride = parentContextOverride; + this.delegate = delegate; + this.baseAttributes = baseAttributes; + this.spanNamePrefix = spanNamePrefix; + } + + private static TracerDecorator decorate( + @Nullable Context parentContextOverride, + OpenTelemetry otel, + Attributes baseAttributes, + String spanNamePrefix) { + requireNonNull(otel, "otel must be non null"); + requireNonNull(baseAttributes, "baseAttributes must be non null"); + requireNonNull(spanNamePrefix, "spanNamePrefix must be non null"); + Tracer tracer = + otel.getTracer(OTEL_SCOPE_NAME, StorageOptions.getDefaultInstance().getLibraryVersion()); + return new TracerDecorator(parentContextOverride, tracer, baseAttributes, spanNamePrefix); + } + + @Override + public SpanBuilder spanBuilder(String spanName) { + SpanBuilder spanBuilder = + delegate.spanBuilder(spanNamePrefix + spanName).setAllAttributes(baseAttributes); + if (parentContextOverride != null) { + spanBuilder.setParent(parentContextOverride); + } + return spanBuilder; + } + } + + private static final class OtelDecoratedReadChannel implements ReadChannel { + + private final ReadChannel reader; + private final Span span; + + private OtelDecoratedReadChannel(ReadChannel reader, Span span) { + this.reader = reader; + this.span = span; + } + + @Override + public void seek(long position) throws IOException { + reader.seek(position); + } + + @Override + public void setChunkSize(int chunkSize) { + reader.setChunkSize(chunkSize); + } + + @Override + public RestorableState capture() { + return reader.capture(); + } + + @Override + public ReadChannel limit(long limit) { + return reader.limit(limit); + } + + @Override + public long limit() { + return reader.limit(); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return reader.read(dst); + } + + @Override + public boolean isOpen() { + return reader.isOpen(); + } + + @Override + public void close() { + try { + reader.close(); + } finally { + span.end(); + } + } + } + + private final class OtelDecoratedBlobWriteSession implements BlobWriteSession { + + private final BlobWriteSession delegate; + private final Tracer tracer; + + public OtelDecoratedBlobWriteSession(BlobWriteSession delegate) { + this.delegate = delegate; + this.tracer = + TracerDecorator.decorate( + Context.current(), + otel, + OtelStorageDecorator.this.baseAttributes, + BlobWriteSession.class.getName() + "/"); + } + + @Override + public WritableByteChannel open() throws IOException { + Span openSpan = tracer.spanBuilder("open").startSpan(); + try (Scope ignore = openSpan.makeCurrent()) { + WritableByteChannel delegate = this.delegate.open(); + return new OtelDecoratingWritableByteChannel(delegate, openSpan); + } catch (Throwable t) { + openSpan.recordException(t); + openSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } + } + + @Override + public ApiFuture getResult() { + return delegate.getResult(); + } + + private class OtelDecoratingWritableByteChannel implements WritableByteChannel { + + private final WritableByteChannel delegate; + private final Span openSpan; + + private OtelDecoratingWritableByteChannel(WritableByteChannel delegate, Span openSpan) { + this.delegate = delegate; + this.openSpan = openSpan; + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void close() throws IOException { + try { + delegate.close(); + } finally { + openSpan.end(); + } + } + } + } + + private static final class OtelDecoratedWriteChannel implements WriteChannel { + private final WriteChannel delegate; + private final Span openSpan; + + private OtelDecoratedWriteChannel(WriteChannel delegate, Span openSpan) { + this.delegate = delegate; + this.openSpan = openSpan; + } + + @Override + public void setChunkSize(int chunkSize) { + delegate.setChunkSize(chunkSize); + } + + @Override + public RestorableState capture() { + return delegate.capture(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void close() throws IOException { + try { + delegate.close(); + } finally { + openSpan.end(); + } + } + } + + private final class OtelDecoratedCopyWriter extends CopyWriter { + + private final CopyWriter copyWriter; + private final Span span; + private final Context parentContext; + private final Tracer tracer; + + public OtelDecoratedCopyWriter(CopyWriter copyWriter, Span span) { + this.copyWriter = copyWriter; + this.span = span; + this.parentContext = Context.current(); + this.tracer = + TracerDecorator.decorate( + Context.current(), + otel, + OtelStorageDecorator.this.baseAttributes, + CopyWriter.class.getName() + "/"); + } + + @Override + public Blob getResult() { + try { + return copyWriter.getResult(); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } + + @Override + public long getBlobSize() { + return copyWriter.getBlobSize(); + } + + @Override + public boolean isDone() { + boolean done = copyWriter.isDone(); + if (done) { + span.end(); + } + return done; + } + + @Override + public long getTotalBytesCopied() { + return copyWriter.getTotalBytesCopied(); + } + + @Override + public RestorableState capture() { + return copyWriter.capture(); + } + + @Override + public void copyChunk() { + Span copyChunkSpan = tracer.spanBuilder("copyChunk").setParent(parentContext).startSpan(); + try (Scope ignore = copyChunkSpan.makeCurrent()) { + copyWriter.copyChunk(); + } catch (Throwable t) { + copyChunkSpan.recordException(t); + copyChunkSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + span.end(); + throw t; + } finally { + copyChunkSpan.end(); + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index db807ce6ce..b824acf4d5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -54,6 +54,7 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import io.grpc.Status.Code; +import io.opentelemetry.context.Context; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; @@ -145,7 +146,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab BlobInfo ultimateObject, Opts opts) { this.bufferPool = bufferPool; - this.exec = exec; + this.exec = Context.current().wrap(exec); this.partNamingStrategy = partNamingStrategy; this.partCleanupStrategy = partCleanupStrategy; this.maxElementsPerCompact = maxElementsPerCompact; @@ -154,7 +155,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab this.storage = storage; this.ultimateObject = ultimateObject; this.opts = opts; - this.queue = AsyncAppendingQueue.of(exec, maxElementsPerCompact, this::compose); + this.queue = AsyncAppendingQueue.of(this.exec, maxElementsPerCompact, this::compose); this.pendingParts = new ArrayList<>(); // this can be modified by another thread this.successfulParts = Collections.synchronizedList(new ArrayList<>()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 1284067c1e..ee538bcde8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -17,7 +17,6 @@ package com.google.cloud.storage; import static com.google.cloud.storage.SignedUrlEncodingHelper.Rfc3986UriEncode; -import static com.google.cloud.storage.otel.OpenTelemetryTraceUtil.MODULE_STORAGE; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -51,9 +50,6 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; import com.google.common.base.CharMatcher; @@ -69,7 +65,6 @@ import com.google.common.io.BaseEncoding; import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Ints; -import io.opentelemetry.api.trace.StatusCode; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -127,38 +122,26 @@ final class StorageImpl extends BaseService implements Storage, final HttpRetryAlgorithmManager retryAlgorithmManager; final StorageRpc storageRpc; final WriterFactory writerFactory; - private final OpenTelemetryTraceUtil openTelemetryTraceUtil; StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) { super(options); this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.storageRpc = options.getStorageRpcV1(); this.writerFactory = writerFactory; - this.openTelemetryTraceUtil = OpenTelemetryTraceUtil.getInstance(options); } @Override public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - final com.google.api.services.storage.model.Bucket bucketPb = - codecs.bucketInfo().encode(bucketInfo); - final Map optionsMap = - Opts.unwrap(options).resolveFrom(bucketInfo).getRpcOptions(); - ResultRetryAlgorithm algorithm = - retryAlgorithmManager.getForBucketsCreate(bucketPb, optionsMap); - return run( - algorithm, - () -> storageRpc.create(bucketPb, optionsMap), - (b) -> Conversions.json().bucketInfo().decode(b).asBucket(this)); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + final com.google.api.services.storage.model.Bucket bucketPb = + codecs.bucketInfo().encode(bucketInfo); + final Map optionsMap = + Opts.unwrap(options).resolveFrom(bucketInfo).getRpcOptions(); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForBucketsCreate(bucketPb, optionsMap); + return run( + algorithm, + () -> storageRpc.create(bucketPb, optionsMap), + (b) -> Conversions.json().bucketInfo().decode(b).asBucket(this)); } @Override @@ -211,29 +194,19 @@ public Blob create( @Override @Deprecated public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - Map optionsMap = opts.getRpcOptions(); - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); - StorageObject blobPb = codecs.blobInfo().encode(updated); - InputStream inputStreamParam = - firstNonNull(content, new ByteArrayInputStream(EMPTY_BYTE_ARRAY)); - // retries are not safe when the input is an InputStream, so we can't retry. - BlobInfo info = - Conversions.json() - .blobInfo() - .decode(storageRpc.create(blobPb, inputStreamParam, optionsMap)); - return info.asBlob(this); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); + Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + StorageObject blobPb = codecs.blobInfo().encode(updated); + InputStream inputStreamParam = + firstNonNull(content, new ByteArrayInputStream(EMPTY_BYTE_ARRAY)); + // retries are not safe when the input is an InputStream, so we can't retry. + BlobInfo info = + Conversions.json() + .blobInfo() + .decode(storageRpc.create(blobPb, inputStreamParam, optionsMap)); + return info.asBlob(this); } private Blob internalCreate( @@ -242,32 +215,22 @@ private Blob internalCreate( final int offset, final int length, Opts opts) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - Preconditions.checkNotNull(content); - final Map optionsMap = opts.getRpcOptions(); - - BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build(); - final StorageObject blobPb = codecs.blobInfo().encode(updated); - ResultRetryAlgorithm algorithm = - retryAlgorithmManager.getForObjectsCreate(blobPb, optionsMap); - return run( - algorithm, - () -> - storageRpc.create( - blobPb, new ByteArrayInputStream(content, offset, length), optionsMap), - (x) -> { - BlobInfo info1 = Conversions.json().blobInfo().decode(x); - return info1.asBlob(this); - }); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + Preconditions.checkNotNull(content); + final Map optionsMap = opts.getRpcOptions(); + + BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build(); + final StorageObject blobPb = codecs.blobInfo().encode(updated); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForObjectsCreate(blobPb, optionsMap); + return run( + algorithm, + () -> + storageRpc.create( + blobPb, new ByteArrayInputStream(content, offset, length), optionsMap), + (x) -> { + BlobInfo info1 = Conversions.json().blobInfo().decode(x); + return info1.asBlob(this); + }); } @Override @@ -279,55 +242,45 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) @Override public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) throws IOException { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("createFrom", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - if (Files.isDirectory(path)) { - throw new StorageException(0, path + " is a directory"); - } - long size = Files.size(path); - if (size == 0L) { - return create(blobInfo, null, options); - } - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - final Map optionsMap = opts.getRpcOptions(); - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); - StorageObject encode = codecs.blobInfo().encode(updated); - - Supplier uploadIdSupplier = - ResumableMedia.startUploadForBlobInfo( - getOptions(), - updated, - optionsMap, - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); - - JsonResumableSession session = - ResumableSession.json( - HttpClientContext.from(storageRpc), - getOptions().asRetryDependencies(), - retryAlgorithmManager.idempotent(), - jsonResumableWrite); - HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size); - ResumableOperationResult put = - session.put(RewindableContent.of(path), contentRange); - // all exception translation is taken care of down in the JsonResumableSession - StorageObject object = put.getObject(); - if (object == null) { - // if by some odd chance the put didn't get the StorageObject, query for it - ResumableOperationResult<@Nullable StorageObject> query = session.query(); - object = query.getObject(); - } - return codecs.blobInfo().decode(object).asBlob(this); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + long size = Files.size(path); + if (size == 0L) { + return create(blobInfo, null, options); } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); + final Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + StorageObject encode = codecs.blobInfo().encode(updated); + + Supplier uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + getOptions(), + updated, + optionsMap, + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + + JsonResumableSession session = + ResumableSession.json( + HttpClientContext.from(storageRpc), + getOptions().asRetryDependencies(), + retryAlgorithmManager.idempotent(), + jsonResumableWrite); + HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size); + ResumableOperationResult put = + session.put(RewindableContent.of(path), contentRange); + // all exception translation is taken care of down in the JsonResumableSession + StorageObject object = put.getObject(); + if (object == null) { + // if by some odd chance the put didn't get the StorageObject, query for it + ResumableOperationResult<@Nullable StorageObject> query = session.query(); + object = query.getObject(); + } + return codecs.blobInfo().decode(object).asBlob(this); } @Override @@ -340,35 +293,20 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption.. public Blob createFrom( BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options) throws IOException { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("createFrom", MODULE_STORAGE); - try (OpenTelemetryTraceUtil.Scope ignored = otelSpan.makeCurrent()) { - - ApiFuture objectFuture; - try (StorageWriteChannel writer = writer(blobInfo, options)) { - objectFuture = writer.getObject(); - uploadHelper(Channels.newChannel(content), writer, bufferSize); - } - // keep these two try blocks separate for the time being - // leaving the above will cause the writer to close writing and finalizing the session and - // (hopefully, on successful finalization) resolve our future - try { - BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS); - return info.asBlob(this); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw StorageException.coalesce(e); - } - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - // We don't want to wrap the storage exception, but we want to record any other exception - // we simply throw the exception after recording in the span. - throw e; - - } finally { - otelSpan.end(); + + ApiFuture objectFuture; + try (StorageWriteChannel writer = writer(blobInfo, options)) { + objectFuture = writer.getObject(); + uploadHelper(Channels.newChannel(content), writer, bufferSize); + } + // keep these two try blocks separate for the time being + // leaving the above will cause the writer to close writing and finalizing the session and + // (hopefully, on successful finalization) resolve our future + try { + BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS); + return info.asBlob(this); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw StorageException.coalesce(e); } } @@ -722,40 +660,30 @@ public Blob compose(final ComposeRequest composeRequest) { @Override public CopyWriter copy(final CopyRequest copyRequest) { - Span otelSpan = openTelemetryTraceUtil.startSpan("copy", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - BlobId source = copyRequest.getSource(); - BlobInfo target = copyRequest.getTarget(); - Opts sourceOpts = - Opts.unwrap(copyRequest.getSourceOptions()).resolveFrom(source).projectAsSource(); - Opts targetOpts = - Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(target); - - StorageObject sourcePb = codecs.blobId().encode(source); - StorageObject targetPb = codecs.blobInfo().encode(target); - ImmutableMap sourceOptions = sourceOpts.getRpcOptions(); - ImmutableMap targetOptions = targetOpts.getRpcOptions(); - RewriteRequest rewriteRequest = - new RewriteRequest( - sourcePb, - sourceOptions, - copyRequest.overrideInfo(), - targetPb, - targetOptions, - copyRequest.getMegabytesCopiedPerChunk()); - ResultRetryAlgorithm algorithm = - retryAlgorithmManager.getForObjectsRewrite(rewriteRequest); - return run( - algorithm, - () -> storageRpc.openRewrite(rewriteRequest), - (r) -> new HttpCopyWriter(getOptions(), r)); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + BlobId source = copyRequest.getSource(); + BlobInfo target = copyRequest.getTarget(); + Opts sourceOpts = + Opts.unwrap(copyRequest.getSourceOptions()).resolveFrom(source).projectAsSource(); + Opts targetOpts = + Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(target); + + StorageObject sourcePb = codecs.blobId().encode(source); + StorageObject targetPb = codecs.blobInfo().encode(target); + ImmutableMap sourceOptions = sourceOpts.getRpcOptions(); + ImmutableMap targetOptions = targetOpts.getRpcOptions(); + RewriteRequest rewriteRequest = + new RewriteRequest( + sourcePb, + sourceOptions, + copyRequest.overrideInfo(), + targetPb, + targetOptions, + copyRequest.getMegabytesCopiedPerChunk()); + ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsRewrite(rewriteRequest); + return run( + algorithm, + () -> storageRpc.openRewrite(rewriteRequest), + (r) -> new HttpCopyWriter(getOptions(), r)); } @Override @@ -765,22 +693,13 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio @Override public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("readAllBytes", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - final StorageObject storageObject = codecs.blobId().encode(blob); - Opts unwrap = Opts.unwrap(options); - Opts resolve = unwrap.resolveFrom(blob); - ImmutableMap optionsMap = resolve.getRpcOptions(); - ResultRetryAlgorithm algorithm = - retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap); - return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity()); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + final StorageObject storageObject = codecs.blobId().encode(blob); + Opts unwrap = Opts.unwrap(options); + Opts resolve = unwrap.resolveFrom(blob); + ImmutableMap optionsMap = resolve.getRpcOptions(); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap); + return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity()); } @Override @@ -795,19 +714,10 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption... @Override public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("reader", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blob); - StorageObject storageObject = Conversions.json().blobId().encode(blob); - ImmutableMap optionsMap = opts.getRpcOptions(); - return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blob); + StorageObject storageObject = Conversions.json().blobId().encode(blob); + ImmutableMap optionsMap = opts.getRpcOptions(); + return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); } @Override @@ -821,84 +731,57 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) { @Override public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("downloadTo", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); - final StorageObject pb = codecs.blobId().encode(blob); - ImmutableMap optionsMap = - Opts.unwrap(options).resolveFrom(blob).getRpcOptions(); - ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap); - run( - algorithm, - callable( - () -> { - storageRpc.read( - pb, optionsMap, countingOutputStream.getCount(), countingOutputStream); - }), - Function.identity()); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + final StorageObject pb = codecs.blobId().encode(blob); + ImmutableMap optionsMap = + Opts.unwrap(options).resolveFrom(blob).getRpcOptions(); + ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap); + run( + algorithm, + callable( + () -> { + storageRpc.read( + pb, optionsMap, countingOutputStream.getCount(), countingOutputStream); + }), + Function.identity()); } @Override public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { - Span otelSpan = openTelemetryTraceUtil.startSpan("writer", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - final Map optionsMap = opts.getRpcOptions(); - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); - - StorageObject encode = codecs.blobInfo().encode(updated); - // open the resumable session outside the write channel - // the exception behavior of open is different from #write(ByteBuffer) - Supplier uploadIdSupplier = - ResumableMedia.startUploadForBlobInfo( - getOptions(), - updated, - optionsMap, - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); + final Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + + StorageObject encode = codecs.blobInfo().encode(updated); + // open the resumable session outside the write channel + // the exception behavior of open is different from #write(ByteBuffer) + Supplier uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + getOptions(), + updated, + optionsMap, + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); } @Override public StorageWriteChannel writer(URL signedURL) { - Span otelSpan = openTelemetryTraceUtil.startSpan("writer", MODULE_STORAGE); - try (Scope ignored = otelSpan.makeCurrent()) { - // TODO: is it possible to know if a signed url is configured to have a constraint which makes - // it idempotent? - ResultRetryAlgorithm forResumableUploadSessionCreate = - retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); - // open the resumable session outside the write channel - // the exception behavior of open is different from #write(ByteBuffer) - String signedUrlString = signedURL.toString(); - Supplier uploadIdSupplier = - ResumableMedia.startUploadForSignedUrl( - getOptions(), signedURL, forResumableUploadSessionCreate); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); - } catch (Exception e) { - otelSpan.recordException(e); - otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; - } finally { - otelSpan.end(); - } + // TODO: is it possible to know if a signed url is configured to have a constraint which makes + // it idempotent? + ResultRetryAlgorithm forResumableUploadSessionCreate = + retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); + // open the resumable session outside the write channel + // the exception behavior of open is different from #write(ByteBuffer) + String signedUrlString = signedURL.toString(); + Supplier uploadIdSupplier = + ResumableMedia.startUploadForSignedUrl( + getOptions(), signedURL, forResumableUploadSessionCreate); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java index 9381181683..f149db1107 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java @@ -29,7 +29,7 @@ import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.spi.StorageRpcFactory; -import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.io.InputStream; import java.util.Properties; @@ -111,10 +111,14 @@ public abstract static class Builder public abstract StorageOptions.Builder setBlobWriteSessionConfig( @NonNull BlobWriteSessionConfig blobWriteSessionConfig); - /** @since 2.46.1 This new api is in preview and is subject to breaking changes. */ + /** + * Enable OpenTelemetry Tracing and provide an instance for the client to use. + * + * @param openTelemetry User defined instance of OpenTelemetry to be used by the library + * @since 2.47.0 This new api is in preview and is subject to breaking changes. + */ @BetaApi - public abstract StorageOptions.Builder setOpenTelemetrySdk( - @NonNull OpenTelemetrySdk openTelemetrySdk); + public abstract StorageOptions.Builder setOpenTelemetry(OpenTelemetry openTelemetry); @Override public abstract StorageOptions build(); @@ -150,7 +154,9 @@ public static String version() { return VERSION; } - public abstract OpenTelemetrySdk getOpenTelemetrySdk(); + /** @since 2.47.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi + public abstract OpenTelemetry getOpenTelemetry(); @SuppressWarnings("unchecked") @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/NoOpOpenTelemetryInstance.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/NoOpOpenTelemetryInstance.java deleted file mode 100644 index 0106f18f86..0000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/NoOpOpenTelemetryInstance.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.storage.otel; - -import com.google.api.core.ApiFuture; -import io.opentelemetry.api.trace.StatusCode; -import java.util.Map; -import javax.annotation.Nonnull; - -class NoOpOpenTelemetryInstance implements OpenTelemetryTraceUtil { - - @Override - public OpenTelemetryTraceUtil.Span startSpan(String spanName, String module) { - return new Span(); - } - - @Override - public OpenTelemetryTraceUtil.Span startSpan( - String spanName, String module, OpenTelemetryTraceUtil.Context parent) { - return new Span(); - } - - @Nonnull - @Override - public Span currentSpan() { - return new Span(); - } - - @Nonnull - @Override - public Context currentContext() { - return new Context(); - } - - static class Span implements OpenTelemetryTraceUtil.Span { - @Override - public void end() {} - - @Override - public void end(Throwable error) {} - - @Override - public void endAtFuture(ApiFuture futureValue) {} - - @Override - public OpenTelemetryTraceUtil.Span recordException(Throwable error) { - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span setStatus(StatusCode status, String name) { - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span addEvent(String name) { - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span addEvent(String name, Map attributes) { - return this; - } - - @Override - public Scope makeCurrent() { - return new Scope(); - } - } - - static class Context implements OpenTelemetryTraceUtil.Context { - @Override - public Scope makeCurrent() { - return new Scope(); - } - } - - static class Scope implements OpenTelemetryTraceUtil.Scope { - @Override - public void close() {} - } -} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java deleted file mode 100644 index dd9f5d4233..0000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.storage.otel; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.core.ApiFuture; -import com.google.cloud.storage.GrpcStorageOptions; -import com.google.cloud.storage.StorageOptions; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import java.util.Map; -import javax.annotation.Nonnull; - -class OpenTelemetryInstance implements OpenTelemetryTraceUtil { - private final Tracer tracer; - private final OpenTelemetry openTelemetry; - private final StorageOptions storageOptions; - - private static final String LIBRARY_NAME = "cloud.google.com/java/storage"; - - private final String transport; - - OpenTelemetryInstance(StorageOptions storageOptions) { - this.storageOptions = storageOptions; - this.openTelemetry = storageOptions.getOpenTelemetrySdk(); - this.tracer = openTelemetry.getTracer(LIBRARY_NAME, storageOptions.getLibraryVersion()); - this.transport = storageOptions instanceof GrpcStorageOptions ? "grpc" : "http"; - } - - static class Span implements OpenTelemetryTraceUtil.Span { - private final io.opentelemetry.api.trace.Span span; - private final String spanName; - - private Span(io.opentelemetry.api.trace.Span span, String spanName) { - this.span = span; - this.spanName = spanName; - } - - @Override - public OpenTelemetryTraceUtil.Span recordException(Throwable error) { - span.recordException( - error, - Attributes.of( - AttributeKey.stringKey("exception.message"), error.getMessage(), - AttributeKey.stringKey("exception.type"), error.getClass().getName(), - AttributeKey.stringKey("exception.stacktrace"), error.getStackTrace().toString())); - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span setStatus(StatusCode status, String name) { - span.setStatus(status, name); - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span addEvent(String name) { - span.addEvent(name); - return this; - } - - @Override - public OpenTelemetryTraceUtil.Span addEvent(String name, Map attributes) { - AttributesBuilder attributesBuilder = Attributes.builder(); - attributes.forEach( - (key, value) -> { - if (value instanceof Integer) { - attributesBuilder.put(key, (int) value); - } else if (value instanceof Long) { - attributesBuilder.put(key, (long) value); - } else if (value instanceof Double) { - attributesBuilder.put(key, (double) value); - } else if (value instanceof Float) { - attributesBuilder.put(key, (float) value); - } else if (value instanceof Boolean) { - attributesBuilder.put(key, (boolean) value); - } else if (value instanceof String) { - attributesBuilder.put(key, (String) value); - } else { - // OpenTelemetry APIs do not support any other type. - throw new IllegalArgumentException( - "Unknown attribute type:" + value.getClass().getSimpleName()); - } - }); - span.addEvent(name, attributesBuilder.build()); - return this; - } - - @Override - public Scope makeCurrent() { - return new Scope(span.makeCurrent()); - } - - @Override - public void end() { - span.end(); - } - - @Override - public void end(Throwable error) {} - - @Override - public void endAtFuture(ApiFuture futureValue) {} - } - - static class Scope implements OpenTelemetryTraceUtil.Scope { - private final io.opentelemetry.context.Scope scope; - - private Scope(io.opentelemetry.context.Scope scope) { - this.scope = scope; - } - - @Override - public void close() { - scope.close(); - } - } - - static class Context implements OpenTelemetryTraceUtil.Context { - private final io.opentelemetry.context.Context context; - - private Context(io.opentelemetry.context.Context context) { - this.context = context; - } - - @Override - public Scope makeCurrent() { - return new Scope(context.makeCurrent()); - } - } - - @Override - public OpenTelemetryTraceUtil.Span startSpan(String methodName, String module) { - String formatSpanName = String.format("%s/%s", module, methodName); - SpanBuilder spanBuilder = tracer.spanBuilder(formatSpanName); - io.opentelemetry.api.trace.Span span = - addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); - return new Span(span, formatSpanName); - } - - @Override - public OpenTelemetryTraceUtil.Span startSpan( - String methodName, String module, OpenTelemetryTraceUtil.Context parent) { - checkArgument( - parent instanceof OpenTelemetryInstance.Context, - "parent must be an instance of " + OpenTelemetryInstance.Context.class.getName()); - String formatSpanName = String.format("%s/%s", module, methodName); - Context p2 = (Context) parent; - SpanBuilder spanBuilder = - tracer.spanBuilder(formatSpanName).setSpanKind(SpanKind.CLIENT).setParent(p2.context); - io.opentelemetry.api.trace.Span span = - addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); - return new Span(span, formatSpanName); - } - - @Nonnull - @Override - public OpenTelemetryTraceUtil.Span currentSpan() { - return new Span(io.opentelemetry.api.trace.Span.current(), ""); - } - - @Nonnull - @Override - public OpenTelemetryTraceUtil.Context currentContext() { - return new Context(io.opentelemetry.context.Context.current()); - } - - private SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder) { - spanBuilder = spanBuilder.setAttribute("gcp.client.service", "Storage"); - spanBuilder = - spanBuilder.setAllAttributes( - Attributes.builder() - .put("gcp.client.version", storageOptions.getLibraryVersion()) - .put("gcp.client.repo", "googleapis/java-storage") - .put("gcp.client.artifact", "com.google.cloud:google-cloud-storage") - .put("rpc.system", transport) - .build()); - return spanBuilder; - } -} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryTraceUtil.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryTraceUtil.java deleted file mode 100644 index ed76c31e39..0000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryTraceUtil.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.storage.otel; - -import com.google.api.core.ApiFuture; -import com.google.api.core.InternalApi; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; -import com.google.cloud.storage.spi.v1.StorageRpc; -import io.opentelemetry.api.trace.StatusCode; -import java.util.Map; -import javax.annotation.Nonnull; - -@InternalApi -public interface OpenTelemetryTraceUtil { - String MODULE_STORAGE = Storage.class.getName(); - String MODULE_STORAGE_RPC = StorageRpc.class.getName(); - - @InternalApi - static OpenTelemetryTraceUtil getInstance(@Nonnull StorageOptions storageOptions) { - boolean createNoOp = storageOptions.getOpenTelemetrySdk() == null; - - if (createNoOp) { - return new NoOpOpenTelemetryInstance(); - } else { - return new OpenTelemetryInstance(storageOptions); - } - } - - /** Represents a trace span. */ - @InternalApi - interface Span { - @InternalApi - Span recordException(Throwable error); - - @InternalApi - Span setStatus(StatusCode status, String name); - /** Adds the given event to this span. */ - @InternalApi - Span addEvent(String name); - - /** Adds the given event with the given attributes to this span. */ - @InternalApi - Span addEvent(String name, Map attributes); - - /** Marks this span as the current span. */ - @InternalApi - Scope makeCurrent(); - - /** Ends this span. */ - @InternalApi - void end(); - - /** Ends this span in an error. */ - @InternalApi - void end(Throwable error); - - /** - * If an operation ends in the future, its relevant span should end _after_ the future has been - * completed. This method "appends" the span completion code at the completion of the given - * future. In order for telemetry info to be recorded, the future returned by this method should - * be completed. - */ - @InternalApi - void endAtFuture(ApiFuture futureValue); - } - - /** Represents a trace context. */ - @InternalApi - interface Context { - /** Makes this context the current context. */ - @InternalApi - Scope makeCurrent(); - } - - /** Represents a trace scope. */ - @InternalApi - interface Scope extends AutoCloseable { - /** Closes the current scope. */ - @InternalApi - void close(); - } - - /** Starts a new span with the given name, sets it as the current span, and returns it. */ - @InternalApi - Span startSpan(String spanName, String module); - - /** - * Starts a new span with the given name and the given context as its parent, sets it as the - * current span, and returns it. - */ - @InternalApi - Span startSpan(String spanName, String module, Context parent); - - /** Returns the current span. */ - @Nonnull - @InternalApi - Span currentSpan(); - - /** Returns the current Context. */ - @Nonnull - @InternalApi - Context currentContext(); -} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/package-info.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/package-info.java deleted file mode 100644 index d8fac3ca8e..0000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Set of internal utilities to make our OTel use a bit more terse. - * - *

All classes, interfaces, etc are considered to be for internal library use only and can break - * at any time. - */ -@InternalApi -package com.google.cloud.storage.otel; - -import com.google.api.core.InternalApi; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java index b85e6a72f1..3b8957bbac 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java @@ -16,21 +16,22 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; + import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.registry.Generator; import com.google.cloud.storage.otel.TestExporter; -import com.google.cloud.storage.testing.RemoteStorageHelper; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.util.UUID; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,12 +40,17 @@ backends = Backend.PROD, transports = {Transport.HTTP, Transport.GRPC}) public final class ITOpenTelemetryTest { + @Inject public Storage storage; + + @Inject public BucketInfo bucket; + + @Inject public Generator generator; @Inject public Transport transport; @Test - public void checkInstrumentation() { - SpanExporter exporter = new TestExporter(); + public void checkInstrumentation() throws Exception { + TestExporter exporter = new TestExporter(); OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() @@ -53,46 +59,33 @@ public void checkInstrumentation() { .addSpanProcessor(SimpleSpanProcessor.create(exporter)) .build()) .build(); - StorageOptions storageOptions = - storage.getOptions().toBuilder().setOpenTelemetrySdk(openTelemetrySdk).build(); - storage = storageOptions.getService(); - String bucket = randomBucketName(); - try { - storage.create(BucketInfo.of(bucket)); - TestExporter testExported = (TestExporter) exporter; - SpanData spanData = testExported.getExportedSpans().get(0); - Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service")); - Assert.assertEquals( - "googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo")); - Assert.assertEquals( - "com.google.cloud:google-cloud-storage", - getAttributeValue(spanData, "gcp.client.artifact")); - Assert.assertEquals( - transport.name().toLowerCase(), getAttributeValue(spanData, "rpc.system")); - } finally { - // Cleanup - RemoteStorageHelper.forceDelete(storage, bucket); + storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build(); + try (Storage storage = storageOptions.getService()) { + storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build()); } + + SpanData spanData = exporter.getExportedSpans().get(0); + assertAll( + () -> assertThat(getAttributeValue(spanData, "gcp.client.service")).isEqualTo("Storage"), + () -> + assertThat(getAttributeValue(spanData, "gcp.client.repo")) + .isEqualTo("googleapis/java-storage"), + () -> + assertThat(getAttributeValue(spanData, "gcp.client.artifact")) + .isEqualTo("com.google.cloud:google-cloud-storage"), + () -> + assertThat(getAttributeValue(spanData, "rpc.system")) + .isEqualTo(transport.name().toLowerCase())); } @Test public void noOpDoesNothing() { - String bucket = randomBucketName(); - try { - storage.create(BucketInfo.of(bucket)); - Assert.assertNull(storage.getOptions().getOpenTelemetrySdk()); - } finally { - // cleanup - RemoteStorageHelper.forceDelete(storage, bucket); - } - } - - private String getAttributeValue(SpanData spanData, String key) { - return spanData.getAttributes().get(AttributeKey.stringKey(key)).toString(); + assertThat(storage.getOptions().getOpenTelemetry()).isSameInstanceAs(OpenTelemetry.noop()); + storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build()); } - public String randomBucketName() { - return "java-storage-grpc-rand-" + UUID.randomUUID(); + private static String getAttributeValue(SpanData spanData, String key) { + return spanData.getAttributes().get(AttributeKey.stringKey(key)); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTestbenchTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTestbenchTest.java index 66f08c61f3..ed999ad0a4 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTestbenchTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTestbenchTest.java @@ -43,6 +43,7 @@ import java.nio.file.Paths; import java.util.List; import java.util.UUID; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -57,7 +58,6 @@ public class ITOpenTelemetryTestbenchTest { @Inject public Generator generator; @Inject public BucketInfo testBucket; @Inject public Storage storage; - private StorageOptions options; private SpanExporter exporter; private static final byte[] helloWorldTextBytes = "hello world".getBytes(); private BlobId blobId; @@ -73,12 +73,20 @@ public void setUp() { .addSpanProcessor(SimpleSpanProcessor.create(exporter)) .build()) .build(); - options = storage.getOptions().toBuilder().setOpenTelemetrySdk(openTelemetrySdk).build(); + StorageOptions options = + storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build(); storage = options.getService(); String objectString = generator.randomObjectName(); blobId = BlobId.of(testBucket.getName(), objectString); } + @After + public void tearDown() throws Exception { + if (storage != null) { + storage.close(); + } + } + @Test public void runCreateBucket() { String bucket = "random-bucket" + UUID.randomUUID();