Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3081] NonNull by default in the rest of the core SDK and shared runner code #4058

Merged
merged 10 commits into from Nov 10, 2017
Expand Up @@ -30,11 +30,11 @@ public class NoOpStepContext implements StepContext, Serializable {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals is not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals is not supported");
}
}
Expand Up @@ -18,6 +18,8 @@

package org.apache.beam.runners.apex.translation.utils;

import static org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -132,7 +134,7 @@ public Instant getWatermark() {

@Override
public CheckpointMark getCheckpointMark() {
return null;
return NOOP_CHECKPOINT_MARK;
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions runners/core-construction-java/pom.xml
Expand Up @@ -84,6 +84,11 @@
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Expand Up @@ -38,8 +38,8 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
private final TupleTag<Iterable<WindowedValue<?>>> tag;
private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn;
private final WindowMappingFn<?> windowMappingFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Coder<Iterable<WindowedValue<?>>> coder;
private final @Nullable WindowingStrategy<?, ?> windowingStrategy;
private final @Nullable Coder<Iterable<WindowedValue<?>>> coder;
private final transient PCollection<?> pCollection;

/**
Expand Down
Expand Up @@ -323,7 +323,9 @@ public String apply(T input) {
private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
extends DoFn<InputT, KV<InputT, RestrictionT>> {
private DoFn<InputT, OutputT> fn;
private transient DoFnInvoker<InputT, OutputT> invoker;

// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
this.fn = fn;
Expand All @@ -347,7 +349,9 @@ public void processElement(ProcessContext context) {
private static class SplitRestrictionFn<InputT, RestrictionT>
extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
private final DoFn<InputT, ?> splittableFn;
private transient DoFnInvoker<InputT, ?> invoker;

// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, ?> invoker;

SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
this.splittableFn = splittableFn;
Expand Down
Expand Up @@ -138,10 +138,6 @@ public List<BoundedToUnboundedSourceAdapter<T>> split(
}
List<? extends BoundedSource<T>> splits =
boundedSource.split(desiredBundleSize, options);
if (splits == null) {
LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
return ImmutableList.of(this);
}
return Lists.transform(
splits,
new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
Expand Down Expand Up @@ -258,7 +254,8 @@ public void verifyDeterministic() throws NonDeterministicException {
*/
@VisibleForTesting
class Reader extends UnboundedReader<T> {
private ResidualElements residualElements;
// Initialized in init()
private @Nullable ResidualElements residualElements;
private @Nullable ResidualSource residualSource;
private final PipelineOptions options;
private boolean done;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
Expand Down Expand Up @@ -101,7 +102,7 @@ public static WindowIntoPayload getWindowIntoPayload(AppliedPTransform<?, ?, ?>
}
}

public static WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
public static @Nullable WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
return WindowingStrategyTranslation.windowFnFromProto(
getWindowIntoPayload(application).getWindowFn());
}
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Utilities for runners to implement metrics.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.construction.metrics;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for Beam runner authors, prior to execution.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.construction;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
5 changes: 5 additions & 0 deletions runners/core-java/pom.xml
Expand Up @@ -79,6 +79,11 @@

<!-- build dependencies -->

<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
Expand Down
Expand Up @@ -59,18 +59,18 @@
@Experimental(Kind.STATE)
public class InMemoryStateInternals<K> implements StateInternals {

public static <K> InMemoryStateInternals<K> forKey(K key) {
public static <K> InMemoryStateInternals<K> forKey(@Nullable K key) {
return new InMemoryStateInternals<>(key);
}

private final K key;
private final @Nullable K key;

protected InMemoryStateInternals(K key) {
protected InMemoryStateInternals(@Nullable K key) {
this.key = key;
}

@Override
public K getKey() {
public @Nullable K getKey() {
return key;
}

Expand Down Expand Up @@ -179,7 +179,7 @@ public WatermarkHoldState bindWatermark(
public static final class InMemoryValue<T>
implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
private boolean isCleared = true;
private T value = null;
private @Nullable T value = null;

@Override
public void clear() {
Expand Down
Expand Up @@ -161,7 +161,7 @@ public Timer timer(String timerId) {
// Currently we can't verify this because there are no hooks into tryClaim().
// See https://issues.apache.org/jira/browse/BEAM-2607
processContext.cancelScheduledCheckpoint();
KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
@Nullable KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
if (cont.shouldResume()) {
if (residual == null) {
// No checkpoint had been taken by the runner while the ProcessElement call ran, however
Expand Down Expand Up @@ -207,13 +207,13 @@ private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
// even if these events happen almost at the same time.
// This is either the result of the sole tracker.checkpoint() call, or null if
// the call completed before reaching the given number of outputs or duration.
private RestrictionT checkpoint;
private @Nullable RestrictionT checkpoint;
// Watermark captured at the moment before checkpoint was taken, describing a lower bound
// on the output from "checkpoint".
private Instant residualWatermark;
private @Nullable Instant residualWatermark;
// A handle on the scheduled action to take a checkpoint.
private Future<?> scheduledCheckpoint;
private Instant lastReportedWatermark;
private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
fn.super();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;

import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.common.Reiterator;

/**
Expand All @@ -29,7 +30,7 @@
* @param <T> the type of elements returned by this iterator
*/
public final class PeekingReiterator<T> implements Reiterator<T> {
private T nextElement;
private @Nullable T nextElement;
private boolean nextElementComputed;
private final Reiterator<T> iterator;

Expand Down
Expand Up @@ -546,7 +546,7 @@ private class OnTimerArgumentProvider
private final TimeDomain timeDomain;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private StateNamespace namespace;
private @Nullable StateNamespace namespace;

/**
* The state namespace for this context.
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
Expand All @@ -39,7 +40,8 @@ public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;

private Set<BoundedWindow> notReadyWindows;
// Initialized in startBundle()
private @Nullable Set<BoundedWindow> notReadyWindows;

public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
DoFnRunner<InputT, OutputT> underlying,
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
Expand Down Expand Up @@ -244,12 +245,13 @@ public static class ProcessFn<
private final Coder<RestrictionT> restrictionCoder;
private final WindowingStrategy<InputT, ?> inputWindowingStrategy;

private transient StateInternalsFactory<String> stateInternalsFactory;
private transient TimerInternalsFactory<String> timerInternalsFactory;
private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>
private transient @Nullable StateInternalsFactory<String> stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<String> timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
processElementInvoker;

private transient DoFnInvoker<InputT, OutputT> invoker;
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

public ProcessFn(
DoFn<InputT, OutputT> fn,
Expand Down Expand Up @@ -376,7 +378,7 @@ public void processElement(final ProcessContext c) {
return;
}
restrictionState.write(result.getResidualRestriction());
Instant futureOutputWatermark = result.getFutureOutputWatermark();
@Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
Expand Down
Expand Up @@ -37,12 +37,12 @@ public class Result {
@Nullable
private final RestrictionT residualRestriction;
private final DoFn.ProcessContinuation continuation;
private final Instant futureOutputWatermark;
private final @Nullable Instant futureOutputWatermark;

public Result(
@Nullable RestrictionT residualRestriction,
DoFn.ProcessContinuation continuation,
Instant futureOutputWatermark) {
@Nullable Instant futureOutputWatermark) {
this.continuation = checkNotNull(continuation);
if (continuation.shouldResume()) {
checkNotNull(residualRestriction);
Expand All @@ -65,7 +65,7 @@ public DoFn.ProcessContinuation getContinuation() {
return continuation;
}

public Instant getFutureOutputWatermark() {
public @Nullable Instant getFutureOutputWatermark() {
return futureOutputWatermark;
}
}
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for a Beam runner to interact with a client using the Fn API.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.fn;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
Expand Down Expand Up @@ -425,14 +426,14 @@ private static class AccumulatedMetricResult<T> implements MetricResult<T> {
private final MetricName name;
private final String step;
private final T attempted;
private final T committed;
private final @Nullable T committed;
private final boolean isCommittedSupported;

private AccumulatedMetricResult(
MetricName name,
String step,
T attempted,
T committed,
@Nullable T committed,
boolean isCommittedSupported) {
this.name = name;
this.step = step;
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Utilities for runners to implement metrics.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.metrics;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for Beam runner authors.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -143,7 +143,7 @@ public boolean apply(ExecutableTriggerStateMachine trigger) {
}

@Override
public ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
public @Nullable ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
if (!finishedSet.isFinished(subTrigger)) {
return subTrigger;
Expand Down
Expand Up @@ -20,4 +20,8 @@
* State machine implementations for triggers, called "triggers" because
* they react to events.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.triggers;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -28,12 +28,12 @@ public class FlinkNoOpStepContext implements StepContext {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals is not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals is not supported");
}
}