Skip to content

Commit

Permalink
NonNull by default in runners/core
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Nov 8, 2017
1 parent 34a46b1 commit 6b4518f
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 24 deletions.
Expand Up @@ -30,11 +30,11 @@ public class NoOpStepContext implements StepContext, Serializable {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals is not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals is not supported");
}
}
Expand Up @@ -59,18 +59,18 @@
@Experimental(Kind.STATE)
public class InMemoryStateInternals<K> implements StateInternals {

public static <K> InMemoryStateInternals<K> forKey(K key) {
public static <K> InMemoryStateInternals<K> forKey(@Nullable K key) {
return new InMemoryStateInternals<>(key);
}

private final K key;
private final @Nullable K key;

protected InMemoryStateInternals(K key) {
protected InMemoryStateInternals(@Nullable K key) {
this.key = key;
}

@Override
public K getKey() {
public @Nullable K getKey() {
return key;
}

Expand Down Expand Up @@ -179,7 +179,7 @@ public WatermarkHoldState bindWatermark(
public static final class InMemoryValue<T>
implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
private boolean isCleared = true;
private T value = null;
private @Nullable T value = null;

@Override
public void clear() {
Expand Down
Expand Up @@ -161,7 +161,7 @@ public Timer timer(String timerId) {
// Currently we can't verify this because there are no hooks into tryClaim().
// See https://issues.apache.org/jira/browse/BEAM-2607
processContext.cancelScheduledCheckpoint();
KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
@Nullable KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
if (cont.shouldResume()) {
if (residual == null) {
// No checkpoint had been taken by the runner while the ProcessElement call ran, however
Expand Down Expand Up @@ -207,13 +207,13 @@ private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
// even if these events happen almost at the same time.
// This is either the result of the sole tracker.checkpoint() call, or null if
// the call completed before reaching the given number of outputs or duration.
private RestrictionT checkpoint;
private @Nullable RestrictionT checkpoint;
// Watermark captured at the moment before checkpoint was taken, describing a lower bound
// on the output from "checkpoint".
private Instant residualWatermark;
private @Nullable Instant residualWatermark;
// A handle on the scheduled action to take a checkpoint.
private Future<?> scheduledCheckpoint;
private Instant lastReportedWatermark;
private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
fn.super();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;

import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.common.Reiterator;

/**
Expand All @@ -29,7 +30,7 @@
* @param <T> the type of elements returned by this iterator
*/
public final class PeekingReiterator<T> implements Reiterator<T> {
private T nextElement;
private @Nullable T nextElement;
private boolean nextElementComputed;
private final Reiterator<T> iterator;

Expand Down
Expand Up @@ -546,7 +546,7 @@ private class OnTimerArgumentProvider
private final TimeDomain timeDomain;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private StateNamespace namespace;
private @Nullable StateNamespace namespace;

/**
* The state namespace for this context.
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
Expand All @@ -39,7 +40,8 @@ public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;

private Set<BoundedWindow> notReadyWindows;
// Initialized in startBundle()
private @Nullable Set<BoundedWindow> notReadyWindows;

public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
DoFnRunner<InputT, OutputT> underlying,
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
Expand Down Expand Up @@ -244,12 +245,13 @@ public static class ProcessFn<
private final Coder<RestrictionT> restrictionCoder;
private final WindowingStrategy<InputT, ?> inputWindowingStrategy;

private transient StateInternalsFactory<String> stateInternalsFactory;
private transient TimerInternalsFactory<String> timerInternalsFactory;
private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>
private transient @Nullable StateInternalsFactory<String> stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<String> timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
processElementInvoker;

private transient DoFnInvoker<InputT, OutputT> invoker;
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

public ProcessFn(
DoFn<InputT, OutputT> fn,
Expand Down Expand Up @@ -376,7 +378,7 @@ public void processElement(final ProcessContext c) {
return;
}
restrictionState.write(result.getResidualRestriction());
Instant futureOutputWatermark = result.getFutureOutputWatermark();
@Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
Expand Down
Expand Up @@ -37,12 +37,12 @@ public class Result {
@Nullable
private final RestrictionT residualRestriction;
private final DoFn.ProcessContinuation continuation;
private final Instant futureOutputWatermark;
private final @Nullable Instant futureOutputWatermark;

public Result(
@Nullable RestrictionT residualRestriction,
DoFn.ProcessContinuation continuation,
Instant futureOutputWatermark) {
@Nullable Instant futureOutputWatermark) {
this.continuation = checkNotNull(continuation);
if (continuation.shouldResume()) {
checkNotNull(residualRestriction);
Expand All @@ -65,7 +65,7 @@ public DoFn.ProcessContinuation getContinuation() {
return continuation;
}

public Instant getFutureOutputWatermark() {
public @Nullable Instant getFutureOutputWatermark() {
return futureOutputWatermark;
}
}
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for Beam runner authors.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -106,12 +106,12 @@ static class NoOpStepContext implements StepContext {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals not supported");
}
}

Expand Down

0 comments on commit 6b4518f

Please sign in to comment.