[BEAM-3081] NonNull by default for sdk/transforms and friends #4037

Closed
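The diff below only shows @Nullable annotations being added at individual use sites; the package-level "non-null by default" switch itself is not visible in this excerpt. As a hedged sketch, one common way to express that default is a JSR-305 annotation in a package-info.java (whether this PR uses that mechanism or FindBugs' @DefaultAnnotation is not shown here):

    // Hypothetical package-info.java, illustrative only and not taken from this PR:
    // every method parameter in the package is treated as non-null unless it is
    // explicitly annotated @Nullable.
    @ParametersAreNonnullByDefault
    package org.apache.beam.sdk.transforms;

    import javax.annotation.ParametersAreNonnullByDefault;

With such a default in place, the @Nullable annotations added below mark exactly the places where null is part of the contract.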
@@ -19,6 +19,7 @@

import com.datatorrent.api.DAG;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.ShutdownMode;
import org.apache.beam.sdk.Pipeline;
@@ -52,7 +53,8 @@ public State cancel() throws IOException {
}

@Override
public State waitUntilFinish(Duration duration) {
@Nullable
public State waitUntilFinish(@Nullable Duration duration) {
long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
: System.currentTimeMillis() + duration.getMillis();
try {
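The hunk is truncated here, but the visible signature change is the important part: waitUntilFinish(Duration) may now legitimately return null. A hedged caller-side sketch, assuming the usual org.apache.beam.sdk and org.joda.time imports (the class, pipeline, and timeout are illustrative, not from this PR):

    // Illustrative only, not part of this PR's diff: a null State means the
    // pipeline had not reached a terminal state within the given duration.
    class WaitWithTimeout {
      void run(Pipeline pipeline) throws IOException {
        PipelineResult result = pipeline.run();
        @Nullable PipelineResult.State state =
            result.waitUntilFinish(Duration.standardMinutes(5));
        if (state == null) {
          result.cancel(); // still running; stop it rather than wait forever
        }
      }
    }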
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;

import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -127,7 +128,7 @@ public interface BatchTransformTranslator<TransformT extends PTransform> {
* Returns a translator for the given node if possible, otherwise null.
*/
private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
@Nullable PTransform<?, ?> transform = node.getTransform();

// Root of the graph is null
if (transform == null) {
@@ -83,7 +83,8 @@ public State waitUntilFinish() {

@Override
public MetricResults metrics() {
return null;
throw new UnsupportedOperationException(
String.format("%s does not support querying metrics", getClass().getSimpleName()));
}

private State getGearpumpState() {
@@ -394,7 +394,11 @@ private Node(

/**
* Returns the transform associated with this transform node.
*
* @return {@code null} if and only if this is the root node of the graph, which has no
* associated transform
*/
@Nullable
public PTransform<?, ?> getTransform() {
return transform;
}
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.state;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;

/**
@@ -28,9 +29,10 @@ public class ReadableStates {
/**
* A {@link ReadableState} constructed from a constant value, hence immediately available.
*/
public static <T> ReadableState<T> immediate(final T value) {
public static <T> ReadableState<T> immediate(@Nullable final T value) {
return new ReadableState<T>() {
@Override
@Nullable
public T read() {
return value;
}
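A brief usage sketch of the annotated factory method (editorial, not part of the diff): because immediate(...) now accepts null, the state it returns reads back a possibly-null value.

    // Illustrative only: T is inferred as String here, and read() is now @Nullable.
    ReadableState<String> absent = ReadableStates.immediate(null);
    @Nullable String value = absent.read(); // null, available without any lookup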
@@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineRunner;
@@ -605,6 +606,7 @@ public MatcherCheckerFn(SerializableMatcher<T> matcher) {
}

@Override
@Nullable
public Void apply(T actual) {
assertThat(actual, matcher);
return null;
@@ -1269,6 +1271,7 @@ public AssertIsEqualTo(T expected) {
}

@Override
@Nullable
public Void apply(T actual) {
assertThat(actual, equalTo(expected));
return null;
@@ -1287,6 +1290,7 @@ public AssertNotEqualTo(T expected) {
}

@Override
@Nullable
public Void apply(T actual) {
assertThat(actual, not(equalTo(expected)));
return null;
@@ -1316,6 +1320,7 @@ public AssertContainsInAnyOrder(Iterable<T> expected) {
}

@Override
@Nullable
public Void apply(Iterable<T> actual) {
assertThat(actual, containsInAnyOrder(expected));
return null;
@@ -26,7 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.avro.reflect.Nullable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
@@ -455,7 +455,7 @@ static long sampleSizeFromEstimationError(double estimationError) {
}

private static void populateDisplayData(
DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
DisplayData.Builder builder, long sampleSize, @Nullable Double maxEstimationError) {
builder
.add(DisplayData.item("sampleSize", sampleSize)
.withLabel("Sample Size"))
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -440,6 +441,7 @@ public abstract static class BinaryCombineFn<V> extends
/**
* Returns the value that should be used for the combine of the empty set.
*/
@Nullable
public V identity() {
return null;
}
@@ -506,7 +508,7 @@ public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCode
* <p>Used only as a private accumulator class.
*/
public static class Holder<V> {
private V value;
@Nullable private V value;
private boolean present;
private Holder() { }
private Holder(V value) {
@@ -1945,10 +1947,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
* the hot and cold key paths.
*/
private static class InputOrAccum<InputT, AccumT> {
public final InputT input;
public final AccumT accum;
@Nullable public final InputT input;
@Nullable public final AccumT accum;

private InputOrAccum(InputT input, AccumT aggr) {
private InputOrAccum(@Nullable InputT input, @Nullable AccumT aggr) {
this.input = input;
this.accum = aggr;
}
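This Combine.java section marks three related spots as nullable: the default identity(), the Holder accumulator's value, and the two InputOrAccum fields, of which only one is expected to be set. A hedged sketch of a user-defined BinaryCombineFn that simply inherits the now-@Nullable identity() (illustrative, not part of this PR):

    // Illustrative only: apply() is the single abstract method; identity() is
    // inherited and returns null for an empty input, hence its @Nullable.
    Combine.BinaryCombineFn<Integer> smallest =
        new Combine.BinaryCombineFn<Integer>() {
          @Override
          public Integer apply(Integer left, Integer right) {
            return Math.min(left, right);
          }
        };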
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -176,6 +177,7 @@ private enum NullValue {
* <p>It is an error to request a nonexistent tuple tag from the {@link CoCombineResult}.
*/
@SuppressWarnings("unchecked")
@Nullable
public <V> V get(TupleTag<V> tag) {
checkArgument(
valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
@@ -137,7 +137,7 @@ public static <T> Values<T> of(Iterable<T> elems) {
* Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
*/
@SafeVarargs
public static <T> Values<T> of(T elem, T... elems) {
public static <T> Values<T> of(@Nullable T elem, @Nullable T... elems) {
// This can't be an ImmutableList, as it may accept nulls
List<T> input = new ArrayList<>(elems.length + 1);
input.add(elem);
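With of(...) now accepting null elements, the Javadoc's advice about withCoder matters in practice: the default coder will generally not tolerate nulls. A hedged usage sketch (the pipeline variable and element values are illustrative, not from this PR):

    // Illustrative only: supply a null-tolerant coder explicitly when the
    // created elements may include null.
    PCollection<String> maybeNulls =
        pipeline.apply(
            Create.of("a", null, "c")
                .withCoder(NullableCoder.of(StringUtf8Coder.of())));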
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;

import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.values.KV;
@@ -115,6 +116,7 @@ public KV<T, Void> apply(T element) {
Combine.<T, Void>perKey(
new SerializableFunction<Iterable<Void>, Void>() {
@Override
@Nullable
public Void apply(Iterable<Void> iter) {
return null; // ignore input
}
@@ -30,6 +30,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -514,17 +516,17 @@ public <T> List<T> takeOutputElements(TupleTag<T> tag) {

private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
@SuppressWarnings({"unchecked", "rawtypes"})
List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
List<ValueInSingleWindow<T>> elems = (List) getOutputs().get(tag);
return ImmutableList.copyOf(
MoreObjects.firstNonNull(elems, Collections.<ValueInSingleWindow<T>>emptyList()));
}

@SuppressWarnings({"unchecked", "rawtypes"})
public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag);
List<ValueInSingleWindow<T>> outputList = (List) getOutputs().get(tag);
if (outputList == null) {
outputList = new ArrayList<>();
outputs.put(tag, (List) outputList);
getOutputs().put(tag, (List) outputList);
}
return outputList;
}
@@ -688,11 +690,12 @@ private enum State {
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();

/** The original DoFn under test, if started. */
private DoFn<InputT, OutputT> fn;
private DoFnInvoker<InputT, OutputT> fnInvoker;
@Nullable private DoFn<InputT, OutputT> fn;

/** The outputs from the {@link DoFn} under test. */
private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
@Nullable private DoFnInvoker<InputT, OutputT> fnInvoker;

/** The outputs from the {@link DoFn} under test. Access via {@link #getOutputs()}. */
@CheckForNull private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;

/** The state of processing of the {@link DoFn} under test. */
private State state = State.UNINITIALIZED;
@@ -704,12 +707,14 @@ private DoFnTester(DoFn<InputT, OutputT> origFn) {
param.match(
new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
@Override
@Nullable
public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
// ProcessContext parameter is obviously supported.
return null;
}

@Override
@Nullable
public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
// We also support the BoundedWindow parameter.
return null;
@@ -738,6 +743,12 @@ private void initializeState() throws Exception {
}
fnInvoker = DoFnInvokers.invokerFor(fn);
fnInvoker.invokeSetup();
outputs = new HashMap<>();
}

private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> getOutputs() {
if (outputs == null) {
outputs = new HashMap<>();
}
return outputs;
}
}
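The pattern above, a @CheckForNull field that is only ever dereferenced through a lazily-initializing accessor, is what lets every other use site stay free of null checks. A minimal standalone sketch of the same idiom (field and type names are made up, not from this PR):

    // Illustrative only: the field may be null, the accessor never returns null.
    @CheckForNull private Map<String, List<String>> cache;

    private Map<String, List<String>> getCache() {
      if (cache == null) {
        cache = new HashMap<>();
      }
      return cache;
    }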
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@
*/
public class FlatMapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
private final transient TypeDescriptor<InputT> inputType;
private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient TypeDescriptor<InputT> inputType;
@Nullable private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient Object originalFnForDisplayData;
@Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;

private FlatMapElements(
@Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
@Nullable Object originalFnForDisplayData,
TypeDescriptor<InputT> inputType,
@Nullable TypeDescriptor<InputT> inputType,
TypeDescriptor<OutputT> outputType) {
this.fn = fn;
this.originalFnForDisplayData = originalFnForDisplayData;
@@ -146,6 +147,13 @@ public TypeDescriptor<InputT> getInputTypeDescriptor() {

@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
checkState(
outputType != null,
"%s output type descriptor was null; "
+ "this probably means that getOutputTypeDescriptor() was called after "
+ "serialization/deserialization, but it is only available prior to "
+ "serialization, for constructing a pipeline and inferring coders",
FlatMapElements.class.getSimpleName());
return outputType;
}
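The checkState guard added here encodes a real constraint rather than papering over a null: the type descriptors are transient, so they survive pipeline construction but not serialization. A hedged, self-contained illustration of the same pattern, assuming the usual imports (class and field names are made up, not from this PR):

    // Illustrative only: a final transient field is null after deserialization,
    // so the accessor fails fast with a clear message instead of returning null.
    class TypeHolder implements Serializable {
      @Nullable private final transient TypeDescriptor<String> type =
          TypeDescriptor.of(String.class);

      TypeDescriptor<String> getType() {
        checkState(type != null,
            "type descriptor is only available before serialization");
        return type;
      }
    }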

@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -34,15 +35,15 @@
*/
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
private final transient TypeDescriptor<InputT> inputType;
private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient TypeDescriptor<InputT> inputType;
@Nullable private final transient TypeDescriptor<OutputT> outputType;
@Nullable private final transient Object originalFnForDisplayData;
@Nullable private final Contextful<Fn<InputT, OutputT>> fn;

private MapElements(
@Nullable Contextful<Fn<InputT, OutputT>> fn,
@Nullable Object originalFnForDisplayData,
TypeDescriptor<InputT> inputType,
@Nullable TypeDescriptor<InputT> inputType,
TypeDescriptor<OutputT> outputType) {
this.fn = fn;
this.originalFnForDisplayData = originalFnForDisplayData;
@@ -140,6 +141,13 @@ public TypeDescriptor<InputT> getInputTypeDescriptor() {

@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
checkState(
outputType != null,
"%s output type descriptor was null; "
+ "this probably means that getOutputTypeDescriptor() was called after "
+ "serialization/deserialization, but it is only available prior to "
+ "serialization, for constructing a pipeline and inferring coders",
MapElements.class.getSimpleName());
return outputType;
}
}).withSideInputs(fn.getRequirements().getSideInputs()));
@@ -19,6 +19,7 @@

import java.io.Serializable;
import java.util.Comparator;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;

Expand Down Expand Up @@ -214,11 +215,11 @@ Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {

private static class MaxFn<T> extends BinaryCombineFn<T> {

private final T identity;
@Nullable private final T identity;
private final Comparator<? super T> comparator;

private <ComparatorT extends Comparator<? super T> & Serializable> MaxFn(
T identity, ComparatorT comparator) {
@Nullable T identity, ComparatorT comparator) {
this.identity = identity;
this.comparator = comparator;
}