Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Add MapFn and FlatMapFn
Browse files Browse the repository at this point in the history
These are DoFn implementations that represent a simple Map or FlatMap
operation. They take a SerializableFunction and apply it to each
element in the input PCollection.

The fact that SerializableFunction is a functional interface makes it
easy for Java 8 users to define a simple mapping from input to output
PCollections.

Add ParDo#map(MapFn) and ParDo#flatMap(FlatMapFn), which provide
implementation-time hints to use MapFn and FlatMapFn when appropriate.

----Release Notes----
Add MapFn and FlatMapFn, which apply a SerializableFunction to all
elements in the input/output PCollections

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=93437423
  • Loading branch information
tgroh authored and davorbonaci committed May 12, 2015
1 parent 6c99876 commit 431b317
Show file tree
Hide file tree
Showing 6 changed files with 549 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.sdk.transforms;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.values.PCollection;

import java.lang.reflect.Method;

/**
 * A {@link DoFn} that represents a FlatMap operation. {@code FlatMapFn} is applied to each
 * element of the input {@link PCollection} and outputs each element in the result of the
 * application. No behavior is provided in startBundle and finishBundle.
 *
 * <p>The per-element behavior is supplied in exactly one of two ways: either a
 * {@link SerializableFunction} is passed to the constructor, or {@link #apply} is overridden
 * and the no-arg constructor is used. Both constructors check this eagerly and throw
 * {@link IllegalStateException} when the rule is violated.
 *
 * @param <InputT> the type of input element
 * @param <OutputT> the type of output element
 */
public abstract class FlatMapFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
  private static final long serialVersionUID = 0L;

  /** The function backing {@link #apply}; {@code null} when a subclass overrides it instead. */
  private final SerializableFunction<InputT, Iterable<OutputT>> fn;

  /**
   * Creates a new {@code FlatMapFn} whose behavior comes from an overridden {@link #apply}.
   *
   * @throws IllegalStateException if the concrete class does not override {@link #apply}
   */
  protected FlatMapFn() {
    this.fn = null;

    if (!applyOverridden()) {
      throw new IllegalStateException("Didn't find an override for FlatMapFn#apply(InputT). "
          + "Apply must be overridden to use the no-arg FlatMapFn constructor.");
    }
  }

  /**
   * Creates a new {@code FlatMapFn} that applies the {@link SerializableFunction} to its inputs.
   *
   * @param fn the function to apply to every input element; must not be {@code null}
   * @throws NullPointerException if {@code fn} is {@code null}
   * @throws IllegalStateException if the concrete class also overrides {@link #apply}
   */
  protected FlatMapFn(SerializableFunction<InputT, Iterable<OutputT>> fn) {
    this.fn = checkNotNull(fn, "null SerializableFunction provided to FlatMapFn constructor");

    if (applyOverridden()) {
      throw new IllegalStateException("Found an override of FlatMapFn#apply(InputT). "
          + "FlatMapFn#apply(InputT) cannot be overriden if a SerializableFunction is provided.");
    }
  }

  /**
   * Reports whether the runtime class declares its own {@code apply}.
   *
   * <p>The lookup uses {@code Object.class} as the parameter type because {@code InputT} is
   * erased; the method counts as overridden when its declaring class is anything other than
   * {@code FlatMapFn} itself.
   */
  private boolean applyOverridden() {
    try {
      Method m = getClass().getMethod("apply", Object.class);
      return !FlatMapFn.class.equals(m.getDeclaringClass());
    } catch (NoSuchMethodException e) {
      // Unreachable in practice: this class declares a public apply(InputT).
      throw new AssertionError(
          "NoSuchMethodException encountered for method apply() in FlatMapFn "
              + "but FlatMapFn declares apply()",
          e);
    }
  }

  @Override
  public final void startBundle(Context c) throws Exception {}

  /**
   * Applies {@link #apply} to the current element and outputs every element of the result.
   *
   * @throws NullPointerException if {@link #apply} returns {@code null}
   */
  @Override
  public final void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
    // Fail with a descriptive message rather than the bare NPE a for-each over null produces.
    Iterable<OutputT> outputs =
        checkNotNull(apply(c.element()), "FlatMapFn#apply(InputT) returned a null Iterable");
    for (OutputT output : outputs) {
      c.output(output);
    }
  }

  @Override
  public final void finishBundle(Context c) throws Exception {}

  /**
   * Applies this {@code FlatMapFn} to an input element, returning the elements to add to the
   * output {@link PCollection}.
   *
   * <p>This default implementation delegates to the {@link SerializableFunction} given at
   * construction. Instances created with the no-arg constructor hold no function and must
   * override this method; an override that calls {@code super.apply} on such an instance
   * throws a {@link NullPointerException}.
   *
   * @param input the element to transform
   * @return the elements to output for {@code input}; must not be {@code null}
   */
  public Iterable<OutputT> apply(InputT input) {
    return fn.apply(input);
  }
}
102 changes: 102 additions & 0 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapFn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.sdk.transforms;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.values.PCollection;

