From 85f3c6e646207086e8f3284c18c7af58ad0ab2c8 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 15 May 2026 11:44:11 -0400 Subject: [PATCH 1/2] Revert "Adds a new coder translator for Java SchemaCoder. (#37631)" This reverts commit 81769cbfd132ed3c257685fbdc87f076b903e9f5. --- CHANGES.md | 5 +- .../dataflow/DataflowPipelineTranslator.java | 2 +- .../beam/runners/dataflow/DataflowRunner.java | 59 ++++++-------- .../DataflowPipelineTranslatorTest.java | 77 ++++-------------- .../control/ProcessBundleDescriptorsTest.java | 6 +- .../util/construction/CoderTranslation.java | 62 +-------------- .../util/construction/CoderTranslator.java | 1 - .../CoderTranslatorRegistrar.java | 16 ---- .../util/construction/CoderTranslators.java | 79 ------------------- .../construction/ModelCoderRegistrar.java | 28 +------ .../sdk/util/construction/ModelCoders.java | 2 - .../construction/RehydratedComponents.java | 3 +- .../sdk/util/construction/SdkComponents.java | 39 ++++----- .../construction/CoderTranslationTest.java | 38 +-------- .../expansion/service/ExpansionService.java | 2 +- .../avro/AvroGenericCoderRegistrar.java | 18 ----- .../fn/harness/state/StateBackedIterable.java | 22 ------ 17 files changed, 72 insertions(+), 387 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ca911e52a7ad..52475a99d8e1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -85,9 +85,6 @@ ## Breaking Changes -* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)). - - Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)). - - Fixes ([#36496](https://github.com/apache/beam/issues/36496)), ([#30276](https://github.com/apache/beam/issues/30276)), ([#29245](https://github.com/apache/beam/issues/29245)). * (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275)) * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). @@ -2441,4 +2438,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 1609cf6ea238..4016f31a5475 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -221,7 +221,7 @@ public Boolean visit(OrFinallyTrigger trigger) { private static byte[] serializeWindowingStrategy( WindowingStrategy windowingStrategy, PipelineOptions options) { try { - SdkComponents sdkComponents = SdkComponents.create(options); + SdkComponents sdkComponents = SdkComponents.create(); String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c19673a3117e..299e7fa21ed1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1333,20 +1333,19 @@ public DataflowPipelineJob run(Pipeline pipeline) { // with the SDK harness image (which implements Fn API). // // The same Environment is used in different and contradictory ways, depending on whether - // it is a portable or non-portable job submission. + // it is a v1 or v2 job submission. RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(workerHarnessContainerImageURL); - // The SdkComponents for portable and non-portable job submission must be kept distinct. Both + // The SdkComponents for portable an non-portable job submission must be kept distinct. Both // need the default environment. - SdkComponents portableComponents = - SdkComponents.create( - options, - defaultEnvironmentForDataflow - .toBuilder() - .addAllDependencies(getDefaultArtifacts()) - .addAllCapabilities(Environments.getJavaCapabilities()) - .build()); + SdkComponents portableComponents = SdkComponents.create(); + portableComponents.registerEnvironment( + defaultEnvironmentForDataflow + .toBuilder() + .addAllDependencies(getDefaultArtifacts()) + .addAllCapabilities(Environments.getJavaCapabilities()) + .build()); RunnerApi.Pipeline portablePipelineProto = PipelineTranslation.toProto(pipeline, portableComponents, false); @@ -1375,30 +1374,28 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { - LOG.info( - "Skipping non-portable transform replacements since job will run on portable worker."); + LOG.info("Skipping v1 transform replacements since job will run on v2."); } else { - // Now rewrite things to be as needed for non-portable (mutates the pipeline). - // This way the job submitted is valid for portable and non-portable, simultaneously. + // Now rewrite things to be as needed for v1 (mutates the pipeline) + // This way the job submitted is valid for v1 and v2, simultaneously replaceV1Transforms(pipeline); } - // Capture the SdkComponents for look up during step translations. - SdkComponents dataflowNonPortableComponents = - SdkComponents.create( - options, - defaultEnvironmentForDataflow - .toBuilder() - .addAllDependencies(getDefaultArtifacts()) - .addAllCapabilities(Environments.getJavaCapabilities()) - .build()); - // No need to perform transform upgrading for the non-portable runner proto. - RunnerApi.Pipeline dataflowNonPortablePipelineProto = - PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false); + // Capture the SdkComponents for look up during step translations + SdkComponents dataflowV1Components = SdkComponents.create(); + dataflowV1Components.registerEnvironment( + defaultEnvironmentForDataflow + .toBuilder() + .addAllDependencies(getDefaultArtifacts()) + .addAllCapabilities(Environments.getJavaCapabilities()) + .build()); + // No need to perform transform upgrading for the Runner v1 proto. + RunnerApi.Pipeline dataflowV1PipelineProto = + PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); if (LOG.isDebugEnabled()) { LOG.debug( - "Dataflow non-portable worker pipeline proto:\n{}", - TextFormat.printer().printToString(dataflowNonPortablePipelineProto)); + "Dataflow v1 pipeline proto:\n{}", + TextFormat.printer().printToString(dataflowV1PipelineProto)); } // Set a unique client_request_id in the CreateJob request. @@ -1418,11 +1415,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { JobSpecification jobSpecification = translator.translate( - pipeline, - dataflowNonPortablePipelineProto, - dataflowNonPortableComponents, - this, - packages); + pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages); if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { List experiments = diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 953f7a638ede..f8818931a68f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -47,7 +47,6 @@ import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; -import com.google.auto.value.AutoValue; import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -93,8 +92,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; @@ -169,11 +166,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); private SdkComponents createSdkComponents(PipelineOptions options) { + SdkComponents sdkComponents = SdkComponents.create(); + String containerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(containerImageURL); - return SdkComponents.create(options, defaultEnvironmentForDataflow); + + sdkComponents.registerEnvironment(defaultEnvironmentForDataflow); + return sdkComponents; } // A Custom Mockito matcher for an initial Job that checks that all @@ -1293,16 +1294,15 @@ public String apply(byte[] input) { file1.deleteOnExit(); File file2 = File.createTempFile("file2-", ".txt"); file2.deleteOnExit(); - SdkComponents sdkComponents = - SdkComponents.create( - options, - Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options)) - .toBuilder() - .addAllDependencies( - Environments.getArtifacts( - ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2))) - .addAllCapabilities(Environments.getJavaCapabilities()) - .build()); + SdkComponents sdkComponents = SdkComponents.create(); + sdkComponents.registerEnvironment( + Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options)) + .toBuilder() + .addAllDependencies( + Environments.getArtifacts( + ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2))) + .addAllCapabilities(Environments.getJavaCapabilities()) + .build()); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); @@ -1870,53 +1870,4 @@ public OffsetRange getInitialRange(@SuppressWarnings("unused") @Element String e return null; } } - - @AutoValue - @DefaultSchema(AutoValueSchema.class) - public abstract static class SimpleAutoValue { - public abstract String getString(); - - public abstract int getInt32(); - - public abstract long getInt64(); - - public static DataflowPipelineTranslatorTest.SimpleAutoValue of( - String string, int int32, long int64) { - return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64); - } - } - - @Test - public void testSchemaCoderTranslation() throws Exception { - DataflowPipelineOptions options = buildPipelineOptions(); - Pipeline pipeline = Pipeline.create(options); - pipeline - .apply(Impulse.create()) - .apply( - MapElements.via( - new SimpleFunction() { - @Override - public SimpleAutoValue apply(byte[] input) { - return SimpleAutoValue.of("foo", 5, 10L); - } - })) - .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))); - { - SdkComponents sdkComponents = createSdkComponents(options); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); - Map coders = pipelineProto.getComponents().getCodersMap(); - assertTrue(coders.containsKey("SchemaCoder")); - assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn()); - } - - // Prior to version 2.74, SchemaCoders are translated as custom java coders. - { - options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73"); - SdkComponents sdkComponents = createSdkComponents(options); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); - Map coders = pipelineProto.getComponents().getCodersMap(); - assertTrue(coders.containsKey("SchemaCoder")); - assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn()); - } - } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java index 9ea7404053d6..21d7550c38b9 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java @@ -78,8 +78,7 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce // Add another stateful stage with a non-standard key coder Pipeline p = Pipeline.create(); Coder keycoder = VoidCoder.of(); - ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar(); - assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false)); + assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false)); p.apply("impulse", Impulse.create()) .apply( "create", @@ -166,8 +165,7 @@ public void onTimer() {} public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception { Pipeline p = Pipeline.create(); Coder voidCoder = VoidCoder.of(); - ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar(); - assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false)); + assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false)); p.apply("impulse", Impulse.create()) .apply( ParDo.of( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java index 2cc4bf0c6a03..22859dc68b93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java @@ -25,13 +25,11 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.dataflow.qual.Deterministic; @@ -64,8 +62,6 @@ private static class DefaultTranslationContext implements TranslationContext {} private static @MonotonicNonNull BiMap, String> knownCoderUrns; - private static @MonotonicNonNull List coderTranslatorRegistrars; - private static @MonotonicNonNull Map, CoderTranslator> knownTranslators; @@ -84,53 +80,6 @@ static BiMap, String> getKnownCoderUrns() { return knownCoderUrns; } - private static void initializeCoderTranslatorRegistrars() { - ImmutableList.Builder registrars = ImmutableList.builder(); - for (CoderTranslatorRegistrar coderTranslatorRegistrar : - ServiceLoader.load(CoderTranslatorRegistrar.class)) { - registrars.add(coderTranslatorRegistrar); - } - coderTranslatorRegistrars = registrars.build(); - } - - static boolean isKnownCoder(Coder coder, PipelineOptions options) { - if (coderTranslatorRegistrars == null) { - initializeCoderTranslatorRegistrars(); - } - for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { - if (registrar.isKnownCoder(coder, options)) { - return true; - } - } - return false; - } - - static CoderTranslator getCoderTranslator(Class coderClass) { - if (coderTranslatorRegistrars == null) { - initializeCoderTranslatorRegistrars(); - } - for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { - CoderTranslator translator = registrar.getCoderTranslator(coderClass); - if (translator != null) { - return translator; - } - } - return null; - } - - static Class getCoderForUrn(String coderUrn) { - if (coderTranslatorRegistrars == null) { - initializeCoderTranslatorRegistrars(); - } - for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { - Class coder = registrar.getCoderForUrn(coderUrn); - if (coder != null) { - return coder; - } - } - return null; - } - @VisibleForTesting @Deterministic static Map, CoderTranslator> getKnownTranslators() { @@ -158,7 +107,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder coder) throws IOE public static RunnerApi.Coder toProto(Coder coder, SdkComponents components) throws IOException { - if (isKnownCoder(coder, components.getPipelineOptions())) { + if (getKnownCoderUrns().containsKey(coder.getClass())) { return toKnownCoder(coder, components); } @@ -180,10 +129,7 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder) private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents components) throws IOException { - CoderTranslator translator = getCoderTranslator(coder.getClass()); - if (translator == null) { - throw new IOException("Unable to find CoderTranslator for known Coder"); - } + CoderTranslator translator = getKnownTranslators().get(coder.getClass()); List componentIds = registerComponents(coder, translator, components); return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) @@ -240,8 +186,8 @@ private static Coder fromKnownCoder( components.getComponents().getCodersOrThrow(componentId), components, context); coderComponents.add(innerCoder); } - Class coderType = getCoderForUrn(coderUrn); - CoderTranslator translator = getCoderTranslator(coderType); + Class coderType = getKnownCoderUrns().inverse().get(coderUrn); + CoderTranslator translator = getKnownTranslators().get(coderType); if (translator != null) { return translator.fromComponents( coderComponents, coder.getSpec().getPayload().toByteArray(), context); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java index 78f5b61c0f0e..3d89c4c7ff4a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java @@ -28,7 +28,6 @@ * additional payload, which is not currently supported. This exists as a temporary measure. */ public interface CoderTranslator> { - /** Extract all component {@link Coder coders} within a coder. */ List> getComponents(T from); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java index 44e8c2956aee..b69d0290de52 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java @@ -19,8 +19,6 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.checkerframework.checker.nullness.qual.Nullable; /** A registrar of {@link Coder} URNs to the associated {@link CoderTranslator}. */ @SuppressWarnings({ @@ -36,18 +34,4 @@ public interface CoderTranslatorRegistrar { /** Returns a mapping of URN to {@link CoderTranslator}. */ Map, CoderTranslator> getCoderTranslators(); - - /** - * Returns whether the given Coder is known to this CoderTranslatorRegistrar. If the Coder is - * known, then getCoderTranslator() will return a non-null CoderTranslator. - */ - boolean isKnownCoder(Coder coder, PipelineOptions options); - - /** Returns the CoderTranslator to use for this Coder, or null if the Coder is not known. */ - @Nullable - CoderTranslator getCoderTranslator(Class coderClass); - - /** Returns the Coder to use for the given Urn, or null if the Urn is for an unknown Coder. */ - @Nullable - Class getCoderForUrn(String coderUrn); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index a847bf780dff..84a90721a983 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import java.io.IOException; import java.util.Collections; import java.util.List; import org.apache.beam.model.pipeline.v1.SchemaApi; @@ -31,19 +30,12 @@ import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.ShardedKey; -import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; -import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -185,77 +177,6 @@ public RowCoder fromComponents( }; } - static CoderTranslator> schema() { - return new CoderTranslator>() { - private static final String TO_ROW_FUNCTION_URN = "beam:torowfn:javasdk:v1"; - private static final String FROM_ROW_FUNCTION_URN = "beam:fromrowfn:javasdk:v1"; - private static final String TYPE_DESCRIPTOR_URN = "beam:typedescriptor:javasdk:v1"; - - @Override - public ImmutableList> getComponents(SchemaCoder from) { - return ImmutableList.of(); - } - - @Override - public byte[] getPayload(SchemaCoder from) { - SchemaApi.SchemaCoderPayload.Builder payload = SchemaApi.SchemaCoderPayload.newBuilder(); - payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(), true)); - payload - .getToRowFnBuilder() - .setUrn(TO_ROW_FUNCTION_URN) - .setPayload( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(from.getToRowFunction()))); - payload - .getFromRowFnBuilder() - .setUrn(FROM_ROW_FUNCTION_URN) - .setPayload( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(from.getFromRowFunction()))); - payload - .addAdditionalCoderInfosBuilder() - .setUrn(TYPE_DESCRIPTOR_URN) - .setPayload( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(from.getEncodedTypeDescriptor()))); - return payload.build().toByteArray(); - } - - @Override - public SchemaCoder fromComponents( - List> components, byte[] payload, TranslationContext context) { - checkArgument( - components.isEmpty(), "Expected empty component list, but received: %s", components); - try { - SchemaApi.SchemaCoderPayload schemaCoderPayload = - SchemaApi.SchemaCoderPayload.parseFrom(payload); - if (schemaCoderPayload.getAdditionalCoderInfosCount() == 0) { - throw new IllegalArgumentException("Missing serialized typeDescriptor"); - } - TypeDescriptor typeDescriptor = - (TypeDescriptor) - SerializableUtils.deserializeFromByteArray( - schemaCoderPayload.getAdditionalCoderInfos(0).getPayload().toByteArray(), - "typeDescriptor"); - SerializableFunction toRowFunction = - (SerializableFunction) - SerializableUtils.deserializeFromByteArray( - schemaCoderPayload.getToRowFn().getPayload().toByteArray(), "toRowFunction"); - SerializableFunction fromRowFunction = - (SerializableFunction) - SerializableUtils.deserializeFromByteArray( - schemaCoderPayload.getFromRowFn().getPayload().toByteArray(), - "fromRowFunction"); - - Schema schema = SchemaTranslation.schemaFromProto(schemaCoderPayload.getSchema()); - return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction); - } catch (IOException | IllegalArgumentException e) { - throw new RuntimeException(e); - } - } - }; - } - static CoderTranslator> shardedKey() { return new SimpleStructuredCoderTranslator>() { @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java index 1f9f1eaafbed..5b0d5aedd619 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java @@ -34,9 +34,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; @@ -74,7 +71,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { ModelCoders.PARAM_WINDOWED_VALUE_CODER_URN) .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN) .put(RowCoder.class, ModelCoders.ROW_CODER_URN) - .put(SchemaCoder.class, ModelCoders.SCHEMA_CODER_URN) .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN) .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN) .put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN) @@ -100,7 +96,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { CoderTranslators.paramWindowedValue()) .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class)) .put(RowCoder.class, CoderTranslators.row()) - .put(SchemaCoder.class, CoderTranslators.schema()) .put(ShardedKey.Coder.class, CoderTranslators.shardedKey()) .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow()) .put(NullableCoder.class, CoderTranslators.nullable()) @@ -128,6 +123,10 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { Coder.class.getSimpleName()); } + public static boolean isKnownCoder(Coder coder) { + return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass()); + } + @Override public Map, String> getCoderURNs() { return BEAM_MODEL_CODER_URNS; @@ -137,23 +136,4 @@ public Map, String> getCoderURNs() { public Map, CoderTranslator> getCoderTranslators() { return BEAM_MODEL_CODERS; } - - @Override - public boolean isKnownCoder(Coder coder, PipelineOptions options) { - if (coder.getClass() == SchemaCoder.class - && StreamingOptions.updateCompatibilityVersionLessThan(options, "2.74")) { - return false; - } - return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass()); - } - - @Override - public CoderTranslator getCoderTranslator(Class coderClass) { - return BEAM_MODEL_CODERS.getOrDefault(coderClass, null); - } - - @Override - public Class getCoderForUrn(String coderUrn) { - return BEAM_MODEL_CODER_URNS.inverse().getOrDefault(coderUrn, null); - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java index 5059cc1c6b83..7b7546aceb61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java @@ -61,7 +61,6 @@ private ModelCoders() {} getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE); public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW); - public static final String SCHEMA_CODER_URN = getUrn(StandardCoders.Enum.SCHEMA); public static final String STATE_BACKED_ITERABLE_CODER_URN = "beam:coder:state_backed_iterable:v1"; @@ -91,7 +90,6 @@ private ModelCoders() {} WINDOWED_VALUE_CODER_URN, DOUBLE_CODER_URN, ROW_CODER_URN, - SCHEMA_CODER_URN, PARAM_WINDOWED_VALUE_CODER_URN, STATE_BACKED_ITERABLE_CODER_URN, SHARDED_KEY_CODER_URN, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java index 64c7898a37b9..f79696214368 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java @@ -189,7 +189,6 @@ public SdkComponents getSdkComponents(Collection requirements) { windowingStrategies.asMap(), coders.asMap(), Collections.emptyMap(), - requirements, - pipeline.getOptions()); + requirements); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java index 6288649aba3d..446697f24a81 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java @@ -63,7 +63,6 @@ public class SdkComponents { private final BiMap environmentIds = HashBiMap.create(); private final BiMap coderProtoToId = HashBiMap.create(); private final Set requirements; - private final PipelineOptions pipelineOptions; private final Set reservedIds = new HashSet<>(); @@ -72,7 +71,17 @@ public class SdkComponents { /** Create a new {@link SdkComponents} with no components. */ public static SdkComponents create() { - return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", null); + return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); + } + + /** + * Create new {@link SdkComponents} importing all items from provided {@link Components} object. + * + *

