From bbf04129c901a59d3e11520cb9bc0eb899bf4911 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 3 Jan 2025 22:20:01 +0000 Subject: [PATCH] test: integrate otel into our StorageITRunner --- google-cloud-storage/pom.xml | 6 + .../cloud/storage/OtelStorageDecorator.java | 13 +- .../PackagePrivateMethodWorkarounds.java | 19 ++- .../it/runner/registry/BackendResources.java | 8 +- .../it/runner/registry/ObjectsFixture.java | 14 +- .../it/runner/registry/OtelSdkShim.java | 102 ++++++++++++++ .../storage/it/runner/registry/Registry.java | 132 +++++++++++++++--- .../registry/TestRunScopedInstance.java | 61 +++++++- 8 files changed, 311 insertions(+), 44 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/OtelSdkShim.java diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml index a246beb313..8ad8d116ef 100644 --- a/google-cloud-storage/pom.xml +++ b/google-cloud-storage/pom.xml @@ -164,6 +164,12 @@ com.google.cloud.opentelemetry exporter-metrics + + com.google.cloud.opentelemetry + exporter-trace + 0.32.0 + test + io.opentelemetry.contrib diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index a1813a98c5..28a26045f4 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -31,6 +31,7 @@ import com.google.cloud.storage.PostPolicyV4.PostConditionsV4; import com.google.cloud.storage.PostPolicyV4.PostFieldsV4; import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; @@ -58,7 +59,7 @@ final class OtelStorageDecorator implements Storage { /** Becomes the {@code otel.scope.name} attribute in a span */ private static final String OTEL_SCOPE_NAME = "cloud.google.com/java/storage"; - private final Storage delegate; + @VisibleForTesting final Storage delegate; private final OpenTelemetry otel; private final Attributes baseAttributes; private final Tracer tracer; @@ -1501,9 +1502,10 @@ public SpanBuilder spanBuilder(String spanName) { } } - private static final class OtelDecoratedReadChannel implements ReadChannel { + @VisibleForTesting + static final class OtelDecoratedReadChannel implements ReadChannel { - private final ReadChannel reader; + @VisibleForTesting final ReadChannel reader; private final Span span; private OtelDecoratedReadChannel(ReadChannel reader, Span span) { @@ -1620,8 +1622,9 @@ public void close() throws IOException { } } - private static final class OtelDecoratedWriteChannel implements WriteChannel { - private final WriteChannel delegate; + @VisibleForTesting + static final class OtelDecoratedWriteChannel implements WriteChannel { + @VisibleForTesting final WriteChannel delegate; private final Span openSpan; private OtelDecoratedWriteChannel(WriteChannel delegate, Span openSpan) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java index 66df95a896..2aba240722 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java @@ -20,6 +20,8 @@ import com.google.api.core.ApiFutures; import com.google.cloud.ReadChannel; import com.google.cloud.WriteChannel; +import com.google.cloud.storage.OtelStorageDecorator.OtelDecoratedReadChannel; +import com.google.cloud.storage.OtelStorageDecorator.OtelDecoratedWriteChannel; import com.google.common.collect.ImmutableList; import com.google.storage.v2.StorageClient; import java.util.Optional; @@ -49,6 +51,10 @@ public static Blob blobCopyWithStorage(Blob b, Storage s) { public static Function> maybeGetBlobInfoFunction() { return (w) -> { + if (w instanceof OtelDecoratedWriteChannel) { + OtelDecoratedWriteChannel odwc = (OtelDecoratedWriteChannel) w; + w = odwc.delegate; + } if (w instanceof BlobWriteChannelV2) { BlobWriteChannelV2 blobWriteChannel = (BlobWriteChannelV2) w; return Optional.ofNullable(blobWriteChannel.getResolvedObject()) @@ -71,13 +77,16 @@ public static Function> maybeGetBlobInfoFunctio } public static ApiFuture getBlobInfoFromReadChannelFunction(ReadChannel c) { + if (c instanceof OtelDecoratedReadChannel) { + OtelDecoratedReadChannel odrc = (OtelDecoratedReadChannel) c; + c = odrc.reader; + } if (c instanceof StorageReadChannel) { StorageReadChannel src = (StorageReadChannel) c; return src.getObject(); - } else { - return ApiFutures.immediateFailedFuture( - new IllegalStateException("Unsupported ReadChannel Type " + c.getClass().getName())); } + return ApiFutures.immediateFailedFuture( + new IllegalStateException("Unsupported ReadChannel Type " + c.getClass().getName())); } @Nullable @@ -87,6 +96,10 @@ public static StorageClient maybeGetStorageClient(Storage s) { } // handle instances of AbstractStorageProxy Storage service = s.getOptions().getService(); + if (service instanceof OtelStorageDecorator) { + OtelStorageDecorator osd = (OtelStorageDecorator) service; + service = osd.delegate; + } if (service instanceof GrpcStorageImpl) { return ((GrpcStorageImpl) service).storageClient; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java index a00d4e616e..012a404a34 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java @@ -83,7 +83,7 @@ public String toString() { } @SuppressWarnings("SwitchStatementWithTooFewBranches") - static BackendResources of(Backend backend) { + static BackendResources of(Backend backend, TestRunScopedInstance otelSdk) { ProtectedBucketNames protectedBucketNames = new ProtectedBucketNames(); TestRunScopedInstance storageJson = TestRunScopedInstance.of( @@ -99,7 +99,8 @@ static BackendResources of(Backend backend) { .setProjectId("test-project-id"); break; default: // PROD, java8 doesn't have exhaustive checking for enum switch - optionsBuilder = StorageOptions.http(); + // Register the exporters with OpenTelemetry + optionsBuilder = StorageOptions.http().setOpenTelemetry(otelSdk.get().get()); break; } HttpStorageOptions built = optionsBuilder.build(); @@ -121,7 +122,8 @@ static BackendResources of(Backend backend) { .setProjectId("test-project-id"); break; default: // PROD, java8 doesn't have exhaustive checking for enum switch - optionsBuilder = StorageOptions.grpc(); + // Register the exporters with OpenTelemetry + optionsBuilder = StorageOptions.grpc().setOpenTelemetry(otelSdk.get().get()); break; } GrpcStorageOptions built = diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/ObjectsFixture.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/ObjectsFixture.java index ed9e4428b7..ef8bbddd10 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/ObjectsFixture.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/ObjectsFixture.java @@ -102,7 +102,8 @@ public void start() { BlobInfo info2 = BlobInfo.newBuilder(blobId2).setMetadata(ImmutableMap.of("pow", "2")).build(); BlobInfo info3 = BlobInfo.newBuilder(blobId3).setMetadata(ImmutableMap.of("pow", "3")).build(); BlobInfo info4 = BlobInfo.newBuilder(blobId4).setMetadata(ImmutableMap.of("pow", "4")).build(); - s.create(info1, "A".getBytes(StandardCharsets.UTF_8), blobTargetOptions); + this.info1 = + s.create(info1, "A".getBytes(StandardCharsets.UTF_8), blobTargetOptions).asBlobInfo(); ComposeRequest c2 = ComposeRequest.newBuilder() @@ -122,14 +123,9 @@ public void start() { .setTarget(info4) .setTargetOptions(blobTargetOptions) .build(); - s.compose(c2); - s.compose(c3); - s.compose(c4); - - this.info1 = s.get(blobId1, blobGetOptions).asBlobInfo(); - this.info2 = s.get(blobId2, blobGetOptions).asBlobInfo(); - this.info3 = s.get(blobId3, blobGetOptions).asBlobInfo(); - this.info4 = s.get(blobId4, blobGetOptions).asBlobInfo(); + this.info2 = s.compose(c2).asBlobInfo(); + this.info3 = s.compose(c3).asBlobInfo(); + this.info4 = s.compose(c4).asBlobInfo(); byte[] bytes = DataGenerator.base64Characters().genBytes(512 * 1024); Blob obj512KiB = diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/OtelSdkShim.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/OtelSdkShim.java new file mode 100644 index 0000000000..f2ea10bce5 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/OtelSdkShim.java @@ -0,0 +1,102 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it.runner.registry; + +import com.google.cloud.opentelemetry.metric.GoogleCloudMetricExporter; +import com.google.cloud.opentelemetry.metric.MetricConfiguration; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.time.Duration; +import java.util.Arrays; + +public final class OtelSdkShim implements ManagedLifecycle { + private static final boolean STORAGE_IT_OTEL_ENABLE = + Arrays.asList( + System.getProperty("STORAGE_IT_OTEL_ENABLE"), System.getenv("STORAGE_IT_OTEL_ENABLE")) + .contains("true"); + private final String projectId; + + private OpenTelemetrySdk otelSdk; + + OtelSdkShim(String projectId) { + this.projectId = projectId; + } + + @Override + public OpenTelemetry get() { + if (otelSdk == null) { + return OpenTelemetry.noop(); + } + return otelSdk; + } + + @Override + public void start() { + if (!STORAGE_IT_OTEL_ENABLE) { + otelSdk = null; + return; + } + MetricConfiguration metricConfiguration = + MetricConfiguration.builder() + .setProjectId(projectId) + .setDeadline(Duration.ofSeconds(30)) + .build(); + TraceConfiguration traceConfiguration = + TraceConfiguration.builder() + .setProjectId(projectId) + .setDeadline(Duration.ofSeconds(30)) + .build(); + MetricExporter metricExporter = + GoogleCloudMetricExporter.createWithConfiguration(metricConfiguration); + SpanExporter spanExporter = TraceExporter.createWithConfiguration(traceConfiguration); + + SdkMeterProvider meterProvider = + SdkMeterProvider.builder() + .registerMetricReader( + PeriodicMetricReader.builder(metricExporter) + .setInterval(Duration.ofSeconds(60)) + .build()) + .build(); + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .setSampler(Sampler.traceIdRatioBased(1.0)) + .addSpanProcessor( + BatchSpanProcessor.builder(spanExporter).setMeterProvider(meterProvider).build()) + .build(); + otelSdk = + OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .setMeterProvider(meterProvider) + .build(); + } + + @Override + public void stop() { + if (otelSdk != null) { + otelSdk.close(); + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java index e8c43b20f0..cf0a3cb899 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java @@ -16,11 +16,13 @@ package com.google.cloud.storage.it.runner.registry; +import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.CrossRunIntersection; import com.google.cloud.storage.it.runner.TestInitializer; import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; import com.google.cloud.storage.it.runner.annotations.StorageFixture; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; @@ -28,6 +30,12 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -70,16 +78,26 @@ public final class Registry extends RunListener { private final TestRunScopedInstance generator = TestRunScopedInstance.of("GENERATOR", Generator::new); - private final BackendResources prodBackendResources = BackendResources.of(Backend.PROD); - private final BackendResources testBenchBackendResource = BackendResources.of(Backend.TEST_BENCH); + final TestRunScopedInstance otelSdk = + TestRunScopedInstance.of( + "OTEL_SDK", + () -> { + String projectId = StorageOptions.getDefaultInstance().getProjectId(); + return new OtelSdkShim(projectId); + }); + + private final BackendResources prodBackendResources = BackendResources.of(Backend.PROD, otelSdk); + private final BackendResources testBenchBackendResource = + BackendResources.of(Backend.TEST_BENCH, otelSdk); private final ImmutableList> entries = new ImmutableList.Builder>() .add( - RegistryEntry.of(0, TestBench.class, testBench), - RegistryEntry.of(1, Generator.class, generator), - registryEntry(2, Backend.class, CrossRunIntersection::getBackend), - registryEntry(3, Transport.class, CrossRunIntersection::getTransport)) + RegistryEntry.of(0, OpenTelemetry.class, otelSdk), + RegistryEntry.of(1, TestBench.class, testBench), + RegistryEntry.of(2, Generator.class, generator), + registryEntry(3, Backend.class, CrossRunIntersection::getBackend), + registryEntry(4, Transport.class, CrossRunIntersection::getTransport)) .addAll(prodBackendResources.getRegistryEntries()) .addAll(testBenchBackendResource.getRegistryEntries()) .build(); @@ -87,7 +105,23 @@ public final class Registry extends RunListener { private final ImmutableSet> injectableTypes = entries.stream().map(RegistryEntry::getType).collect(ImmutableSet.toImmutableSet()); private final String injectableTypesString = Joiner.on("|").join(injectableTypes); - private final ThreadLocal currentTest = new ThreadLocal<>(); + private final ThreadLocal currentTest = new ThreadLocal<>(); + + private static final class CurrentTest { + private final Description desc; + private final Span span; + private final Scope scope; + + private CurrentTest(Description desc, Span span, Scope scope) { + this.desc = desc; + this.span = span; + this.scope = scope; + } + + public static CurrentTest of(Description desc, Span span, Scope scope) { + return new CurrentTest(desc, span, scope); + } + } private Registry() {} @@ -117,17 +151,38 @@ TestBench testBench() { @Nullable public Description getCurrentTest() { - return currentTest.get(); + return currentTest.get().desc; } @Override public void testStarted(Description description) { - currentTest.set(description); + OpenTelemetry sdk; + if (description.getDisplayName().contains("[TEST_BENCH]") + || isClassAnnotatedSingleBackendTestBench(description) + || Arrays.stream(description.getTestClass().getDeclaredFields()) + .anyMatch(field -> field.getType() == TestBench.class)) { + sdk = OpenTelemetry.noop(); + } else { + sdk = otelSdk.get().get(); + } + + Tracer tracer = sdk.getTracer("test"); + Span span = + tracer + .spanBuilder( + String.format("%s/%s", description.getClassName(), description.getMethodName())) + .setAttribute("service.name", "test") + .startSpan(); + Scope scope = span.makeCurrent(); + currentTest.set(CurrentTest.of(description, span, scope)); } @Override public void testFinished(Description description) { - currentTest.remove(); + CurrentTest currentTest = this.currentTest.get(); + currentTest.scope.close(); + currentTest.span.end(); + this.currentTest.remove(); } public RunnerScheduler parallelScheduler() { @@ -182,16 +237,40 @@ public Object resolve(FrameworkField ff, CrossRunIntersection crossRunIntersecti // private volatile ListenableFuture shutdownF; private void shutdown() { - entries.stream() - .sorted() - .forEach( - re -> { - try { - re.getInstance().shutdown(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + Span span = + otelSdk + .get() + .get() + .getTracer("registry") + .spanBuilder("registry/shutdown") + .setAttribute("service.name", "registry") + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + entries.stream() + .sorted() + .filter(e -> e.getShutdownPriority() > 0) + .forEach( + re -> { + try { + re.getInstance().shutdown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + try { + // manually shutdown otelSdk so that the previous span can be recorded as ending + otelSdk.shutdown(); + } catch (Exception e) { + //noinspection ThrowFromFinallyBlock + throw new RuntimeException(e); + } + } } @FunctionalInterface @@ -226,4 +305,17 @@ private static RegistryEntry registryEntry( private static StatelessManagedLifecycle lift(Function f) { return (ff, cell) -> f.apply(cell); } + + private static boolean isClassAnnotatedSingleBackendTestBench(Description description) { + return Arrays.stream(description.getTestClass().getAnnotations()) + .anyMatch( + a -> { + if (a instanceof SingleBackend) { + SingleBackend sb = (SingleBackend) a; + return sb.value() == Backend.TEST_BENCH; + } else { + return false; + } + }); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java index d2a6e34ca3..a503b0674b 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java @@ -17,6 +17,9 @@ package com.google.cloud.storage.it.runner.registry; import com.google.common.base.MoreObjects; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.util.function.Supplier; import org.junit.runner.notification.RunListener.ThreadSafe; @@ -40,13 +43,42 @@ static TestRunScopedInstance of(String name, Sup return new TestRunScopedInstance<>(name, ctor); } + public String getName() { + return name; + } + public T get() { if (instance == null) { synchronized (this) { if (instance == null) { - T tmp = ctor.get(); - tmp.start(); - instance = tmp; + // if we don't short-circuit for OTEL_SDK we will cause a stack overflow, because we would + // be trying to get our instance to record that we're starting our instance. + if (name.equals("OTEL_SDK")) { + T tmp = ctor.get(); + tmp.start(); + instance = tmp; + } else { + Span span = + Registry.getInstance() + .otelSdk + .get() + .get() + .getTracer("test") + .spanBuilder("registry/" + name + "/start") + .setAttribute("service.name", "registry") + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + T tmp = ctor.get(); + tmp.start(); + instance = tmp; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } } } } @@ -59,7 +91,28 @@ public void shutdown() throws Exception { synchronized (this) { instance = null; } - tmp.stop(); + if (name.equals("OTEL_SDK")) { + tmp.stop(); + } else { + Span span = + Registry.getInstance() + .otelSdk + .get() + .get() + .getTracer("test") + .spanBuilder("registry/" + name + "/stop") + .setAttribute("service.name", "registry") + .startSpan(); + try (Scope ignore = span.makeCurrent()) { + tmp.stop(); + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + throw t; + } finally { + span.end(); + } + } } }