import java.lang.reflect.Method;

/**
 * A {@link DoFn} that represents a simple Map operation. {@code MapFn} is applied to each element
 * of the input {@link PCollection} and outputs the result. No behavior is provided in startBundle
 * and finishBundle.
 *
 * <p>The per-element behavior is supplied in exactly one of two ways: either a
 * {@link SerializableFunction} is passed to the constructor, or {@link #apply} is overridden
 * and the no-arg constructor is used. Both constructors check this eagerly and throw
 * {@link IllegalStateException} when the rule is violated.
 *
 * @param <InputT> the type of input element
 * @param <OutputT> the type of output element
 */
public abstract class MapFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
  private static final long serialVersionUID = 0L;

  /** The function backing {@link #apply}; {@code null} when a subclass overrides it instead. */
  private final SerializableFunction<InputT, OutputT> fn;

  /**
   * Creates a new {@code MapFn} whose behavior comes from an overridden {@link #apply}.
   *
   * @throws IllegalStateException if the concrete class does not override {@link #apply}
   */
  protected MapFn() {
    this.fn = null;

    if (!applyOverridden()) {
      throw new IllegalStateException("Didn't find an override for MapFn#apply(InputT). "
          + "Apply must be overridden to use the no-arg MapFn constructor.");
    }
  }

  /**
   * Creates a new {@code MapFn} that applies the provided {@link SerializableFunction} to its
   * inputs.
   *
   * @param fn the function to apply to every input element; must not be {@code null}
   * @throws NullPointerException if {@code fn} is {@code null}
   * @throws IllegalStateException if the concrete class also overrides {@link #apply}
   */
  protected MapFn(SerializableFunction<InputT, OutputT> fn) {
    this.fn = checkNotNull(fn, "null SerializableFunction provided to MapFn constructor");

    if (applyOverridden()) {
      throw new IllegalStateException("Found an override of MapFn#apply(InputT). "
          + "MapFn#apply(InputT) cannot be overriden if a SerializableFunction is provided.");
    }
  }

  /**
   * Reports whether the runtime class declares its own {@code apply}.
   *
   * <p>The lookup uses {@code Object.class} as the parameter type because {@code InputT} is
   * erased; the method counts as overridden when its declaring class is anything other than
   * {@code MapFn} itself.
   */
  private boolean applyOverridden() {
    try {
      Method m = getClass().getMethod("apply", Object.class);
      return !MapFn.class.equals(m.getDeclaringClass());
    } catch (NoSuchMethodException e) {
      // Generic apply is declared in this class, so this is unreachable in practice.
      throw new AssertionError(
          "NoSuchMethodException encountered for method apply() in MapFn "
              + "but MapFn declares apply()",
          e);
    }
  }

  @Override
  public final void startBundle(Context c) throws Exception {}

  /** Applies {@link #apply} to the current element and outputs the result. */
  @Override
  public final void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
    c.output(apply(c.element()));
  }

  @Override
  public final void finishBundle(Context c) throws Exception {}

  /**
   * Applies this {@code MapFn} to an input element, returning the element to add to the output
   * {@link PCollection}.
   *
   * <p>This default implementation delegates to the {@link SerializableFunction} given at
   * construction. Instances created with the no-arg constructor hold no function and must
   * override this method; an override that calls {@code super.apply} on such an instance
   * throws a {@link NullPointerException}.
   *
   * @param input the element to transform
   * @return the element to output for {@code input}
   */
  public OutputT apply(InputT input) {
    return fn.apply(input);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,44 @@ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT>
return new Unbound().of(fn);
}

