Creating a Beam Java Transform Catalog and Using in Beam YAML

Prerequisites

To complete this tutorial, you must have the following software installed:

  • Java 11 or later
  • Apache Maven 3.6 or later

Overview

The purpose of this tutorial is to introduce the fundamental concepts of the cross-language framework that Beam YAML leverages. That framework allows a user to expose Beam Java transforms through transform providers so that the transforms can be easily defined in a Beam YAML pipeline.

As we walk through these concepts, we will construct a transform called ToUpperCase that takes a single parameter, field, naming a field in the collection of elements, and modifies that field by converting its string value to uppercase.

There are four main steps to follow:

  1. Define the transformation itself as a PTransform that consumes and produces any number of schema'd PCollections.
  2. Expose this transform via a SchemaTransformProvider which provides an identifier used to refer to this transform later as well as metadata like a human-readable description and its configuration parameters.
  3. Build a Jar that contains these classes and vends them via the Service Loader infrastructure.
  4. Write a provider specification that tells Beam YAML where to find this jar and what it contains.

Project Structure

The project structure for this tutorial is as follows:

MyExternalTransforms
├── pom.xml
└── src
    └── main
        └── java
            └── org
                └── example
                    ├── ToUpperCaseTransformProvider.java
                    └── SkeletonSchemaProvider.java

Here is a brief description of each file:

  • pom.xml: The Maven project configuration file.
  • SkeletonSchemaProvider.java: The Java class that contains the bare-minimum skeleton code for implementing a SchemaTransform Identity function.
  • ToUpperCaseTransformProvider.java: The Java class that contains the SchemaTransform to be used in the Beam YAML pipeline.

This project structure assumes that the Java package is org.example, but any package path can be used so long as the project structure matches.

Creating the pom.xml file

A pom.xml file, which stands for Project Object Model, is an essential file used in Maven projects. It's an XML file that contains all the critical information about a project, including its configuration details for Maven to build it successfully. This file specifies things like the project's name, version, dependencies on other libraries, and how the project should be packaged (e.g., JAR file).

This tutorial does not cover Maven and pom.xml in detail; see the official documentation for more information: https://maven.apache.org/pom.html

Minimal pom.xml

A minimal pom.xml file can be found in the project repo here.

Writing the External Transform

Writing a transform that is compatible with the Beam YAML framework requires leveraging Beam’s cross-language framework, and more specifically, the SchemaTransformProvider interface (and even more specifically, the TypedSchemaTransformProvider interface).

This framework relies on creating a PTransform that operates solely on Beam Rows - a schema-aware data type built into Beam that can be translated across SDKs. Leveraging the SchemaTransformProvider interface removes the need to write much of the boilerplate code required to translate data across SDKs, allowing us to focus on the transform functionality itself.
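To make the Row concept more concrete, here is a small illustrative sketch (not part of the tutorial project) that builds a schema-aware Row matching the elements used later in this tutorial, with a string name field and an integer id field:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RowExample {
  public static void main(String[] args) {
    // A schema with a string field and a 64-bit integer field
    Schema schema =
        Schema.builder()
            .addStringField("name")
            .addInt64Field("id")
            .build();

    // A Row that conforms to that schema
    Row row =
        Row.withSchema(schema)
            .withFieldValue("name", "john")
            .withFieldValue("id", 1L)
            .build();

    System.out.println(row);
  }
}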

SchemaTransformProvider Skeleton Code

package org.example;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.NonNull;

@AutoService(SchemaTransformProvider.class)
public class SkeletonSchemaProvider
    extends TypedSchemaTransformProvider<SkeletonSchemaProvider.Configuration> {

  private static final String INPUT_ROWS_TAG = "input";
  private static final String OUTPUT_ROWS_TAG = "output";

  @Override
  protected @NonNull Class<Configuration> configurationClass() {
    return Configuration.class;
  }

  @Override
  public @NonNull String identifier() {
    return "some:urn:transform_name:v1";
  }

  @Override
  public @NonNull String description() {
    return "An example transform description.";
  }

  @Override
  public @NonNull List<String> inputCollectionNames() {
    return Collections.singletonList(INPUT_ROWS_TAG);
  }

  @Override
  public @NonNull List<String> outputCollectionNames() {
    return Collections.singletonList(OUTPUT_ROWS_TAG);
  }

  @Override
  protected @NonNull SchemaTransform from(Configuration configuration) {
    return new Identity();
  }

  protected static class Identity extends SchemaTransform {

    private static class IdentityDoFn extends DoFn<Row, Row> {
      @ProcessElement
      public void processElement(@Element Row element, OutputReceiver<Row> out) {
        out.output(Row.fromRow(element).build());
      }
    }

    @Override
    public @NonNull PCollectionRowTuple expand(PCollectionRowTuple input) {
      // Get input rows
      PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
      // Apply the PTransform
      PCollection<Row> outputRows =
          inputRows
              .apply("Identity", ParDo.of(new IdentityDoFn()))
              .setRowSchema(inputRows.getSchema());
      // Construct output collection and tag successful records
      return PCollectionRowTuple.of(OUTPUT_ROWS_TAG, outputRows);
    }
  }

  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class Configuration implements Serializable {
    public static Builder builder() {
      return new AutoValue_SkeletonSchemaProvider_Configuration.Builder();
    }

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Configuration build();
    }
  }
}

This is the bare minimum code (not counting the package and import statements) needed to create a SchemaTransformProvider that can be used by the cross-language framework, thereby allowing the SchemaTransform defined within it to be used in any Beam YAML pipeline. In this case, the transform acts as an identity function, outputting the input collection of elements with no alteration.

Let’s start by breaking down the top-level methods that are required to be compatible with the SchemaTransformProvider interface.

configurationClass()

@Override
protected @NonNull Class<Configuration> configurationClass() {
  return Configuration.class;
}

The configurationClass() method is responsible for telling the cross-language framework which Java class defines the input parameters to the Transform. The Configuration class we defined in the skeleton code will be revisited in depth later.

identifier()

@Override
public @NonNull String identifier() {
  return "some:urn:transform_name:v1";
}

The identifier() method defines a unique identifier (URN) for the transform. It is important to ensure that this URN does not collide with that of any other transform known to the external transform service - both the transforms built into Beam and any defined in a custom catalog such as this one.
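For example, the ToUpperCase transform built later in this tutorial overrides identifier() to return the URN that the YAML provider specs below refer to (a small sketch):

@Override
public @NonNull String identifier() {
  return "some:urn:to_upper_case:v1";
}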

inputCollectionNames()

@Override
public @NonNull List<String> inputCollectionNames() {
  return Collections.singletonList(INPUT_ROWS_TAG);
}

The inputCollectionNames() method returns a list of expected names for the tagged input collections. In most cases, especially in Beam YAML, there will be a single collection of input elements tagged "input". It is acceptable to use different names in this method, since it is not actually used as a contract between SDKs, but note that Beam YAML will send a collection tagged "input" to the transform. It is best to define this method using the constants declared at the top of the file.

outputCollectionNames()

@Override
public @NonNull List<String> outputCollectionNames() {
  return Collections.singletonList(OUTPUT_ROWS_TAG);
}

The outputCollectionNames() method returns a list of names for the tagged output collections. Similar to inputCollectionNames(), there will be a collection of output elements tagged "output", but in most cases there will also be a subset of elements tagged with whatever was defined in the error_handling section of the transform config in Beam YAML. Since this method is also not used as a contract, it is best to define it using the constants declared at the top of the file.

from()

@Override
protected @NonNull SchemaTransform from(Configuration configuration) {
  return new Identity();
}

The from() method is responsible for returning the SchemaTransform itself. This is the PTransform that actually operates on the incoming collection of elements. Since it is a PTransform, it requires one method, expand(), which defines the expansion of the transform and includes the DoFn.
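For the ToUpperCase transform, from() simply passes the parsed configuration to the transform's constructor - a short sketch consistent with the ToUpperCaseTransform class shown later:

@Override
protected @NonNull SchemaTransform from(Configuration configuration) {
  return new ToUpperCaseTransform(configuration);
}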

description()

@Override
public @NonNull String description() {
  return "An example transform description.";
}

The optional description() method is where a description of the transform can be written. This description is largely unused by the Beam YAML framework itself, but it is useful for generating docs for a transform catalog when used in conjunction with the generate_yaml_docs.py script - for example, the Beam YAML transform glossary.

ToUpperCaseProvider Configuration Class

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration implements Serializable {

  @SchemaFieldDescription(
      "The field in the input collection to perform the uppercase operation on. "
          + "This field must be a String.")
  public abstract String getField();

  @Nullable
  public abstract ErrorHandling getErrorHandling();

  public void validate() {
    checkArgument(!"metadata".equals(getField()), "Cannot modify field 'metadata'");
  }

  public static Builder builder() {
    return new AutoValue_ToUpperCaseTransformProvider_Configuration.Builder();
  }

  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setField(String field);

    public abstract Builder setErrorHandling(ErrorHandling errorHandling);

    public abstract Configuration build();
  }
}

The Configuration class is responsible for defining the parameters to the transform. This AutoValue class is annotated with @DefaultSchema(AutoValueSchema.class) so that a schema can be generated for the transform's configuration; the schema is derived from the class's getter methods. In our ToUpperCase example, there is one input parameter, field, that specifies the field in the input collection to perform the operation on, so we define a getter method, getField(), that tells the AutoValueSchema class about this parameter.

Likewise, the Configuration class needs a Builder subclass annotated with @AutoValue.Builder so that the AutoValue class can be instantiated. This builder needs a setter method for each corresponding getter method in the parent class.

Optional parameters should be annotated with the @Nullable annotation. Required parameters, therefore, should omit this annotation.
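For instance, alongside the required field parameter, a hypothetical optional prefix parameter (not part of this tutorial's transform) would simply be declared with a nullable getter:

// Required parameter: no @Nullable annotation, so it must be set in the YAML config
public abstract String getField();

// Hypothetical optional parameter: annotated @Nullable, so it may be omitted
@Nullable
public abstract String getPrefix();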

The @SchemaFieldDescription annotation can optionally be used to document a parameter. This description is also passed to the Beam YAML framework and can be used in conjunction with the generate_yaml_docs.py script to generate docs for a transform catalog - for example, the Beam YAML transform glossary.

Error Handling

In Beam YAML, there is a built-in error handling framework that allows a pipeline to consume the error output of a transform, both for the turnkey Beam YAML transforms and for compatible external transforms. This error output can consist of any exceptions that are caught and tagged, or any custom logic that causes an element to be treated as an error.

To make a transform compatible with this framework, it must accept an ErrorHandling object as an input parameter, and therefore define it in this Configuration class.

Validation

One last optional method for the Configuration class is the validate() method. This method is responsible for checking the input parameters to the transform. In our example, we assume there is a field metadata in the input collection that cannot be modified, so we perform a check on the field parameter to verify the user is not attempting to modify this field. This method can also be useful for checking dependent inputs (i.e. parameter A is required if parameter B is specified), as sketched below.
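As an illustration of that last point, here is a sketch of how dependent parameters might be validated. The delimiter and splitField parameters are hypothetical and not part of this tutorial's transform; checkArgument is the same Preconditions helper used in the Configuration class above.

public void validate() {
  // Check from the tutorial's Configuration: the reserved 'metadata' field may not be modified
  checkArgument(!"metadata".equals(getField()), "Cannot modify field 'metadata'");

  // Hypothetical dependent-parameter check: 'delimiter' is only valid when 'splitField' is set
  if (getDelimiter() != null) {
    checkArgument(
        getSplitField() != null,
        "Parameter 'delimiter' requires 'splitField' to also be specified");
  }
}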

ToUpperCaseProvider SchemaTransform Class

protected static class ToUpperCaseTransform extends SchemaTransform {

  private static final TupleTag<Row> successValues = new TupleTag<Row>() {};
  private static final TupleTag<Row> errorValues = new TupleTag<Row>() {};

  private final Configuration configuration;

  private ToUpperCaseTransform(Configuration configuration) {
    // Validate the transform config before expansion
    configuration.validate();
    this.configuration = configuration;
  }

  private static DoFn<Row, Row> createDoFn(
      String field, boolean handleErrors, Schema errorSchema) {
    return new DoFn<Row, Row>() {
      @ProcessElement
      public void processElement(@Element Row inputRow, MultiOutputReceiver out) {
        try {
          // Apply toUpperCase() to given field and tag successful records
          Row output =
              Row.fromRow(inputRow)
                  .withFieldValue(
                      field, Objects.requireNonNull(inputRow.getString(field)).toUpperCase())
                  .build();
          out.get(successValues).output(output);
        } catch (Exception e) {
          if (handleErrors) {
            // Catch any errors and tag with error tag if error_handling is specified
            out.get(errorValues).output(ErrorHandling.errorRecord(errorSchema, inputRow, e));
          } else {
            // Throw error if error_handling was not specified
            throw new RuntimeException(e);
          }
        }
      }
    };
  }

  @Override
  public @NonNull PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
    // Get input rows
    PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
    // Get input row schema and construct error schema from it
    Schema inputSchema = inputRows.getSchema();
    Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
    // Determine if error_handling was specified
    boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
    // Apply the PTransform
    PCollectionTuple output =
        inputRows.apply(
            "ToUpperCase",
            ParDo.of(createDoFn(configuration.getField(), handleErrors, errorSchema))
                .withOutputTags(successValues, TupleTagList.of(errorValues)));
    // Set the schemas for successful records and error records.
    // This is needed so the runner can translate the element schema across SDKs
    output.get(successValues).setRowSchema(inputSchema);
    output.get(errorValues).setRowSchema(errorSchema);
    // Construct output collection and tag successful records
    PCollectionRowTuple result =
        PCollectionRowTuple.of(OUTPUT_ROWS_TAG, output.get(successValues));
    if (handleErrors) {
      // Add tagged error records to output collection if error_handling was specified
      result =
          result.and(
              Objects.requireNonNull(configuration.getErrorHandling()).getOutput(),
              output.get(errorValues));
    }
    return result;
  }
}

This is the class that will define the actual transform that is performed on the incoming PCollection. Let’s first take a look at the expand() function.

@Override
public @NonNull PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
  // Get input rows
  PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
  // Get input row schema and construct error schema from it
  Schema inputSchema = inputRows.getSchema();
  Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
  // Determine if error_handling was specified
  boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
  // Apply the PTransform
  PCollectionTuple output =
      inputRows.apply(
          "ToUpperCase",
          ParDo.of(createDoFn(configuration.getField(), handleErrors, errorSchema))
              .withOutputTags(successValues, TupleTagList.of(errorValues)));
  // Set the schemas for successful records and error records.
  // This is needed so the runner can translate the element schema across SDKs
  output.get(successValues).setRowSchema(inputSchema);
  output.get(errorValues).setRowSchema(errorSchema);
  // Construct output collection and tag successful records
  PCollectionRowTuple result =
      PCollectionRowTuple.of(OUTPUT_ROWS_TAG, output.get(successValues));
  if (handleErrors) {
    // Add tagged error records to output collection if error_handling was specified
    result =
        result.and(
            Objects.requireNonNull(configuration.getErrorHandling()).getOutput(),
            output.get(errorValues));
  }
  return result;
}

Every incoming PCollectionRowTuple is essentially a tagged collection of PCollections. As stated before, in the context of Beam YAML there will usually be only one tag, "input". To get the PCollection of Row elements, we first unpack it from the PCollectionRowTuple using the "input" tag.

From this collection of elements, the schema of the input collection can be obtained. This is useful for assembling our output collections, since their schemas will be based on it. For the successful records, the schema remains unchanged (since we are modifying a single field in place), and the error records use a schema that essentially wraps the original schema with a couple of error-specific fields, as defined by errorSchema.

Whether to perform error handling is determined by the ErrorHandling class. If error_handling is specified in the transform config of the Beam YAML pipeline, then exceptions are caught and emitted to an "error"-tagged output rather than thrown at runtime.

Next, the PTransform is actually applied. The DoFn will be detailed more below, but what is important to note here is that the output is tagged using two arbitrary TupleTag objects - one for successful records, and one for error records.

After the PTransform is applied, it is important to ensure that both output collections have their schema configured so that the cross-language service can encode and decode the objects back to the Beam YAML SDK.

Finally, the resulting PCollectionRowTuple must be constructed. The successful records are stored and tagged "output" regardless of whether error_handling was specified; if error_handling was specified, the error records are appended to the same PCollectionRowTuple and tagged according to the output name given in the error_handling config of the Beam YAML pipeline.

private static DoFn<Row, Row> createDoFn(
    String field, boolean handleErrors, Schema errorSchema) {
  return new DoFn<Row, Row>() {
    @ProcessElement
    public void processElement(@Element Row inputRow, MultiOutputReceiver out) {
      try {
        // Apply toUpperCase() to given field and tag successful records
        Row output =
            Row.fromRow(inputRow)
                .withFieldValue(
                    field, Objects.requireNonNull(inputRow.getString(field)).toUpperCase())
                .build();
        out.get(successValues).output(output);
      } catch (Exception e) {
        if (handleErrors) {
          // Catch any errors and tag with error tag if error_handling is specified
          out.get(errorValues).output(ErrorHandling.errorRecord(errorSchema, inputRow, e));
        } else {
          // Throw error if error_handling was not specified
          throw new RuntimeException(e);
        }
      }
    }
  };
}

Now, looking at the createDoFn() method that is responsible for constructing and returning the actual DoFn, the only thing that distinguishes it from any other DoFn you might write is that it checks whether error_handling was specified, constructs an error record from the failing input record, and tags those records using the arbitrary error TupleTag discussed previously. Aside from that, this DoFn simply takes the input Row, applies .toUpperCase() to the configured field within the Row, and tags all successful records using the arbitrary success TupleTag, also discussed previously.

Building the Transform Catalog JAR

At this point, you should have all the code as defined in the previous section ready to go, and it is now time to build the JAR that will be provided to the Beam YAML pipeline.

From the root directory, run the following command:

mvn package

This will create a JAR under target called xlang-transforms-bundled-1.0-SNAPSHOT.jar that contains the ToUpperCaseTransformProvider along with its dependencies and the external expansion service. The expansion service is what the Beam YAML SDK invokes to import the transform schema and expand the transform.

Note: The name of the jar is configurable using the finalName tag in the maven-shade-plugin configuration. In this project's pom.xml (beam-yaml-xlang/pom.xml, lines 85 to 87 at commit 95abf08):

<finalName>
  ${project.artifactId}-bundled-${project.version}
</finalName>

Defining the Transform in Beam YAML

Now that you have a JAR file that contains the transform catalog, it is time to include it as part of your Beam YAML pipeline. This is done using providers, which allow one to define a suite of transforms in a given JAR or Python package that can be used within a Beam YAML pipeline.

We will use the javaJar provider, since we plan to keep the config parameter names as they are defined in the transform.

For our example, that looks as follows:

providers:
  - type: javaJar
    config:
      jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
    transforms:
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"

For transforms where one might want to rename the config parameters, the renaming provider allows us to map the transform parameters to alias names. Otherwise, the config parameters inherit the names defined in Java, with camelCase converted to snake_case; for example, errorHandling becomes error_handling in the YAML config. If there were a parameter table_spec that we wanted to call table in the YAML config, we could use the renaming provider to map that alias.

More robust examples of the renaming provider can be found here.

Now, ToUpperCase can be defined as a transform in the Beam YAML pipeline with the single config parameter - field.

A full example:

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"
            id: 2
    - type: Identity
    - type: ToUpperCase
      config:
        field: "name"
    - type: LogForTesting

providers:
  - type: javaJar
    config:
      jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
    transforms:
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"

Expected logs:

message: "{\"name\":\"JOHN\",\"id\":1}"
message: "{\"name\":\"JANE\",\"id\":2}"

Note: Beam YAML will choose the Java implementation of the LogForTesting transform to reduce language switching. The output can get a bit crowded, but look for the logs in the commented “Expected” section at the bottom of the YAML file.

An example with errors caught and handled:

pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"
            id: 2
    - type: ToUpperCase
      input: Create
      config:
        field: "unknown"
        error_handling:
          output: errors
    - type: LogForTesting
      input: ToUpperCase
    - type: LogForTesting
      input: ToUpperCase.errors

providers:
  - type: javaJar
    config:
      jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
    transforms:
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"

If you have Beam Python installed, you can test this pipeline out locally with

python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml

or, if you have gcloud installed, you can run this on Dataflow with

gcloud dataflow yaml run $JOB_NAME --yaml-pipeline-file=pipeline.yaml --region=$REGION

(Note: in this case you will need to upload your jar to a GCS bucket or publish it elsewhere as a globally accessible URL so it is available to the Dataflow service.)
