Skip to content

Commit

Permalink
[BEAM-13223] Documents two new features of cross-language transforms (#…
Browse files Browse the repository at this point in the history
…15941)

* Documents two new features of cross-language transforms

* Address reviewer comments
  • Loading branch information
chamikaramj committed Nov 13, 2021
1 parent a05ff52 commit fd00fc9
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
Expand Up @@ -18,6 +18,8 @@ limitations under the License.

With the samples on this page we will demonstrate how to create and leverage cross-language pipelines.

> **Note:** Please see the [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation on cross-language transforms.
The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.

In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
Expand Down
146 changes: 146 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
Expand Up @@ -6555,6 +6555,115 @@ In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/jav

#### 13.1.1. Creating cross-language Java transforms

There are two ways to make Java transforms available to other SDKs.

* Option 1: In some cases, you can use existing Java transforms from other SDKs without writing any additional Java code.
* Option 2: You can use arbitrary Java Transforms from other SDKs by adding a few Java classes.

##### 13.1.1.1 Using Existing Java Transforms from Other SDKs Without Writing more Java Code

Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. This can be useful in many cases. For example,
* A developer not familiar with Java may need to use an existing Java transform from a Python pipeline
* A developer may need to make an existing Java transform available to a Python pipeline without writing/releasing more Java code

> **Note:** This feature is currently only available when using Java transforms from a Python pipeline.
To be eligible for direct usage, the API of the Java transform has to follow the following pattern.
* Requirement 1: The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.
* Requirement 2: The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.

See below for the structure of an example Java class that can be directly used from the Python API.

{{< highlight >}}
public class JavaDataGenerator extends PTransform<PBegin, PCollection<String>> {
. . .

// Following method satisfies the Requirement 1.
// Note that you may also use a class constructor instead of a static method.
public static JavaDataGenerator create(Integer size) {
return new JavaDataGenerator(size);
}

static class JavaDataGeneratorConfig implements Serializable {
public String prefix;
public long length;
public String suffix;
. . .
}

// Following method conforms to the Requirement 2
public JavaDataGenerator withJavaDataGeneratorConfig(JavaDataGeneratorConfig dataConfig) {
return new JavaDataGenerator(this.size, javaDataGeneratorConfig);
}

. . .
}
{{< /highlight >}}

To use a Java class that conforms to the above requirement from a Python SDK pipeline you may do the following.

* Step 1: create an allowlist file in the _yaml_ format that describes the Java transform classes and methods that will be directly accessed from Python.
* Step 2: start an Expansion Service with the `javaClassLookupAllowlistFile` option passing path to the allowlist defined in Step 1 as the value.
* Step 3: Use the Python [JavaExternalTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) API to directly
access Java transforms defined in the allowlist from the Python side.

Starting with Beam 2.35.0, Step 1 and 2 may be skipped as described in corresponding sections below.

##### Step 1

To use this Java transform from Python, you may define an allowlist file in the _yaml_ format. This allowlist lists the class names,
constructor methods, and builder methods that are directly available to be used from the Python side.

Starting with Beam 2.35.0, you have the option to specify `*` to the `javaClassLookupAllowlistFile` option instead of defining an actual allowlist which
denotes that all supported transforms in the classpath of the expansion service may be accessed through the API.

{{< highlight >}}
version: v1
allowedClasses:
- className: my.beam.transforms.JavaDataGenerator
allowedConstructorMethods:
- create
allowedBuilderMethods:
- withJavaDataGeneratorConfig
{{< /highlight >}}

##### Step 2

The allowlist is provided as an argument when starting up the Java expansion service. For example, the expansion service can be started
as a local Java process using the following command.

{{< highlight >}}
java -jar <jar file> <port> --javaClassLookupAllowlistFile=<path to the allowlist file>
{{< /highlight >}}

Starting with Beam 2.35.0, Beam ``JavaExternalTransform` API will automatically startup an expansion service with a given set of `jar` file dependencies
if an expansion service address was not provided.

##### Step 3

You can directly use the Java class from your Python pipeline using a stub transform created using the `JavaExternalTransform` API. This API allows you to construct the transform
using the Java class name and allows you to invoke builder methods to configure the class.

Constructor and method parameter types are mapped between Python and Java using a Beam Schema. The Schema is auto-generated using the object types
provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam Schema
for these objects is registered and available for the Java expansion service. If a schema has not been registered, the Java expansion service will
try to register a schema using [JavaFieldSchema](https://beam.apache.org/documentation/programming-guide/#creating-schemas). In Python arbitrary objects
can be represented using `NamedTuple`s which will be represented as Beam Rows in the Schema. See below for a Python stub transform that represents the above
mentioned Java transform.

{{< highlight >}}
JavaDataGeneratorConfig = typing.NamedTuple(
'JavaDataGeneratorConfig', [('prefix', str), ('length', int), ('suffix', str)])
data_config = JavaDataGeneratorConfig(prefix='start', length=20, suffix='end')

java_transform = JavaExternalTransform(
'my.beam.transforms.JavaDataGenerator', expansion_service='localhost:<port>').create(numpy.int32(100)).withJavaDataGeneratorConfig(data_config)
{{< /highlight >}}

This transform can be used in a Python pipeline along with other Python transforms.

##### 13.1.1.2 Full API for Making Existing Java Transforms Available to Other SDKs

To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.

**Implementing the interfaces**
Expand Down Expand Up @@ -6752,6 +6861,43 @@ Currently Python external transforms are limited to dependencies available in co
Go currently does not support creating cross-language transforms, only using cross-language
transforms from other languages; see more at [BEAM-9923](https://issues.apache.org/jira/browse/BEAM-9923).

#### 13.1.4. Selecting a URN for Cross-language Transforms

Developing a cross-language transform involves defining a URN for registering the transform with an expansion service. In this section
we provide a convention for defining such URNs. Following this convention is optional but it will ensure that your transform
will not run into conflicts when registering in an expansion service along with transforms developed by other developers.

##### Schema

A URN should consist of the following components:
* ns-id: A namespace identifier. Default recommendation is `beam:transform`.
* org-identifier: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use `org.apache.beam` for this.
* functionality-identifier - Identifies the functionality of the cross-language transform.
* version - a version number for the transform

We provide the schema from the URN convention in [augmented Backus–Naur](https://en.wikipedia.org/wiki/Augmented_Backus%E2%80%93Naur_form) form.
Keywords in upper case are from the [URN spec](https://datatracker.ietf.org/doc/html/rfc8141).

{{< highlight >}}
transform-urn = ns-id “:” org-identifier “:” functionality-identifier “:” version
ns-id = (“beam” / NID) “:” “transform”
id-char = ALPHA / DIGIT / "-" / "." / "_" / "~" ; A subset of characters allowed in a URN
org-identifier = 1*id-char
functionality-identifier = 1*id-char
version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’
{{< /highlight >}}

##### Examples

Below we’ve given some example transform classes and corresponding URNs to be used.

* A transform offered with Apache Beam that writes Parquet files.
* `beam:transform:org.apache.beam:parquet_write:v1`
* A transform offered with Apache Beam that reads from Kafka with metadata.
* `beam:transform:org.apache.beam:kafka_read_with_metadata:v1`
* A transform developed by organization abc.org that reads from data store MyDatastore.
* `beam:transform:org.abc:mydatastore_read:v1`

### 13.2. Using cross-language transforms {#use-x-lang-transforms}

Depending on the SDK language of the pipeline, you can use a high-level SDK-wrapper class, or a low-level transform class to access a cross-language transform.
Expand Down

0 comments on commit fd00fc9

Please sign in to comment.