WARNING: This action might cause some of duplicate items created. + */ + public static SdkComponents create( + RunnerApi.Components components, Collection requirements) { + return new SdkComponents(components, requirements, ""); } /*package*/ static SdkComponents create( @@ -82,9 +91,8 @@ public static SdkComponents create() { Map> windowingStrategies, Map> coders, Map environments, - Collection requirements, - PipelineOptions pipelineOptions) { - SdkComponents sdkComponents = new SdkComponents(components, requirements, "", pipelineOptions); + Collection requirements) { + SdkComponents sdkComponents = SdkComponents.create(components, requirements); sdkComponents.transformIds.inverse().putAll(transforms); sdkComponents.pCollectionIds.inverse().putAll(pCollections); sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies); @@ -95,28 +103,19 @@ public static SdkComponents create() { public static SdkComponents create(PipelineOptions options) { SdkComponents sdkComponents = - new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", options); + new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); sdkComponents.registerEnvironment( Environments.createOrGetDefaultEnvironment(portablePipelineOptions)); return sdkComponents; } - public static SdkComponents create(PipelineOptions options, Environment environment) { - SdkComponents sdkComponents = - new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", options); - sdkComponents.registerEnvironment(environment); - return sdkComponents; - } - private SdkComponents( @Nullable Components components, @Nullable Collection requirements, - String newIdPrefix, - @Nullable PipelineOptions pipelineOptions) { + String newIdPrefix) { this.newIdPrefix = newIdPrefix; this.requirements = new HashSet<>(); - this.pipelineOptions = pipelineOptions; if (components == null) { if (requirements != null) { @@ -154,7 +153,7 @@ public void mergeFrom( */ public SdkComponents withNewIdPrefix(String newIdPrefix) { SdkComponents sdkComponents = - new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix, pipelineOptions); + new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix); sdkComponents.transformIds.putAll(transformIds); sdkComponents.pCollectionIds.putAll(pCollectionIds); sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds); @@ -175,7 +174,7 @@ public String registerPTransform( throws IOException { String name = getApplicationName(appliedPTransform); // If this transform is present in the components, nothing to do. return the existing name. - // Otherwise, the transform must be translated and added to the components. + // Otherwise the transform must be translated and added to the components. if (componentsBuilder.getTransformsOrDefault(name, null) != null) { return name; } @@ -376,8 +375,4 @@ public RunnerApi.Components toComponents() { public Collection requirements() { return ImmutableSet.copyOf(requirements); } - - public PipelineOptions getPipelineOptions() { - return pipelineOptions; - } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java index 1ec0a74f5be1..b8f92ff0053e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; -import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -46,20 +45,14 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.SchemaCoder; -import org.apache.beam.sdk.schemas.SchemaRegistry; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -77,34 +70,6 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class CoderTranslationTest { - @AutoValue - @DefaultSchema(AutoValueSchema.class) - public abstract static class SimpleAutoValue { - public abstract String getString(); - - public abstract int getInt32(); - - public abstract long getInt64(); - - public static SimpleAutoValue of(String string, Integer int32, Long int64) { - return new AutoValue_CoderTranslationTest_SimpleAutoValue(string, int32, int64); - } - } - - private static final SchemaRegistry REGISTRY = SchemaRegistry.createDefault(); - - private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) { - try { - return SchemaCoder.of( - REGISTRY.getSchema(typeDescriptor), - typeDescriptor, - REGISTRY.getToRowFunction(typeDescriptor), - REGISTRY.getFromRowFunction(typeDescriptor)); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - private static final Set> KNOWN_CODERS = ImmutableSet.>builder() .add(ByteArrayCoder.of()) @@ -129,7 +94,6 @@ private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) { Field.of("array", FieldType.array(FieldType.STRING)), Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)), Field.of("bar", FieldType.logicalType(FixedBytes.of(123)))))) - .add(schemaCoderFrom(TypeDescriptor.of(SimpleAutoValue.class))) .add(ShardedKey.Coder.of(StringUtf8Coder.of())) .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of())) .add(NullableCoder.of(ByteArrayCoder.of())) @@ -163,7 +127,7 @@ public void validateKnownCoders() { } @Test - public void validateModelCoderTranslators() { + public void validateCoderTranslators() { assertThat( "Every Model Coder must have a Translator", new ModelCoderRegistrar().getCoderURNs().keySet(), diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index c93de2014798..6ecb029c5d97 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -600,7 +600,7 @@ private Map loadRegisteredTransforms() { pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read"); } else { LOG.warn( - "Using use_deprecated_read in portable runners is runner-dependent. The " + "Using use_depreacted_read in portable runners is runner-dependent. The " + "ExpansionService will respect that, but if your runner does not have support for " + "native Read transform, your Pipeline will fail during Pipeline submission."); } diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java index 8bd18fd8e250..14ab48f66699 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java @@ -21,11 +21,9 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.construction.CoderTranslator; import org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.checkerframework.checker.nullness.qual.Nullable; /** Coder registrar for AvroGenericCoder. */ @AutoService(CoderTranslatorRegistrar.class) @@ -44,20 +42,4 @@ public Map, String> getCoderURNs() { public Map, CoderTranslator> getCoderTranslators() { return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator()); } - - @Override - public boolean isKnownCoder(Coder coder, PipelineOptions options) { - return coder.getClass() == AvroGenericCoder.class; - } - - @Override - public @Nullable CoderTranslator getCoderTranslator( - Class coderClass) { - return coderClass == AvroGenericCoder.class ? new AvroGenericCoderTranslator() : null; - } - - @Override - public @Nullable Class getCoderForUrn(String coderUrn) { - return AVRO_CODER_URN.equals(coderUrn) ? AvroGenericCoder.class : null; - } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java index 42a6f8d11c2a..ef8d69bc1ec3 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.IterableLikeCoder; import org.apache.beam.sdk.fn.stream.PrefetchableIterable; import org.apache.beam.sdk.fn.stream.PrefetchableIterators; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.BufferedElementCountingOutputStream; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable; @@ -53,7 +52,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -302,26 +300,6 @@ public Map, String> getCoderUR getCoderTranslators() { return ImmutableMap.of(StateBackedIterable.Coder.class, new Translator()); } - - @Override - public boolean isKnownCoder( - org.apache.beam.sdk.coders.Coder coder, PipelineOptions options) { - return coder.getClass() == StateBackedIterable.Coder.class; - } - - @Override - public @Nullable CoderTranslator getCoderTranslator( - Class coderClass) { - return coderClass == StateBackedIterable.Coder.class ? new Translator() : null; - } - - @Override - public @Nullable Class getCoderForUrn( - String coderUrn) { - return STATE_BACKED_ITERABLE_CODER_URN.equals(coderUrn) - ? StateBackedIterable.Coder.class - : null; - } } /** From 130f8dbe427b55b93f365b9db6669ecc4444919e Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 15 May 2026 11:44:29 -0400 Subject: [PATCH 2/2] Revert "Sickbay two failed tests due to new schema coder urn. (#38497)" This reverts commit 3dbd7c8c35b6a396c5e6c6fed2a3b37d4f731252. --- .../beam_PostCommit_Java_ValidatesRunner_ULR.json | 3 +-- runners/portability/java/build.gradle | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json index fbd81891f93b..6e2f429dd24e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json @@ -2,6 +2,5 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", - "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface", - "https://github.com/apache/beam/pull/38497": "sickbay two failed tests" + "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle index aa147e8426c4..6e3b431e802b 100644 --- a/runners/portability/java/build.gradle +++ b/runners/portability/java/build.gradle @@ -214,11 +214,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" // TODO(https://github.com/apache/beam/issues/31231) excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' - // TODO(https://github.com/apache/beam/issues/33859): Failed with "KeyError: 'beam:coder:schema:v1'". - // New schema coder urn is not yet supported in runners other than dataflow - excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithShuffle' - excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithoutShuffle' - for (String test : sickbayTests) { excludeTestsMatching test }