Skip to content

Commit

Permalink
This closes #4058: [BEAM-3081] NonNull by default in the rest of the …
Browse files Browse the repository at this point in the history
…core SDK and shared runner code

  NonNull by default in runners/core
  NonNull by default in runners/core/metrics
  NonNull by default in runners/core/fn
  NonNull by default in runners/core/triggers
  NonNull by default in runners/core/construction/metrics
  NonNull by default in runners/core/construction
  NonNull by default in sdk/io/range
  NonNull by default in sdk/io/fs
  NonNull by default in sdk/values
  NonNull by default in sdk/io
  • Loading branch information
kennknowles committed Nov 10, 2017
2 parents b203af7 + 87b07d8 commit e40e882
Show file tree
Hide file tree
Showing 59 changed files with 259 additions and 140 deletions.
Expand Up @@ -30,11 +30,11 @@ public class NoOpStepContext implements StepContext, Serializable {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals is not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals is not supported");
}
}
Expand Up @@ -18,6 +18,8 @@

package org.apache.beam.runners.apex.translation.utils;

import static org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -132,7 +134,7 @@ public Instant getWatermark() {

@Override
public CheckpointMark getCheckpointMark() {
return null;
return NOOP_CHECKPOINT_MARK;
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions runners/core-construction-java/pom.xml
Expand Up @@ -84,6 +84,11 @@
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Expand Up @@ -38,8 +38,8 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
private final TupleTag<Iterable<WindowedValue<?>>> tag;
private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn;
private final WindowMappingFn<?> windowMappingFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Coder<Iterable<WindowedValue<?>>> coder;
private final @Nullable WindowingStrategy<?, ?> windowingStrategy;
private final @Nullable Coder<Iterable<WindowedValue<?>>> coder;
private final transient PCollection<?> pCollection;

/**
Expand Down
Expand Up @@ -323,7 +323,9 @@ public String apply(T input) {
private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
extends DoFn<InputT, KV<InputT, RestrictionT>> {
private DoFn<InputT, OutputT> fn;
private transient DoFnInvoker<InputT, OutputT> invoker;

// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
this.fn = fn;
Expand All @@ -347,7 +349,9 @@ public void processElement(ProcessContext context) {
private static class SplitRestrictionFn<InputT, RestrictionT>
extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
private final DoFn<InputT, ?> splittableFn;
private transient DoFnInvoker<InputT, ?> invoker;

// Initialized in setup()
private transient @Nullable DoFnInvoker<InputT, ?> invoker;

SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
this.splittableFn = splittableFn;
Expand Down
Expand Up @@ -138,10 +138,6 @@ public List<BoundedToUnboundedSourceAdapter<T>> split(
}
List<? extends BoundedSource<T>> splits =
boundedSource.split(desiredBundleSize, options);
if (splits == null) {
LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
return ImmutableList.of(this);
}
return Lists.transform(
splits,
new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
Expand Down Expand Up @@ -258,7 +254,8 @@ public void verifyDeterministic() throws NonDeterministicException {
*/
@VisibleForTesting
class Reader extends UnboundedReader<T> {
private ResidualElements residualElements;
// Initialized in init()
private @Nullable ResidualElements residualElements;
private @Nullable ResidualSource residualSource;
private final PipelineOptions options;
private boolean done;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
Expand Down Expand Up @@ -101,7 +102,7 @@ public static WindowIntoPayload getWindowIntoPayload(AppliedPTransform<?, ?, ?>
}
}

public static WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
public static @Nullable WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) {
return WindowingStrategyTranslation.windowFnFromProto(
getWindowIntoPayload(application).getWindowFn());
}
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Utilities for runners to implement metrics.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.construction.metrics;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for Beam runner authors, prior to execution.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.construction;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
5 changes: 5 additions & 0 deletions runners/core-java/pom.xml
Expand Up @@ -79,6 +79,11 @@

<!-- build dependencies -->

<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
Expand Down
Expand Up @@ -59,18 +59,18 @@
@Experimental(Kind.STATE)
public class InMemoryStateInternals<K> implements StateInternals {

public static <K> InMemoryStateInternals<K> forKey(K key) {
public static <K> InMemoryStateInternals<K> forKey(@Nullable K key) {
return new InMemoryStateInternals<>(key);
}

private final K key;
private final @Nullable K key;

protected InMemoryStateInternals(K key) {
protected InMemoryStateInternals(@Nullable K key) {
this.key = key;
}

@Override
public K getKey() {
public @Nullable K getKey() {
return key;
}

Expand Down Expand Up @@ -179,7 +179,7 @@ public WatermarkHoldState bindWatermark(
public static final class InMemoryValue<T>
implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
private boolean isCleared = true;
private T value = null;
private @Nullable T value = null;

@Override
public void clear() {
Expand Down
Expand Up @@ -161,7 +161,7 @@ public Timer timer(String timerId) {
// Currently we can't verify this because there are no hooks into tryClaim().
// See https://issues.apache.org/jira/browse/BEAM-2607
processContext.cancelScheduledCheckpoint();
KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
@Nullable KV<RestrictionT, Instant> residual = processContext.getTakenCheckpoint();
if (cont.shouldResume()) {
if (residual == null) {
// No checkpoint had been taken by the runner while the ProcessElement call ran, however
Expand Down Expand Up @@ -207,13 +207,13 @@ private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
// even if these events happen almost at the same time.
// This is either the result of the sole tracker.checkpoint() call, or null if
// the call completed before reaching the given number of outputs or duration.
private RestrictionT checkpoint;
private @Nullable RestrictionT checkpoint;
// Watermark captured at the moment before checkpoint was taken, describing a lower bound
// on the output from "checkpoint".
private Instant residualWatermark;
private @Nullable Instant residualWatermark;
// A handle on the scheduled action to take a checkpoint.
private Future<?> scheduledCheckpoint;
private Instant lastReportedWatermark;
private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
fn.super();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;

import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.common.Reiterator;

/**
Expand All @@ -29,7 +30,7 @@
* @param <T> the type of elements returned by this iterator
*/
public final class PeekingReiterator<T> implements Reiterator<T> {
private T nextElement;
private @Nullable T nextElement;
private boolean nextElementComputed;
private final Reiterator<T> iterator;

Expand Down
Expand Up @@ -546,7 +546,7 @@ private class OnTimerArgumentProvider
private final TimeDomain timeDomain;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private StateNamespace namespace;
private @Nullable StateNamespace namespace;

/**
* The state namespace for this context.
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
Expand All @@ -39,7 +40,8 @@ public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
private final Collection<PCollectionView<?>> views;
private final ReadyCheckingSideInputReader sideInputReader;

private Set<BoundedWindow> notReadyWindows;
// Initialized in startBundle()
private @Nullable Set<BoundedWindow> notReadyWindows;

public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(
DoFnRunner<InputT, OutputT> underlying,
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
Expand Down Expand Up @@ -244,12 +245,13 @@ public static class ProcessFn<
private final Coder<RestrictionT> restrictionCoder;
private final WindowingStrategy<InputT, ?> inputWindowingStrategy;

private transient StateInternalsFactory<String> stateInternalsFactory;
private transient TimerInternalsFactory<String> timerInternalsFactory;
private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>
private transient @Nullable StateInternalsFactory<String> stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<String> timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
processElementInvoker;

private transient DoFnInvoker<InputT, OutputT> invoker;
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

public ProcessFn(
DoFn<InputT, OutputT> fn,
Expand Down Expand Up @@ -376,7 +378,7 @@ public void processElement(final ProcessContext c) {
return;
}
restrictionState.write(result.getResidualRestriction());
Instant futureOutputWatermark = result.getFutureOutputWatermark();
@Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
Expand Down
Expand Up @@ -37,12 +37,12 @@ public class Result {
@Nullable
private final RestrictionT residualRestriction;
private final DoFn.ProcessContinuation continuation;
private final Instant futureOutputWatermark;
private final @Nullable Instant futureOutputWatermark;

public Result(
@Nullable RestrictionT residualRestriction,
DoFn.ProcessContinuation continuation,
Instant futureOutputWatermark) {
@Nullable Instant futureOutputWatermark) {
this.continuation = checkNotNull(continuation);
if (continuation.shouldResume()) {
checkNotNull(residualRestriction);
Expand All @@ -65,7 +65,7 @@ public DoFn.ProcessContinuation getContinuation() {
return continuation;
}

public Instant getFutureOutputWatermark() {
public @Nullable Instant getFutureOutputWatermark() {
return futureOutputWatermark;
}
}
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for a Beam runner to interact with a client using the Fn API.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.fn;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
Expand Down Expand Up @@ -425,14 +426,14 @@ private static class AccumulatedMetricResult<T> implements MetricResult<T> {
private final MetricName name;
private final String step;
private final T attempted;
private final T committed;
private final @Nullable T committed;
private final boolean isCommittedSupported;

private AccumulatedMetricResult(
MetricName name,
String step,
T attempted,
T committed,
@Nullable T committed,
boolean isCommittedSupported) {
this.name = name;
this.step = step;
Expand Down
Expand Up @@ -19,4 +19,8 @@
/**
* Utilities for runners to implement metrics.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.metrics;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -19,4 +19,8 @@
/**
* Provides utilities for Beam runner authors.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -143,7 +143,7 @@ public boolean apply(ExecutableTriggerStateMachine trigger) {
}

@Override
public ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
public @Nullable ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
if (!finishedSet.isFinished(subTrigger)) {
return subTrigger;
Expand Down
Expand Up @@ -20,4 +20,8 @@
* State machine implementations for triggers, called "triggers" because
* they react to events.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.runners.core.triggers;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Up @@ -28,12 +28,12 @@ public class FlinkNoOpStepContext implements StepContext {

@Override
public StateInternals stateInternals() {
return null;
throw new UnsupportedOperationException("stateInternals is not supported");
}

@Override
public TimerInternals timerInternals() {
return null;
throw new UnsupportedOperationException("timerInternals is not supported");
}
}

0 comments on commit e40e882

Please sign in to comment.