Merged
@@ -2,6 +2,5 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
"https://github.com/apache/beam/pull/38497": "sickbay two failed tests"
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
5 changes: 1 addition & 4 deletions CHANGES.md
@@ -85,9 +85,6 @@

## Breaking Changes

* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
- Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)).
- Fixes ([#36496](https://github.com/apache/beam/issues/36496)), ([#30276](https://github.com/apache/beam/issues/30276)), ([#29245](https://github.com/apache/beam/issues/29245)).
* (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275))
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

@@ -2441,4 +2438,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss

## Highlights

- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
Contributor

medium

The file is missing a newline at the end. It is a best practice to end text files with a newline character to ensure compatibility with various Unix tools and to avoid 'No newline at end of file' warnings in diffs.

Suggested change
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
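To make the point above concrete, a small standalone check for a trailing newline might look like the sketch below. This is illustrative only and not part of the PR; the `CHANGES.md` path is just an example input.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal sketch: report whether a text file ends with a newline,
// matching the convention the review comment asks for.
public class TrailingNewlineCheck {
  static boolean endsWithNewline(byte[] content) {
    return content.length > 0 && content[content.length - 1] == '\n';
  }

  public static void main(String[] args) throws IOException {
    // "CHANGES.md" is an illustrative path for this sketch.
    byte[] content = Files.readAllBytes(Path.of("CHANGES.md"));
    System.out.println(endsWithNewline(content) ? "ok" : "missing final newline");
  }
}
```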

@@ -221,7 +221,7 @@ public Boolean visit(OrFinallyTrigger trigger) {
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
SdkComponents sdkComponents = SdkComponents.create(options);
SdkComponents sdkComponents = SdkComponents.create();

String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
@@ -1333,20 +1333,19 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways, depending on whether
// it is a portable or non-portable job submission.
// it is a v1 or v2 job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);

// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
Contributor

medium

There is a typo in the comment: `an` should be `and`.

Suggested change
// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both

// need the default environment.
SdkComponents portableComponents =
SdkComponents.create(
options,
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
SdkComponents portableComponents = SdkComponents.create();
portableComponents.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());

RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
@@ -1375,30 +1374,28 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);

if (useUnifiedWorker(options)) {
LOG.info(
"Skipping non-portable transform replacements since job will run on portable worker.");
LOG.info("Skipping v1 transform replacements since job will run on v2.");
} else {
// Now rewrite things to be as needed for non-portable (mutates the pipeline).
// This way the job submitted is valid for portable and non-portable, simultaneously.
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
Comment on lines +1379 to +1380
Contributor

medium

Comments should end with a period for better readability and consistency with the rest of the codebase.

Suggested change
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
// Now rewrite things to be as needed for v1 (mutates the pipeline).
// This way the job submitted is valid for v1 and v2, simultaneously.

replaceV1Transforms(pipeline);
}
// Capture the SdkComponents for look up during step translations.
SdkComponents dataflowNonPortableComponents =
SdkComponents.create(
options,
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the non-portable runner proto.
RunnerApi.Pipeline dataflowNonPortablePipelineProto =
PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false);
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the Runner v1 proto.
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Dataflow non-portable worker pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
"Dataflow v1 pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowV1PipelineProto));
}

// Set a unique client_request_id in the CreateJob request.
@@ -1418,11 +1415,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {

JobSpecification jobSpecification =
translator.translate(
pipeline,
dataflowNonPortablePipelineProto,
dataflowNonPortableComponents,
this,
packages);
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);

if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
List<String> experiments =
@@ -47,7 +47,6 @@
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -93,8 +92,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
@@ -169,11 +166,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();

String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(containerImageURL);
return SdkComponents.create(options, defaultEnvironmentForDataflow);

sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
return sdkComponents;
}

// A Custom Mockito matcher for an initial Job that checks that all
@@ -1293,16 +1294,15 @@ public String apply(byte[] input) {
file1.deleteOnExit();
File file2 = File.createTempFile("file2-", ".txt");
file2.deleteOnExit();
SdkComponents sdkComponents =
SdkComponents.create(
options,
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
.toBuilder()
.addAllDependencies(
Environments.getArtifacts(
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
.toBuilder()
.addAllDependencies(
Environments.getArtifacts(
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
.addAllCapabilities(Environments.getJavaCapabilities())
.build());

RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);

@@ -1870,53 +1870,4 @@ public OffsetRange getInitialRange(@SuppressWarnings("unused") @Element String e
return null;
}
}

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class SimpleAutoValue {
public abstract String getString();

public abstract int getInt32();

public abstract long getInt64();

public static DataflowPipelineTranslatorTest.SimpleAutoValue of(
String string, int int32, long int64) {
return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
}
}

@Test
public void testSchemaCoderTranslation() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(Impulse.create())
.apply(
MapElements.via(
new SimpleFunction<byte[], SimpleAutoValue>() {
@Override
public SimpleAutoValue apply(byte[] input) {
return SimpleAutoValue.of("foo", 5, 10L);
}
}))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
{
SdkComponents sdkComponents = createSdkComponents(options);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
assertTrue(coders.containsKey("SchemaCoder"));
assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn());
}

// Prior to version 2.74, SchemaCoders are translated as custom java coders.
{
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
SdkComponents sdkComponents = createSdkComponents(options);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
assertTrue(coders.containsKey("SchemaCoder"));
assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn());
}
}
}
@@ -78,8 +78,7 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce
// Add another stateful stage with a non-standard key coder
Pipeline p = Pipeline.create();
Coder<Void> keycoder = VoidCoder.of();
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false));
assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
p.apply("impulse", Impulse.create())
.apply(
"create",
@@ -166,8 +165,7 @@ public void onTimer() {}
public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
Pipeline p = Pipeline.create();
Coder<Void> voidCoder = VoidCoder.of();
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false));
assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
p.apply("impulse", Impulse.create())
.apply(
ParDo.of(
5 changes: 0 additions & 5 deletions runners/portability/java/build.gradle
@@ -214,11 +214,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = ""
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'

// TODO(https://github.com/apache/beam/issues/33859): Failed with "KeyError: 'beam:coder:schema:v1'".
// New schema coder urn is not yet supported in runners other than dataflow
excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithShuffle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithoutShuffle'

for (String test : sickbayTests) {
excludeTestsMatching test
}
@@ -25,13 +25,11 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.dataflow.qual.Deterministic;
@@ -64,8 +62,6 @@ private static class DefaultTranslationContext implements TranslationContext {}

private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;

private static @MonotonicNonNull List<CoderTranslatorRegistrar> coderTranslatorRegistrars;

private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
knownTranslators;

@@ -84,53 +80,6 @@ static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
return knownCoderUrns;
}

private static void initializeCoderTranslatorRegistrars() {
ImmutableList.Builder<CoderTranslatorRegistrar> registrars = ImmutableList.builder();
for (CoderTranslatorRegistrar coderTranslatorRegistrar :
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
registrars.add(coderTranslatorRegistrar);
}
coderTranslatorRegistrars = registrars.build();
}

static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
if (coderTranslatorRegistrars == null) {
initializeCoderTranslatorRegistrars();
}
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
if (registrar.isKnownCoder(coder, options)) {
return true;
}
}
return false;
}

static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass) {
if (coderTranslatorRegistrars == null) {
initializeCoderTranslatorRegistrars();
}
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
CoderTranslator translator = registrar.getCoderTranslator(coderClass);
if (translator != null) {
return translator;
}
}
return null;
}

static Class<? extends Coder> getCoderForUrn(String coderUrn) {
if (coderTranslatorRegistrars == null) {
initializeCoderTranslatorRegistrars();
}
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
Class<? extends Coder> coder = registrar.getCoderForUrn(coderUrn);
if (coder != null) {
return coder;
}
}
return null;
}

@VisibleForTesting
@Deterministic
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
@@ -158,7 +107,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE

public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
throws IOException {
if (isKnownCoder(coder, components.getPipelineOptions())) {
if (getKnownCoderUrns().containsKey(coder.getClass())) {
return toKnownCoder(coder, components);
}

@@ -180,10 +129,7 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)

private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
CoderTranslator translator = getCoderTranslator(coder.getClass());
if (translator == null) {
throw new IOException("Unable to find CoderTranslator for known Coder");
}
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
Contributor

medium

The explicit null check and descriptive IOException were removed. If a translator is missing for a known coder, this will now result in a NullPointerException during the call to registerComponents. It is safer to keep the check and throw a descriptive exception to aid debugging.

    CoderTranslator translator = getKnownTranslators().get(coder.getClass());
    if (translator == null) {
      throw new IOException("Unable to find CoderTranslator for known Coder: " + coder.getClass());
    }
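To make the failure mode concrete, here is a minimal, self-contained sketch of the fail-fast lookup pattern the comment recommends. The registry map and names below are illustrative stand-ins, not Beam's actual `CoderTranslator` types.

```java
import java.io.IOException;
import java.util.Map;

// Sketch of a registry lookup that fails fast with a descriptive error
// instead of returning null and triggering an NPE later.
public class FailFastLookup {
  static String lookupTranslator(Map<String, String> translators, String coderClass)
      throws IOException {
    String translator = translators.get(coderClass);
    if (translator == null) {
      // Throwing here surfaces the missing registration immediately,
      // with enough context to debug it.
      throw new IOException(
          "Unable to find CoderTranslator for known Coder: " + coderClass);
    }
    return translator;
  }

  public static void main(String[] args) throws IOException {
    Map<String, String> registry = Map.of("VarIntCoder", "varIntTranslator");
    System.out.println(lookupTranslator(registry, "VarIntCoder")); // varIntTranslator
    try {
      lookupTranslator(registry, "MysteryCoder");
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
```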

List<String> componentIds = registerComponents(coder, translator, components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
@@ -240,8 +186,8 @@ private static Coder<?> fromKnownCoder(
components.getComponents().getCodersOrThrow(componentId), components, context);
coderComponents.add(innerCoder);
}
Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
CoderTranslator<?> translator = getCoderTranslator(coderType);
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
if (translator != null) {
return translator.fromComponents(
coderComponents, coder.getSpec().getPayload().toByteArray(), context);
@@ -28,7 +28,6 @@
* additional payload, which is not currently supported. This exists as a temporary measure.
*/
public interface CoderTranslator<T extends Coder<?>> {

/** Extract all component {@link Coder coders} within a coder. */
List<? extends Coder<?>> getComponents(T from);
