Skip to content

Commit

Permalink
Merge 1c42641 into 6807480
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Dec 8, 2016
2 parents 6807480 + 1c42641 commit 8f56e3d
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util.state;
package org.apache.beam.runners.direct;

import static com.google.common.base.Preconditions.checkState;

Expand All @@ -32,8 +32,24 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryBag;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryCombiningValue;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryStateBinder;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryValue;
import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryWatermarkHold;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateTable;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTag.StateBinder;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -262,11 +278,11 @@ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends WatermarkHoldState<W>> existingState =
(InMemoryStateInternals.InMemoryState<? extends WatermarkHoldState<W>>)
(InMemoryState<? extends WatermarkHoldState<W>>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryStateInternals.InMemoryWatermarkHold<>(
return new InMemoryWatermarkHold<>(
outputTimeFn);
}
}
Expand All @@ -277,11 +293,11 @@ public <T> ValueState<T> bindValue(
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends ValueState<T>> existingState =
(InMemoryStateInternals.InMemoryState<? extends ValueState<T>>)
(InMemoryState<? extends ValueState<T>>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryStateInternals.InMemoryValue<>();
return new InMemoryValue<>();
}
}

Expand All @@ -294,12 +310,11 @@ public <T> ValueState<T> bindValue(
@SuppressWarnings("unchecked")
InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
existingState = (
InMemoryStateInternals
.InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
OutputT>>) underlying.get().get(namespace, address, c);
InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
OutputT>>) underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryStateInternals.InMemoryCombiningValue<>(
return new InMemoryCombiningValue<>(
key, combineFn.asKeyedFn());
}
}
Expand All @@ -310,11 +325,11 @@ public <T> BagState<T> bindBag(
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends BagState<T>> existingState =
(InMemoryStateInternals.InMemoryState<? extends BagState<T>>)
(InMemoryState<? extends BagState<T>>)
underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryStateInternals.InMemoryBag<>();
return new InMemoryBag<>();
}
}

Expand All @@ -328,12 +343,11 @@ public <T> BagState<T> bindBag(
@SuppressWarnings("unchecked")
InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
existingState = (
InMemoryStateInternals
.InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
OutputT>>) underlying.get().get(namespace, address, c);
InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
OutputT>>) underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn);
return new InMemoryCombiningValue<>(key, combineFn);
}
}

Expand Down Expand Up @@ -446,7 +460,7 @@ public InMemoryStateBinderFactory(K key) {

@Override
public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
return new InMemoryStateInternals.InMemoryStateBinder<>(key, c);
return new InMemoryStateBinder<>(key, c);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.beam.sdk.util.BaseExecutionContext;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;

/**
* Execution Context for the {@link DirectRunner}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.joda.time.Instant;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.joda.time.Instant;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util.state;
package org.apache.beam.runners.direct;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
Expand All @@ -39,6 +39,16 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaceForTest;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public K getKey() {
return key;
}

interface InMemoryState<T extends InMemoryState<T>> {
/**
* Interface common to all in-memory state cells. Includes ability to see whether a cell has been
* cleared and the ability to create a clone of the contents.
*/
public interface InMemoryState<T extends InMemoryState<T>> {
boolean isCleared();
T copy();
}
Expand Down Expand Up @@ -94,11 +98,11 @@ public <T extends State> T state(
/**
* A {@link StateBinder} that returns In Memory {@link State} objects.
*/
static class InMemoryStateBinder<K> implements StateBinder<K> {
public static class InMemoryStateBinder<K> implements StateBinder<K> {
private final K key;
private final StateContext<?> c;

InMemoryStateBinder(K key, StateContext<?> c) {
public InMemoryStateBinder(K key, StateContext<?> c) {
this.key = key;
this.c = c;
}
Expand Down Expand Up @@ -150,7 +154,11 @@ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
}
}

static final class InMemoryValue<T> implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
/**
* An {@link InMemoryState} implementation of {@link ValueState}.
*/
public static final class InMemoryValue<T>
implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
private boolean isCleared = true;
private T value = null;

Expand Down Expand Up @@ -194,7 +202,10 @@ public boolean isCleared() {
}
}

static final class InMemoryWatermarkHold<W extends BoundedWindow>
/**
* An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
*/
public static final class InMemoryWatermarkHold<W extends BoundedWindow>
implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {

private final OutputTimeFn<? super W> outputTimeFn;
Expand Down Expand Up @@ -267,15 +278,18 @@ public InMemoryWatermarkHold<W> copy() {
}
}

static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
/**
* An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}.
*/
public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
private final K key;
private boolean isCleared = true;
private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
private AccumT accum;

InMemoryCombiningValue(
public InMemoryCombiningValue(
K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
this.key = key;
this.combineFn = combineFn;
Expand Down Expand Up @@ -353,7 +367,10 @@ public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
}
}

static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
/**
* An {@link InMemoryState} implementation of {@link BagState}.
*/
public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
private List<T> contents = new ArrayList<>();

@Override
Expand Down

0 comments on commit 8f56e3d

Please sign in to comment.