Skip to content

Commit

Permalink
[BEAM-12769] Adds support for expanding a Java cross-language transfo…
Browse files Browse the repository at this point in the history
…rm using the class name and builder methods (apache#15343)

* Adds support for expanding a Java cross-language transform using the class name and builder methods

* Adds an allowlist and adds support for annotations

* Fix tests

* Address CheckerFramework errors

* Adds license

* Addresses reviewer comments.

* Apply suggestions from code review

Co-authored-by: Lukasz Cwik <lcwik@google.com>

* Addresses reviewer comments.

* Updated the proto to include a single schema/payload for constructor and each builder method.
Updated the implementation accordingly and added additional tests.

* Some doc updates and few other minor updates.

* Addressing reviewer comments

Co-authored-by: Lukasz Cwik <lcwik@google.com>
  • Loading branch information
2 people authored and calvinleungyk committed Sep 22, 2021
1 parent f2b9a13 commit e0d6332
Show file tree
Hide file tree
Showing 10 changed files with 1,941 additions and 18 deletions.
63 changes: 63 additions & 0 deletions model/pipeline/src/main/proto/external_transforms.proto
Expand Up @@ -29,6 +29,7 @@ option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "ExternalTransforms";

import "schema.proto";
import "beam_runner_api.proto";

// A configuration payload for an external transform.
// Used as the payload of ExternalTransform as part of an ExpansionRequest.
Expand All @@ -40,3 +41,65 @@ message ExternalConfigurationPayload {
// schema.
bytes payload = 2;
}

// Defines specific expansion methods that may be used to expand cross-language
// transforms.
// Has to be set as the URN of the transform of the expansion request.
message ExpansionMethods {
enum Enum {
// Expand a Java transform using specified constructor and builder methods.
// Transform payload will be of type JavaClassLookupPayload.
JAVA_CLASS_LOOKUP = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:expansion:payload:java_class_lookup:v1"];
}
}

// A configuration payload for an external transform.
// Used to define a Java transform that can be directly instantiated by a Java
// expansion service.
message JavaClassLookupPayload {
// Name of the Java transform class.
string class_name = 1;

// A static method to construct the initial instance of the transform.
// If not provided, the transform should be instantiated using a class
// constructor.
string constructor_method = 2;

// The top level fields of the schema represent the method parameters in
// order.
// If able, top level field names are also verified against the method
// parameters for a match.
Schema constructor_schema = 3;

// A payload which can be decoded using beam:coder:row:v1 and the provided
// constructor schema.
bytes constructor_payload = 4;

// Set of builder methods and corresponding parameters to apply after the
// transform object is constructed.
// When constructing the transform object, given builder methods will be
// applied in order.
repeated BuilderMethod builder_methods = 5;
}

// This represents a builder method of the transform class that should be
// applied in-order after instantiating the initial transform object.
// Each builder method may take one or more parameters and has to return an
// instance of the transform object.
message BuilderMethod {
// Name of the builder method
string name = 1;

// The top level fields of the schema represent the method parameters in
// order.
// If able, top level field names are also verified against the method
// parameters for a match.
Schema schema = 2;

// A payload which can be decoded using beam:coder:row:v1 and the builder
// method schema.
bytes payload = 3;
}


3 changes: 3 additions & 0 deletions sdks/java/expansion-service/build.gradle
Expand Up @@ -38,6 +38,9 @@ dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(path: ":runners:core-construction-java")
compile project(path: ":runners:java-fn-execution")
compile library.java.jackson_annotations
compile library.java.jackson_databind
compile library.java.jackson_dataformat_yaml
compile library.java.vendored_grpc_1_36_0
compile library.java.vendored_guava_26_0_jre
compile library.java.slf4j_api
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.expansion.service;

import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

Expand All @@ -35,8 +36,10 @@
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.ExternalConfigurationPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
Expand All @@ -49,6 +52,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand All @@ -70,6 +74,7 @@
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -172,8 +177,8 @@ private static <ConfigT> Class<ConfigT> getConfigClass(
return configurationClass;
}

private static <ConfigT> Row decodeRow(ExternalConfigurationPayload payload) {
Schema payloadSchema = SchemaTranslation.schemaFromProto(payload.getSchema());
static <ConfigT> Row decodeConfigObjectRow(SchemaApi.Schema schema, ByteString payload) {
Schema payloadSchema = SchemaTranslation.schemaFromProto(schema);

if (payloadSchema.getFieldCount() == 0) {
return Row.withSchema(Schema.of()).build();
Expand All @@ -200,7 +205,7 @@ private static <ConfigT> Row decodeRow(ExternalConfigurationPayload payload) {

Row configRow;
try {
configRow = RowCoder.of(payloadSchema).decode(payload.getPayload().newInput());
configRow = RowCoder.of(payloadSchema).decode(payload.newInput());
} catch (IOException e) {
throw new RuntimeException("Error decoding payload", e);
}
Expand Down Expand Up @@ -247,7 +252,7 @@ private static <ConfigT> ConfigT payloadToConfigSchema(
SerializableFunction<Row, ConfigT> fromRowFunc =
SCHEMA_REGISTRY.getFromRowFunction(configurationClass);

Row payloadRow = decodeRow(payload);
Row payloadRow = decodeConfigObjectRow(payload.getSchema(), payload.getPayload());

if (!payloadRow.getSchema().assignableTo(configSchema)) {
throw new IllegalArgumentException(
Expand All @@ -263,7 +268,7 @@ private static <ConfigT> ConfigT payloadToConfigSchema(
private static <ConfigT> ConfigT payloadToConfigSetters(
ExternalConfigurationPayload payload, Class<ConfigT> configurationClass)
throws ReflectiveOperationException {
Row configRow = decodeRow(payload);
Row configRow = decodeConfigObjectRow(payload.getSchema(), payload.getPayload());

Constructor<ConfigT> constructor = configurationClass.getDeclaredConstructor();
constructor.setAccessible(true);
Expand Down Expand Up @@ -459,13 +464,22 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
}
}));

@Nullable
TransformProvider transformProvider =
getRegisteredTransforms().get(request.getTransform().getSpec().getUrn());
if (transformProvider == null) {
throw new UnsupportedOperationException(
"Unknown urn: " + request.getTransform().getSpec().getUrn());
String urn = request.getTransform().getSpec().getUrn();

TransformProvider transformProvider = null;
if (getUrn(ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) {
AllowList allowList =
pipelineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist();
assert allowList != null;
transformProvider = new JavaClassLookupTransformProvider(allowList);
} else {
transformProvider = getRegisteredTransforms().get(urn);
if (transformProvider == null) {
throw new UnsupportedOperationException(
"Unknown urn: " + request.getTransform().getSpec().getUrn());
}
}

Map<String, PCollection<?>> outputs =
transformProvider.apply(
pipeline,
Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.expansion.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Options used to configure the {@link ExpansionService}. */
public interface ExpansionServiceOptions extends PipelineOptions {

@Description("Allow list for Java class based transform expansion")
@Default.InstanceFactory(JavaClassLookupAllowListFactory.class)
AllowList getJavaClassLookupAllowlist();

void setJavaClassLookupAllowlist(AllowList file);

@Description("Allow list file for Java class based transform expansion")
String getJavaClassLookupAllowlistFile();

void setJavaClassLookupAllowlistFile(String file);

/**
* Loads the allow list from {@link #getJavaClassLookupAllowlistFile}, defaulting to an empty
* {@link JavaClassLookupTransformProvider.AllowList}.
*/
class JavaClassLookupAllowListFactory implements DefaultValueFactory<AllowList> {

@Override
public AllowList create(PipelineOptions options) {
String allowListFile =
options.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlistFile();
if (allowListFile != null) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
File allowListFileObj = new File(allowListFile);
if (!allowListFileObj.exists()) {
throw new IllegalArgumentException(
"Allow list file " + allowListFile + " does not exist");
}
try {
return mapper.readValue(allowListFileObj, AllowList.class);
} catch (IOException e) {
throw new IllegalArgumentException(
"Could not load the provided allowlist file " + allowListFile, e);
}
}

// By default produces an empty allow-list.
return new AutoValue_JavaClassLookupTransformProvider_AllowList(
JavaClassLookupTransformProvider.ALLOW_LIST_VERSION, new ArrayList<>());
}
}
}

0 comments on commit e0d6332

Please sign in to comment.