/**
 * Builds a {@code ParDo} {@link PTransform} that runs the supplied {@link MapFn}.
 *
 * <p> The result is a specialized {@code ParDo} with no startBundle or finishBundle
 * behavior: each element of the input {@code PCollection<InputT>} is turned into an
 * element of the output {@code PCollection<OutputT>} by the apply method of the
 * {@code MapFn<InputT, OutputT>}.
 *
 * <p> The returned {@code PTransform} already has its types bound, taking a
 * {@code PCollection<InputT>} and producing a {@code PCollection<OutputT>}, as
 * inferred from the {@code MapFn<InputT, OutputT>} argument. It can be applied
 * immediately, or configured further first.
 */
public static <InputT, OutputT> Bound<InputT, OutputT> map(MapFn<InputT, OutputT> fn) {
  // A MapFn is a DoFn, so binding it is the same as binding any other DoFn.
  Bound<InputT, OutputT> bound = of(fn);
  return bound;
}

/**
 * Builds a {@code ParDo} {@link PTransform} that runs the supplied {@link FlatMapFn}.
 *
 * <p> The result is a specialized {@code ParDo} with no startBundle or finishBundle
 * behavior: the elements of the input {@code PCollection<InputT>} are turned into the
 * elements of the output {@code PCollection<OutputT>} by the apply method of the
 * {@code FlatMapFn<InputT, OutputT>}.
 *
 * <p> The returned {@code PTransform} already has its types bound, taking a
 * {@code PCollection<InputT>} and producing a {@code PCollection<OutputT>}, as
 * inferred from the {@code FlatMapFn<InputT, OutputT>} argument. It can be applied
 * immediately, or configured further first.
 */
public static <InputT, OutputT> Bound<InputT, OutputT> flatMap(FlatMapFn<InputT, OutputT> fn) {
  // A FlatMapFn is a DoFn, so binding it is the same as binding any other DoFn.
  Bound<InputT, OutputT> bound = of(fn);
  return bound;
}

private static <InputT, OutputT> DoFn<InputT, OutputT>
adapt(DoFnWithContext<InputT, OutputT> fn) {
return DoFnReflector.of(fn.getClass()).toDoFn(fn);
Expand Down Expand Up @@ -662,6 +700,31 @@ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
  // Convert the DoFnWithContext into a plain DoFn, then bind it through the DoFn overload.
  DoFn<InputT, OutputT> adapted = adapt(fn);
  return of(adapted);
}

/**
 * Produces a new {@code ParDo} {@code PTransform} that mirrors this
 * transform but invokes the given {@link MapFn} function and has its
 * input and output types bound. This transform itself is left
 * unmodified. The returned {@code PTransform} is specified well enough
 * to be applied, although further properties may still be set on it.
 */
public <InputT, OutputT> Bound<InputT, OutputT> map(MapFn<InputT, OutputT> fn) {
  // A MapFn is a DoFn, so binding it is the same as binding any other DoFn.
  Bound<InputT, OutputT> bound = of(fn);
  return bound;
}

/**
 * Produces a new {@code ParDo} {@code PTransform} that mirrors this
 * transform but invokes the given {@link FlatMapFn} function and has
 * its input and output types bound. This transform itself is left
 * unmodified. The returned {@code PTransform} is specified well enough
 * to be applied, although further properties may still be set on it.
 */
public <InputT, OutputT> Bound<InputT, OutputT> flatMap(FlatMapFn<InputT, OutputT> fn) {
  // A FlatMapFn is a DoFn, so binding it is the same as binding any other DoFn.
  Bound<InputT, OutputT> bound = of(fn);
  return bound;
}
}

/**
Expand Down
Loading

0 comments on commit 431b317

Please sign in to comment.