From 488e8955000ee905ab26635e0efbd0834a3d0dcf Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 4 Aug 2016 20:50:28 -0700 Subject: [PATCH 1/2] Create StateSpec parallel to StateTag --- .../apache/beam/sdk/util/state/StateSpec.java | 85 ++++ .../beam/sdk/util/state/StateSpecs.java | 396 ++++++++++++++++++ 2 files changed, 481 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java new file mode 100644 index 000000000000..4d4d82944d77 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A specification of a persistent state cell. This includes information necessary to encode the + * value and details about the intended access pattern. + * + * @param The type of key that must be used with the state tag. Contravariant: methods should + * accept values of type {@code StateSpec}. + * @param The type of state being described. + */ +@Experimental(Kind.STATE) +public interface StateSpec extends Serializable { + + /** + * Visitor for binding a {@link StateSpec} and to the associated {@link State}. + * + * @param the type of key this binder embodies. + */ + public interface StateBinder { + ValueState bindValue(StateSpec> spec, Coder coder); + + BagState bindBag(StateSpec> spec, Coder elemCoder); + + AccumulatorCombiningState + bindCombiningValue( + StateSpec> spec, + Coder accumCoder, CombineFn combineFn); + + AccumulatorCombiningState + bindKeyedCombiningValue( + StateSpec> spec, + Coder accumCoder, KeyedCombineFn combineFn); + + AccumulatorCombiningState + bindKeyedCombiningValueWithContext( + StateSpec> spec, + Coder accumCoder, + KeyedCombineFnWithContext combineFn); + + /** + * Bind to a watermark {@link StateSpec}. + * + *

This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps + * added to the returned {@link WatermarkHoldState} are to be combined. + */ + WatermarkHoldState bindWatermark( + StateSpec> spec, + OutputTimeFn outputTimeFn); + } + + /** + * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. + */ + StateT bind(StateBinder binder); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java new file mode 100644 index 000000000000..685cecc11b46 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; + +import java.util.Objects; + +/** Static utility methods for creating {@link StateSpec} instances. */ +@Experimental(Kind.STATE) +public class StateSpecs { + + private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); + + static { + STANDARD_REGISTRY.registerStandardCoders(); + } + + private StateSpecs() {} + + /** Create a simple state spec for values of type {@code T}. */ + public static StateSpec> value(Coder valueCoder) { + return new ValueStateSpec<>(valueCoder); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + */ + public static + StateSpec> combiningValue( + Coder accumCoder, CombineFn combineFn) { + return combiningValueInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link + * KeyedCombineFn} comes from the keyed {@link StateAccessor}. 
+ */ + public static + StateSpec> keyedCombiningValue( + Coder accumCoder, KeyedCombineFn combineFn) { + return keyedCombiningValueInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically + * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link + * KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes from the + * {@link StateContext}. + */ + public static + StateSpec> + keyedCombiningValueWithContext( + Coder accumCoder, + KeyedCombineFnWithContext combineFn) { + return new KeyedCombiningValueWithContextStateSpec( + accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + * + *

This determines the {@code Coder} from the given {@code Coder}, and should + * only be used to initialize static values. + */ + public static + StateSpec> + combiningValueFromInputInternal( + Coder inputCoder, CombineFn combineFn) { + try { + Coder accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); + return combiningValueInternal(accumCoder, combineFn); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to determine accumulator coder for " + + combineFn.getClass().getSimpleName() + + " from " + + inputCoder, + e); + } + } + + private static + StateSpec> combiningValueInternal( + Coder accumCoder, CombineFn combineFn) { + return new CombiningValueStateSpec(accumCoder, combineFn); + } + + private static + StateSpec> keyedCombiningValueInternal( + Coder accumCoder, KeyedCombineFn combineFn) { + return new KeyedCombiningValueStateSpec(accumCoder, combineFn); + } + + /** + * Create a state spec that is optimized for adding values frequently, and occasionally retrieving + * all the values that have been added. + */ + public static StateSpec> bag(Coder elemCoder) { + return new BagStateSpec(elemCoder); + } + + /** Create a state spec for holding the watermark. */ + public static + StateSpec> watermarkStateInternal( + OutputTimeFn outputTimeFn) { + return new WatermarkStateSpecInternal(outputTimeFn); + } + + public static + StateSpec> convertToBagSpecInternal( + StateSpec> combiningSpec) { + if (combiningSpec instanceof KeyedCombiningValueStateSpec) { + // Checked above; conversion to a bag spec depends on the provided spec being one of those + // created via the factory methods in this class. 
+ @SuppressWarnings("unchecked") + KeyedCombiningValueStateSpec typedSpec = + (KeyedCombiningValueStateSpec) combiningSpec; + return typedSpec.asBagSpec(); + } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) { + @SuppressWarnings("unchecked") + KeyedCombiningValueWithContextStateSpec typedSpec = + (KeyedCombiningValueWithContextStateSpec) combiningSpec; + return typedSpec.asBagSpec(); + } else { + throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); + } + } + + /** + * A value state cell for values of type {@code T}. + * + * @param the type of value being stored + */ + private static class ValueStateSpec implements StateSpec> { + + private final Coder coder; + + private ValueStateSpec(Coder coder) { + this.coder = coder; + } + + @Override + public ValueState bind(StateSpec.StateBinder visitor) { + return visitor.bindValue(this, coder); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof ValueStateSpec)) { + return false; + } + + ValueStateSpec that = (ValueStateSpec) obj; + return Objects.equals(this.coder, that.coder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), coder); + } + } + + /** + * A state cell for values that are combined according to a {@link CombineFn}. + * + * @param the type of input values + * @param type of mutable accumulator values + * @param type of output values + */ + private static class CombiningValueStateSpec + extends KeyedCombiningValueStateSpec + implements StateSpec> { + + private final Coder accumCoder; + private final CombineFn combineFn; + + private CombiningValueStateSpec( + Coder accumCoder, CombineFn combineFn) { + super(accumCoder, combineFn.asKeyedFn()); + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + } + + /** + * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}. 
+ * + * @param the type of keys + * @param the type of input values + * @param type of mutable accumulator values + * @param type of output values + */ + private static class KeyedCombiningValueWithContextStateSpec + implements StateSpec> { + + private final Coder accumCoder; + private final KeyedCombineFnWithContext combineFn; + + protected KeyedCombiningValueWithContextStateSpec( + Coder accumCoder, KeyedCombineFnWithContext combineFn) { + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + + @Override + public AccumulatorCombiningState bind( + StateBinder visitor) { + return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) { + return false; + } + + KeyedCombiningValueWithContextStateSpec that = + (KeyedCombiningValueWithContextStateSpec) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec> asBagSpec() { + return new BagStateSpec(accumCoder); + } + } + + /** + * A state cell for values that are combined according to a {@link KeyedCombineFn}. 
+ * + * @param the type of keys + * @param the type of input values + * @param type of mutable accumulator values + * @param type of output values + */ + private static class KeyedCombiningValueStateSpec + implements StateSpec> { + + private final Coder accumCoder; + private final KeyedCombineFn keyedCombineFn; + + protected KeyedCombiningValueStateSpec( + Coder accumCoder, KeyedCombineFn keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + this.accumCoder = accumCoder; + } + + @Override + public AccumulatorCombiningState bind( + StateBinder visitor) { + return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + KeyedCombiningValueStateSpec that = + (KeyedCombiningValueStateSpec) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec> asBagSpec() { + return new BagStateSpec(accumCoder); + } + } + + /** + * A state cell optimized for bag-like access patterns (frequent additions, occasional reads of + * all the values). 
+ * + * @param the type of value in the bag + */ + private static class BagStateSpec implements StateSpec> { + + private final Coder elemCoder; + + private BagStateSpec(Coder elemCoder) { + this.elemCoder = elemCoder; + } + + @Override + public BagState bind(StateBinder visitor) { + return visitor.bindBag(this, elemCoder); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof BagStateSpec)) { + return false; + } + + BagStateSpec that = (BagStateSpec) obj; + return Objects.equals(this.elemCoder, that.elemCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), elemCoder); + } + } + + private static class WatermarkStateSpecInternal + implements StateSpec> { + + /** + * When multiple output times are added to hold the watermark, this determines how they are + * combined, and also the behavior when merging windows. Does not contribute to equality/hash + * since we have at most one watermark hold spec per computation. 
+ */ + private final OutputTimeFn outputTimeFn; + + private WatermarkStateSpecInternal(OutputTimeFn outputTimeFn) { + this.outputTimeFn = outputTimeFn; + } + + @Override + public WatermarkHoldState bind(StateBinder visitor) { + return visitor.bindWatermark(this, outputTimeFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof WatermarkStateSpecInternal)) { + return false; + } + + // All instances of WatermarkStateSpecInternal are considered equal + return true; + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + } +} From b4bde87c511c226496f848e4e023c081482cd22a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 4 Aug 2016 21:48:48 -0700 Subject: [PATCH 2/2] Make StateTag carry a StateSpec separately from its id --- .../streaming/state/FlinkStateInternals.java | 54 ++- .../main/resources/beam/findbugs-filter.xml | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 120 +++--- .../util/state/InMemoryStateInternals.java | 26 +- .../beam/sdk/util/state/StateBinder.java | 67 +++ .../apache/beam/sdk/util/state/StateSpec.java | 47 +-- .../beam/sdk/util/state/StateSpecs.java | 20 +- .../beam/sdk/util/state/StateTable.java | 14 +- .../apache/beam/sdk/util/state/StateTag.java | 47 +-- .../apache/beam/sdk/util/state/StateTags.java | 390 +++--------------- .../apache/beam/sdk/util/TriggerTester.java | 6 +- 11 files changed, 246 insertions(+), 547 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index e6a43dcb03b6..b274f100d2d6 100644 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -25,19 +25,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTable; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.util.state.*; import org.apache.beam.sdk.values.PCollectionView; import com.google.protobuf.ByteString; @@ -98,45 +86,51 @@ private interface CheckpointableIF { protected final StateTable inMemoryState = new StateTable() { @Override - protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext c) { - return new StateTag.StateBinder() { + protected StateBinder binderForNamespace(final StateNamespace namespace, final StateContext c) { + return new StateBinder() { @Override - public ValueState bindValue(StateTag> address, Coder coder) { - return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder); + public ValueState bindValue(String id, StateSpec> spec, Coder coder) { + return new FlinkInMemoryValue<>(encodeKey(namespace, id), coder); } @Override - 
public BagState bindBag(StateTag> address, Coder elemCoder) { - return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder); + public BagState bindBag(String id, StateSpec> spec, Coder elemCoder) { + return new FlinkInMemoryBag<>(encodeKey(namespace, id), elemCoder); } @Override public AccumulatorCombiningState bindCombiningValue( - StateTag> address, + String id, + StateSpec> spec, Coder accumCoder, Combine.CombineFn combineFn) { - return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, id), combineFn, accumCoder, c); } @Override public AccumulatorCombiningState bindKeyedCombiningValue( - StateTag> address, + String id, + StateSpec> spec, Coder accumCoder, Combine.KeyedCombineFn combineFn) { - return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, id), combineFn, accumCoder, c); } @Override public AccumulatorCombiningState bindKeyedCombiningValueWithContext( - StateTag> address, + String id, + StateSpec> spec, Coder accumCoder, CombineWithContext.KeyedCombineFnWithContext combineFn) { - return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c); + return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, id), combineFn, accumCoder, c); } @Override - public WatermarkHoldState bindWatermark(StateTag> address, OutputTimeFn outputTimeFn) { - return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn); + public WatermarkHoldState bindWatermark( + String id, + StateSpec> spec, + OutputTimeFn outputTimeFn) { + return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, id), outputTimeFn); } }; } @@ -184,7 +178,7 @@ public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loa /** * We remove the first character which encodes the 
type of the stateTag ('s' for system * and 'u' for user). For more details check out the source of - * {@link StateTags.StateTagBase#getId()}. + * {@link StateTags.SimpleStateTag#getId()}. */ private void decodeState(StateCheckpointReader reader, ClassLoader loader) throws IOException, ClassNotFoundException { @@ -277,12 +271,12 @@ private void decodeState(StateCheckpointReader reader, ClassLoader loader) } } - private ByteString encodeKey(StateNamespace namespace, StateTag address) { + private ByteString encodeKey(StateNamespace namespace, String id) { StringBuilder sb = new StringBuilder(); try { namespace.appendTo(sb); sb.append('+'); - address.appendTo(sb); + sb.append(id); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index f117fbfcb15b..11cfb6e6edec 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -134,7 +134,7 @@ - + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java index 3cc34a694077..48082ba7fd74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; import com.google.common.base.Optional; import com.google.common.collect.Iterables; @@ -251,9 +250,11 @@ public CopyOnBindBinderFactory(K key, Optional> underlying) { 
this.underlying = underlying; } - private boolean containedInUnderlying(StateNamespace namespace, StateTag tag) { - return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace) - && underlying.get().getTagsInUse(namespace).containsKey(tag); + private boolean containedInUnderlying( + StateNamespace namespace, String id, StateSpec spec) { + return underlying.isPresent() + && underlying.get().isNamespaceInUse(namespace) + && underlying.get().getTagsInUse(namespace).containsKey(StateTags.tagForSpec(id, spec)); } @Override @@ -261,28 +262,28 @@ public StateBinder forNamespace(final StateNamespace namespace, final StateCo return new StateBinder() { @Override public WatermarkHoldState bindWatermark( - StateTag> address, + String id, + StateSpec> spec, OutputTimeFn outputTimeFn) { - if (containedInUnderlying(namespace, address)) { + if (containedInUnderlying(namespace, id, spec)) { @SuppressWarnings("unchecked") InMemoryState> existingState = (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, StateTags.tagForSpec(id, spec), c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryWatermarkHold<>( - outputTimeFn); + return new InMemoryStateInternals.InMemoryWatermarkHold<>(outputTimeFn); } } @Override public ValueState bindValue( - StateTag> address, Coder coder) { - if (containedInUnderlying(namespace, address)) { + String id, StateSpec> spec, Coder coder) { + if (containedInUnderlying(namespace, id, spec)) { @SuppressWarnings("unchecked") InMemoryState> existingState = (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, StateTags.tagForSpec(id, spec), c); return existingState.copy(); } else { return new InMemoryStateInternals.InMemoryValue<>(); @@ -290,17 +291,19 @@ public ValueState bindValue( } @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder 
accumCoder, CombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { + public + AccumulatorCombiningState bindCombiningValue( + String id, + StateSpec> spec, + Coder accumCoder, + CombineFn combineFn) { + if (containedInUnderlying(namespace, id, spec)) { @SuppressWarnings("unchecked") InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); + existingState = + (InMemoryStateInternals.InMemoryState< + ? extends AccumulatorCombiningState>) + underlying.get().get(namespace, StateTags.tagForSpec(id, spec), c); return existingState.copy(); } else { return new InMemoryStateInternals.InMemoryCombiningValue<>( @@ -310,12 +313,12 @@ public ValueState bindValue( @Override public BagState bindBag( - StateTag> address, Coder elemCoder) { - if (containedInUnderlying(namespace, address)) { + String id, StateSpec> spec, Coder elemCoder) { + if (containedInUnderlying(namespace, id, spec)) { @SuppressWarnings("unchecked") InMemoryState> existingState = (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, StateTags.tagForSpec(id, spec), c); return existingState.copy(); } else { return new InMemoryStateInternals.InMemoryBag<>(); @@ -323,18 +326,19 @@ public BagState bindBag( } @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, + public + AccumulatorCombiningState bindKeyedCombiningValue( + String id, + StateSpec> spec, Coder accumCoder, KeyedCombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { + if (containedInUnderlying(namespace, id, spec)) { @SuppressWarnings("unchecked") InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); + existingState = + (InMemoryStateInternals.InMemoryState< + ? 
extends AccumulatorCombiningState>) + underlying.get().get(namespace, StateTags.tagForSpec(id, spec), c); return existingState.copy(); } else { return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn); @@ -342,13 +346,14 @@ public BagState bindBag( } @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, + public + AccumulatorCombiningState bindKeyedCombiningValueWithContext( + String id, + StateSpec> spec, Coder accumCoder, KeyedCombineFnWithContext combineFn) { return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + id, spec, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } }; } @@ -394,48 +399,53 @@ public StateBinder forNamespace(final StateNamespace namespace, final StateCo return new StateBinder() { @Override public WatermarkHoldState bindWatermark( - StateTag> address, + String id, + StateSpec> spec, OutputTimeFn outputTimeFn) { - return underlying.get(namespace, address, c); + return underlying.get(namespace, StateTags.tagForSpec(id, spec), c); } @Override public ValueState bindValue( - StateTag> address, Coder coder) { - return underlying.get(namespace, address, c); + String id, StateSpec> spec, Coder coder) { + return underlying.get(namespace, StateTags.tagForSpec(id, spec), c); } @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn) { - return underlying.get(namespace, address, c); + public + AccumulatorCombiningState bindCombiningValue( + String id, + StateSpec> spec, + Coder accumCoder, + CombineFn combineFn) { + return underlying.get(namespace, StateTags.tagForSpec(id, spec), c); } @Override public BagState bindBag( - StateTag> address, Coder elemCoder) { - return underlying.get(namespace, address, c); + String id, StateSpec> spec, Coder elemCoder) { + return underlying.get(namespace, StateTags.tagForSpec(id, spec), c); } @Override - public 
AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, + public + AccumulatorCombiningState bindKeyedCombiningValue( + String id, + StateSpec> spec, Coder accumCoder, KeyedCombineFn combineFn) { - return underlying.get(namespace, address, c); + return underlying.get(namespace, StateTags.tagForSpec(id, spec), c); } @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, + public + AccumulatorCombiningState bindKeyedCombiningValueWithContext( + String id, + StateSpec> address, Coder accumCoder, KeyedCombineFnWithContext combineFn) { return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + id, address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } }; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java index 1d5d4325dad2..c3c835402545 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java @@ -26,15 +26,12 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; - import org.joda.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; - import javax.annotation.Nullable; /** @@ -108,20 +105,21 @@ static class InMemoryStateBinder implements StateBinder { @Override public ValueState bindValue( - StateTag> address, Coder coder) { + String id, StateSpec> address, Coder coder) { return new InMemoryValue(); } @Override public BagState bindBag( - final StateTag> address, Coder elemCoder) { + String id, final StateSpec> address, Coder elemCoder) { 
return new InMemoryBag(); } @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, + public + AccumulatorCombiningState bindCombiningValue( + String id, + StateSpec> address, Coder accumCoder, final CombineFn combineFn) { return new InMemoryCombiningValue(key, combineFn.asKeyedFn()); @@ -129,7 +127,8 @@ public BagState bindBag( @Override public WatermarkHoldState bindWatermark( - StateTag> address, + String id, + StateSpec> address, OutputTimeFn outputTimeFn) { return new InMemoryWatermarkHold(outputTimeFn); } @@ -137,7 +136,8 @@ public WatermarkHoldState bindWatermark( @Override public AccumulatorCombiningState bindKeyedCombiningValue( - StateTag> address, + String id, + StateSpec> address, Coder accumCoder, KeyedCombineFn combineFn) { return new InMemoryCombiningValue(key, combineFn); @@ -146,10 +146,12 @@ public WatermarkHoldState bindWatermark( @Override public AccumulatorCombiningState bindKeyedCombiningValueWithContext( - StateTag> address, + String id, + StateSpec> address, Coder accumCoder, KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + return bindKeyedCombiningValue( + id, address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java new file mode 100644 index 000000000000..0521e15ca099 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; + +/** + * Visitor for binding a {@link StateSpec} and to the associated {@link State}. + * + * @param the type of key this binder embodies. + */ +public interface StateBinder { + ValueState bindValue(String id, StateSpec> spec, Coder coder); + + BagState bindBag(String id, StateSpec> spec, Coder elemCoder); + + AccumulatorCombiningState bindCombiningValue( + String id, + StateSpec> spec, + Coder accumCoder, + Combine.CombineFn combineFn); + + + AccumulatorCombiningState bindKeyedCombiningValue( + String id, + StateSpec> spec, + Coder accumCoder, + Combine.KeyedCombineFn combineFn); + + + AccumulatorCombiningState bindKeyedCombiningValueWithContext( + String id, + StateSpec> spec, + Coder accumCoder, + CombineWithContext.KeyedCombineFnWithContext + combineFn); + + /** + * Bind to a watermark {@link StateSpec}. + * + *

This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. + */ + WatermarkHoldState bindWatermark( + String id, + StateSpec> spec, + OutputTimeFn outputTimeFn); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java index 4d4d82944d77..76f26580b566 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java @@ -19,15 +19,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import java.io.IOException; import java.io.Serializable; /** @@ -41,45 +33,8 @@ @Experimental(Kind.STATE) public interface StateSpec extends Serializable { - /** - * Visitor for binding a {@link StateSpec} and to the associated {@link State}. - * - * @param the type of key this binder embodies. 
- */ - public interface StateBinder { - ValueState bindValue(StateSpec> spec, Coder coder); - - BagState bindBag(StateSpec> spec, Coder elemCoder); - - AccumulatorCombiningState - bindCombiningValue( - StateSpec> spec, - Coder accumCoder, CombineFn combineFn); - - AccumulatorCombiningState - bindKeyedCombiningValue( - StateSpec> spec, - Coder accumCoder, KeyedCombineFn combineFn); - - AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateSpec> spec, - Coder accumCoder, - KeyedCombineFnWithContext combineFn); - - /** - * Bind to a watermark {@link StateSpec}. - * - *

This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps - * added to the returned {@link WatermarkHoldState} are to be combined. - */ - WatermarkHoldState bindWatermark( - StateSpec> spec, - OutputTimeFn outputTimeFn); - } - /** * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. */ - StateT bind(StateBinder binder); + StateT bind(String id, StateBinder binder); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 685cecc11b46..50365f43e597 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -168,8 +168,8 @@ private ValueStateSpec(Coder coder) { } @Override - public ValueState bind(StateSpec.StateBinder visitor) { - return visitor.bindValue(this, coder); + public ValueState bind(String id, StateBinder visitor) { + return visitor.bindValue(id, this, coder); } @Override @@ -236,8 +236,8 @@ protected KeyedCombiningValueWithContextStateSpec( @Override public AccumulatorCombiningState bind( - StateBinder visitor) { - return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn); + String id, StateBinder visitor) { + return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn); } @Override @@ -287,8 +287,8 @@ protected KeyedCombiningValueStateSpec( @Override public AccumulatorCombiningState bind( - StateBinder visitor) { - return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn); + String id, StateBinder visitor) { + return visitor.bindKeyedCombiningValue(id, this, accumCoder, keyedCombineFn); } @Override @@ -331,8 +331,8 @@ private BagStateSpec(Coder elemCoder) { } @Override - public BagState bind(StateBinder visitor) { - return visitor.bindBag(this, elemCoder); + public BagState bind(String id, StateBinder 
visitor) { + return visitor.bindBag(id, this, elemCoder); } @Override @@ -370,8 +370,8 @@ private WatermarkStateSpecInternal(OutputTimeFn outputTimeFn) { } @Override - public WatermarkHoldState bind(StateBinder visitor) { - return visitor.bindWatermark(this, outputTimeFn); + public WatermarkHoldState bind(String id, StateBinder visitor) { + return visitor.bindWatermark(id, this, outputTimeFn); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTable.java index 2ae651679d9e..17ff53ea2761 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTable.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTable.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util.state; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; - import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; @@ -39,7 +37,8 @@ public abstract class StateTable { * already present in this {@link StateTable}. */ public StateT get( - StateNamespace namespace, StateTag tag, StateContext c) { + StateNamespace namespace, + StateTag tag, StateContext c) { State storage = stateTable.get(namespace, tag); if (storage != null) { @SuppressWarnings("unchecked") @@ -47,7 +46,7 @@ public StateT get( return typedStorage; } - StateT typedStorage = tag.bind(binderForNamespace(namespace, c)); + StateT typedStorage = tag.getSpec().bind(tag.getId(), binderForNamespace(namespace, c)); stateTable.put(namespace, tag, typedStorage); return typedStorage; } @@ -77,8 +76,9 @@ public Set getNamespacesInUse() { } /** - * Provide the {@code StateBinder} to use for creating {@code Storage} instances - * in the specified {@code namespace}. + * Provide the {@code StateBinder} to use for creating {@code Storage} instances in the specified + * {@code namespace}. 
*/ - protected abstract StateBinder binderForNamespace(StateNamespace namespace, StateContext c); + protected abstract StateBinder binderForNamespace( + StateNamespace namespace, StateContext c); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java index 388b5e1cc7f9..bffb58075a33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java @@ -19,13 +19,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import java.io.IOException; import java.io.Serializable; @@ -46,43 +40,6 @@ @Experimental(Kind.STATE) public interface StateTag extends Serializable { - /** - * Visitor for binding a {@link StateTag} and to the associated {@link State}. - * - * @param the type of key this binder embodies. - */ - public interface StateBinder { - ValueState bindValue(StateTag> address, Coder coder); - - BagState bindBag(StateTag> address, Coder elemCoder); - - AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn); - - AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, KeyedCombineFn combineFn); - - AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn); - - /** - * Bind to a watermark {@link StateTag}. 
- * - *

This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps - * added to the returned {@link WatermarkHoldState} are to be combined. - */ - WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn); - } - /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ void appendTo(Appendable sb) throws IOException; @@ -92,7 +49,7 @@ WatermarkHoldState bindWatermark( String getId(); /** - * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. + * Returns the spec for the enclosed state cell. */ - StateT bind(StateBinder binder); + StateSpec getSpec(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java index e50ad8d00f9d..bc0931eb46e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java @@ -19,7 +19,6 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -63,11 +62,28 @@ private interface SystemStateTag { StateTag asKind(StateKind kind); } + /** Create a state tag for the given id and spec. */ + public static StateTag tagForSpec( + String id, StateSpec spec) { + return new SimpleStateTag<>(new StructuredId(id), spec); + } + + /** + * Quirk of type system unresolved. TODO: make this and the above method one method. + */ + public static + StateTag> tagforSpec( + String id, + StateSpec> spec) { + return new SimpleStateTag<>(new StructuredId(id), spec); + } + + /** * Create a simple state tag for values of type {@code T}. 
*/ public static StateTag> value(String id, Coder valueCoder) { - return new ValueStateTag<>(new StructuredId(id), valueCoder); + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder)); } /** @@ -78,7 +94,8 @@ public static StateTag> value(String id, Coder valu StateTag> combiningValue( String id, Coder accumCoder, CombineFn combineFn) { - return combiningValueInternal(id, accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn)); } /** @@ -90,7 +107,8 @@ public static StateTag> value(String id, Coder valu OutputT> StateTag> keyedCombiningValue(String id, Coder accumCoder, KeyedCombineFn combineFn) { - return keyedCombiningValueInternal(id, accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn)); } /** @@ -105,10 +123,8 @@ public static StateTag> value(String id, Coder valu String id, Coder accumCoder, KeyedCombineFnWithContext combineFn) { - return new KeyedCombiningValueWithContextStateTag( - new StructuredId(id), - accumCoder, - combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn)); } /** @@ -122,32 +138,8 @@ public static StateTag> value(String id, Coder valu StateTag> combiningValueFromInputInternal( String id, Coder inputCoder, CombineFn combineFn) { - try { - Coder accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); - return combiningValueInternal(id, accumCoder, combineFn); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException( - "Unable to determine accumulator coder for " + combineFn.getClass().getSimpleName() - + " from " + inputCoder, e); - } - } - - private static StateTag> - combiningValueInternal( - String id, Coder accumCoder, CombineFn combineFn) { - return - new CombiningValueStateTag( - new StructuredId(id), accumCoder, combineFn); - } - - private 
static - StateTag> keyedCombiningValueInternal( - String id, - Coder accumCoder, - KeyedCombineFn combineFn) { - return new KeyedCombiningValueStateTag( - new StructuredId(id), accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn)); } /** @@ -155,7 +147,7 @@ StateTag> keyedCombiningVa * occasionally retrieving all the values that have been added. */ public static StateTag> bag(String id, Coder elemCoder) { - return new BagStateTag(new StructuredId(id), elemCoder); + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder)); } /** @@ -163,7 +155,8 @@ public static StateTag> bag(String id, Coder elemCode */ public static StateTag> watermarkStateInternal(String id, OutputTimeFn outputTimeFn) { - return new WatermarkStateTagInternal(new StructuredId(id), outputTimeFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); } /** @@ -173,7 +166,7 @@ public static StateTag> bag(String id, Coder elemCode public static StateTag makeSystemTagInternal( StateTag tag) { if (!(tag instanceof SystemStateTag)) { - throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag); + throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag); } // Checked above @SuppressWarnings("unchecked") @@ -184,21 +177,9 @@ public static StateTag makeSystemTagInterna public static StateTag> convertToBagTagInternal( StateTag> combiningTag) { - if (combiningTag instanceof KeyedCombiningValueStateTag) { - // Checked above; conversion to a bag tag depends on the provided tag being one of those - // created via the factory methods in this class. 
- @SuppressWarnings("unchecked") - KeyedCombiningValueStateTag typedTag = - (KeyedCombiningValueStateTag) combiningTag; - return typedTag.asBagTag(); - } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) { - @SuppressWarnings("unchecked") - KeyedCombiningValueWithContextStateTag typedTag = - (KeyedCombiningValueWithContextStateTag) combiningTag; - return typedTag.asBagTag(); - } else { - throw new IllegalArgumentException("Unexpected StateTag " + combiningTag); - } + return new SimpleStateTag<>( + new StructuredId(combiningTag.getId()), + StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); } private static class StructuredId implements Serializable { @@ -256,15 +237,17 @@ public int hashCode() { } /** - * A base class that just manages the structured ids. + * A basic {@link StateTag} implementation that manages the structured ids. */ - private abstract static class StateTagBase + private static class SimpleStateTag implements StateTag, SystemStateTag { - protected final StructuredId id; + private final StateSpec spec; + private final StructuredId id; - protected StateTagBase(StructuredId id) { + public SimpleStateTag(StructuredId id, StateSpec spec) { this.id = id; + this.spec = spec; } @Override @@ -272,6 +255,11 @@ public String getId() { return id.getRawId(); } + @Override + public StateSpec getSpec() { + return spec; + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) @@ -285,298 +273,24 @@ public void appendTo(Appendable sb) throws IOException { } @Override - public abstract StateTag asKind(StateKind kind); - } - - /** - * A value state cell for values of type {@code T}. 
- * - * @param the type of value being stored - */ - private static class ValueStateTag extends StateTagBase> - implements StateTag> { - - private final Coder coder; - - private ValueStateTag(StructuredId id, Coder coder) { - super(id); - this.coder = coder; - } - - @Override - public ValueState bind(StateBinder visitor) { - return visitor.bindValue(this, coder); + public StateTag asKind(StateKind kind) { + return new SimpleStateTag<>(id.asKind(kind), spec); } @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof ValueStateTag)) { + public boolean equals(Object other) { + if (!(other instanceof SimpleStateTag)) { return false; } - ValueStateTag that = (ValueStateTag) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.coder, that.coder); + SimpleStateTag otherTag = (SimpleStateTag) other; + return Objects.equals(this.getId(), otherTag.getId()) + && Objects.equals(this.getSpec(), otherTag.getSpec()); } @Override public int hashCode() { - return Objects.hash(getClass(), id, coder); - } - - @Override - public StateTag> asKind(StateKind kind) { - return new ValueStateTag(id.asKind(kind), coder); - } - } - - /** - * A state cell for values that are combined according to a {@link CombineFn}. 
- * - * @param the type of input values - * @param type of mutable accumulator values - * @param type of output values - */ - private static class CombiningValueStateTag - extends KeyedCombiningValueStateTag - implements StateTag>, - SystemStateTag> { - - private final Coder accumCoder; - private final CombineFn combineFn; - - private CombiningValueStateTag( - StructuredId id, - Coder accumCoder, CombineFn combineFn) { - super(id, accumCoder, combineFn.asKeyedFn()); - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public StateTag> - asKind(StateKind kind) { - return new CombiningValueStateTag( - id.asKind(kind), accumCoder, combineFn); - } - } - - /** - * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}. - * - * @param the type of keys - * @param the type of input values - * @param type of mutable accumulator values - * @param type of output values - */ - private static class KeyedCombiningValueWithContextStateTag - extends StateTagBase> - implements SystemStateTag> { - - private final Coder accumCoder; - private final KeyedCombineFnWithContext combineFn; - - protected KeyedCombiningValueWithContextStateTag( - StructuredId id, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - super(id); - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public AccumulatorCombiningState bind( - StateBinder visitor) { - return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) { - return false; - } - - KeyedCombiningValueWithContextStateTag that = - (KeyedCombiningValueWithContextStateTag) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.accumCoder, that.accumCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, accumCoder); - } 
- - @Override - public StateTag> asKind( - StateKind kind) { - return new KeyedCombiningValueWithContextStateTag<>( - id.asKind(kind), accumCoder, combineFn); - } - - private StateTag> asBagTag() { - return new BagStateTag(id, accumCoder); - } - } - - /** - * A state cell for values that are combined according to a {@link KeyedCombineFn}. - * - * @param the type of keys - * @param the type of input values - * @param type of mutable accumulator values - * @param type of output values - */ - private static class KeyedCombiningValueStateTag - extends StateTagBase> - implements SystemStateTag> { - - private final Coder accumCoder; - private final KeyedCombineFn keyedCombineFn; - - protected KeyedCombiningValueStateTag( - StructuredId id, - Coder accumCoder, KeyedCombineFn keyedCombineFn) { - super(id); - this.keyedCombineFn = keyedCombineFn; - this.accumCoder = accumCoder; - } - - @Override - public AccumulatorCombiningState bind( - StateBinder visitor) { - return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof CombiningValueStateTag)) { - return false; - } - - KeyedCombiningValueStateTag that = (KeyedCombiningValueStateTag) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.accumCoder, that.accumCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, accumCoder); - } - - @Override - public StateTag> asKind( - StateKind kind) { - return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn); - } - - private StateTag> asBagTag() { - return new BagStateTag(id, accumCoder); - } - } - - /** - * A state cell optimized for bag-like access patterns (frequent additions, occasional reads - * of all the values). 
- * - * @param the type of value in the bag - */ - private static class BagStateTag extends StateTagBase> - implements StateTag>{ - - private final Coder elemCoder; - - private BagStateTag(StructuredId id, Coder elemCoder) { - super(id); - this.elemCoder = elemCoder; - } - - @Override - public BagState bind(StateBinder visitor) { - return visitor.bindBag(this, elemCoder); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof BagStateTag)) { - return false; - } - - BagStateTag that = (BagStateTag) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.elemCoder, that.elemCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, elemCoder); - } - - @Override - public StateTag> asKind(StateKind kind) { - return new BagStateTag<>(id.asKind(kind), elemCoder); - } - } - - private static class WatermarkStateTagInternal - extends StateTagBase> { - - /** - * When multiple output times are added to hold the watermark, this determines how they are - * combined, and also the behavior when merging windows. Does not contribute to equality/hash - * since we have at most one watermark hold tag per computation. 
- */ - private final OutputTimeFn outputTimeFn; - - private WatermarkStateTagInternal(StructuredId id, OutputTimeFn outputTimeFn) { - super(id); - this.outputTimeFn = outputTimeFn; - } - - @Override - public WatermarkHoldState bind(StateBinder visitor) { - return visitor.bindWatermark(this, outputTimeFn); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof WatermarkStateTagInternal)) { - return false; - } - - WatermarkStateTagInternal that = (WatermarkStateTagInternal) obj; - return Objects.equals(this.id, that.id); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id); - } - - @Override - public StateTag> asKind(StateKind kind) { - return new WatermarkStateTagInternal(id.asKind(kind), outputTimeFn); + return Objects.hash(getClass(), this.getId(), this.getSpec()); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index 4892bbd23c72..9d7170175fd6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -365,12 +365,12 @@ public TestInMemoryStateInternals() { super(null); } - public Set> getTagsInUse(StateNamespace namespace) { - Set> inUse = new HashSet<>(); + public Set getTagsInUse(StateNamespace namespace) { + Set inUse = new HashSet<>(); for (Map.Entry, State> entry : inMemoryState.getTagsInUse(namespace).entrySet()) { if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); + inUse.add(entry.getKey().getId()); } } return inUse;