Skip to content

Commit

Permalink
Merge pull request #7917 from aaltay/origin/release-2.11.0
Browse files Browse the repository at this point in the history
[BEAM-6720] Cherry Pick PR 7911 - Add binary compatibility adapters for ProcessFunction
  • Loading branch information
aaltay committed Feb 21, 2019
2 parents 4085677 + 56c4310 commit c34ed7e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 0 deletions.
Expand Up @@ -112,6 +112,12 @@ public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
return new Contextful<>((element, c) -> fn.apply(element), Requirements.empty());
}

/** Binary compatibility adapter for {@link #fn(ProcessFunction)}. */
public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
final SerializableFunction<InputT, OutputT> fn) {
return fn((ProcessFunction<InputT, OutputT>) fn);
}

/** Same with {@link #of} but with better type inference behavior for the case of {@link Fn}. */
public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
final Fn<InputT, OutputT> fn, Requirements requirements) {
Expand Down
Expand Up @@ -51,6 +51,12 @@ public static <T, PredicateT extends ProcessFunction<T, Boolean>> Filter<T> by(
return new Filter<>(predicate);
}

/** Binary compatibility adapter for {@link #by(ProcessFunction)}. */
public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> by(
PredicateT predicate) {
return by((ProcessFunction<T, Boolean>) predicate);
}

/**
* Returns a {@code PTransform} that takes an input {@link PCollection} and returns a {@link
* PCollection} with elements that are less than a given value, based on the elements' natural
Expand Down
Expand Up @@ -85,6 +85,12 @@ public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via(
return new FlatMapElements<>(wrapped, fn, inputType, outputType);
}

/** Binary compatibility adapter for {@link #via(ProcessFunction)}. */
public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via(
SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
return via((InferableFunction<? super InputT, ? extends Iterable<OutputT>>) fn);
}

/**
* Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
* type, but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
Expand Down Expand Up @@ -113,6 +119,12 @@ public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
(Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
}

/** Binary compatibility adapter for {@link #via(ProcessFunction)}. */
public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
return via((ProcessFunction<NewInputT, ? extends Iterable<OutputT>>) fn);
}

/** Like {@link #via(ProcessFunction)}, but allows access to additional context. */
@Experimental(Experimental.Kind.CONTEXTFUL)
public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
Expand Down
Expand Up @@ -76,6 +76,12 @@ public static <InputT, OutputT> MapElements<InputT, OutputT> via(
Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor());
}

/** Binary compatibility adapter for {@link #via(InferableFunction)}. */
public static <InputT, OutputT> MapElements<InputT, OutputT> via(
final SimpleFunction<InputT, OutputT> fn) {
return via((InferableFunction<InputT, OutputT>) fn);
}

/**
* Returns a new {@link MapElements} transform with the given type descriptor for the output type,
* but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
Expand All @@ -101,6 +107,12 @@ public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT
return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
}

/** Binary compatibility adapter for {@link #via(ProcessFunction)}. */
public <NewInputT> MapElements<NewInputT, OutputT> via(
SerializableFunction<NewInputT, OutputT> fn) {
return via((ProcessFunction<NewInputT, OutputT>) fn);
}

/** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */
@Experimental(Kind.CONTEXTFUL)
public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
* A utility class for creating {@link TypeDescriptor} objects for different types, such as Java
Expand Down Expand Up @@ -394,6 +395,12 @@ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, InputT>() {});
}

/** Binary compatibility adapter for {@link #inputOf(ProcessFunction)}. */
public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
SerializableFunction<InputT, OutputT> fn) {
return inputOf((ProcessFunction<InputT, OutputT>) fn);
}

/**
* Returns a type descriptor for the output of the given {@link ProcessFunction}, subject to Java
* type erasure: may contain unresolved type variables if the type was erased.
Expand All @@ -406,6 +413,12 @@ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, OutputT>() {});
}

/** Binary compatibility adapter for {@link #outputOf(ProcessFunction)}. */
public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
SerializableFunction<InputT, OutputT> fn) {
return outputOf((ProcessFunction<InputT, OutputT>) fn);
}

/** Like {@link #inputOf(ProcessFunction)} but for {@link Contextful.Fn}. */
public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
Contextful.Fn<InputT, OutputT> fn) {
Expand Down

0 comments on commit c34ed7e

Please sign in to comment.