From 33b71aed2d81013d5a8013356965b7db76685223 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 31 Jan 2018 10:38:42 -0800 Subject: [PATCH 1/5] google-java-format --- .../direct/FlattenEvaluatorFactoryTest.java | 14 +- .../runners/dataflow/BatchViewOverrides.java | 548 +++++++++--------- .../apache/beam/sdk/util/WindowedValue.java | 184 ++---- .../beam/sdk/util/WindowedValueTest.java | 18 +- .../beam/sdk/nexmark/queries/Query4.java | 17 +- .../beam/sdk/nexmark/queries/Query6.java | 4 +- .../beam/sdk/nexmark/queries/Query9.java | 4 +- .../beam/sdk/nexmark/queries/WinningBids.java | 86 +-- 8 files changed, 391 insertions(+), 484 deletions(-) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index a9f106442ba3..3c0012639ee3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -41,15 +41,12 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link FlattenEvaluatorFactory}. - */ +/** Tests for {@link FlattenEvaluatorFactory}. */ @RunWith(JUnit4.class) public class FlattenEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - @Rule - public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Test public void testFlattenInMemoryEvaluator() throws Exception { @@ -59,10 +56,8 @@ public void testFlattenInMemoryEvaluator() throws Exception { PCollection flattened = list.apply(Flatten.pCollections()); - CommittedBundle leftBundle = - bundleFactory.createBundle(left).commit(Instant.now()); - CommittedBundle rightBundle = - bundleFactory.createBundle(right).commit(Instant.now()); + CommittedBundle leftBundle = bundleFactory.createBundle(left).commit(Instant.now()); + CommittedBundle rightBundle = bundleFactory.createBundle(right).commit(Instant.now()); EvaluationContext context = mock(EvaluationContext.class); @@ -141,5 +136,4 @@ public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Except leftSideResult.getTransform(), Matchers.>equalTo(flattendProducer)); } - } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 7c4df9f91dc3..727707fa729d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -86,68 +86,68 @@ */ class BatchViewOverrides { /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} for the - * Dataflow runner in batch mode. + * Specialized implementation for {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} for + * the Dataflow runner in batch mode. + * + *

Creates a set of {@code Ism} files sharded by the hash of the key's byte representation. + * Each record is structured as follows: * - *

Creates a set of {@code Ism} files sharded by the hash of the key's byte - * representation. Each record is structured as follows: *

    - *
  • Key 1: User key K
  • - *
  • Key 2: Window
  • - *
  • Key 3: 0L (constant)
  • - *
  • Value: Windowed value
  • + *
  • Key 1: User key K + *
  • Key 2: Window + *
  • Key 3: 0L (constant) + *
  • Value: Windowed value *
* *

Alongside the data records, there are the following metadata records: + * *

    - *
  • Key 1: Metadata Key
  • - *
  • Key 2: Window
  • - *
  • Key 3: Index [0, size of map]
  • - *
  • Value: variable length long byte representation of size of map if index is 0, - * otherwise the byte representation of a key
  • + *
  • Key 1: Metadata Key + *
  • Key 2: Window + *
  • Key 3: Index [0, size of map] + *
  • Value: variable length long byte representation of size of map if index is 0, otherwise + * the byte representation of a key *
- * The {@code [META, Window, 0]} record stores the number of unique keys per window, while - * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. - * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} - * and iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in - * {@code [1, size of map]}. * - *

Note that in the case of a non-deterministic key coder, we fallback to using - * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing - * a warning to users to specify a deterministic key coder. + *

The {@code [META, Window, 0]} record stores the number of unique keys per window, while + * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. + * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} and + * iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1, + * size of map]}. + * + *

Note that in the case of a non-deterministic key coder, we fallback to using {@link + * org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing a warning to users + * to specify a deterministic key coder. */ - static class BatchViewAsMap - extends PTransform>, PCollection> { + static class BatchViewAsMap extends PTransform>, PCollection> { /** - * A {@link DoFn} which groups elements by window boundaries. For each group, - * the group of elements is transformed into a {@link TransformedMap}. - * The transformed {@code Map} is backed by a {@code Map>} - * and contains a function {@code WindowedValue -> V}. + * A {@link DoFn} which groups elements by window boundaries. For each group, the group of + * elements is transformed into a {@link TransformedMap}. The transformed {@code Map} is + * backed by a {@code Map>} and contains a function {@code WindowedValue + * -> V}. * *

Outputs {@link IsmRecord}s having: + * *

    - *
  • Key 1: Window
  • - *
  • Value: Transformed map containing a transform that removes the encapsulation - * of the window around each value, - * {@code Map> -> Map}.
  • + *
  • Key 1: Window + *
  • Value: Transformed map containing a transform that removes the encapsulation of the + * window around each value, {@code Map> -> Map}. *
*/ static class ToMapDoFn - extends DoFn>>>>, - IsmRecord, - V>>>> { + extends DoFn< + KV>>>>, + IsmRecord, V>>>> { private final Coder windowCoder; + ToMapDoFn(Coder windowCoder) { this.windowCoder = windowCoder; } @ProcessElement - public void processElement(ProcessContext c) - throws Exception { + public void processElement(ProcessContext c) throws Exception { Optional previousWindowStructuralValue = Optional.absent(); Optional previousWindow = Optional.absent(); Map> map = new HashMap<>(); @@ -165,12 +165,14 @@ public void processElement(ProcessContext c) } // Verify that the user isn't trying to insert the same key multiple times. - checkState(!map.containsKey(kv.getValue().getValue().getKey()), + checkState( + !map.containsKey(kv.getValue().getValue().getKey()), "Multiple values [%s, %s] found for single key [%s] within window [%s].", map.get(kv.getValue().getValue().getKey()), kv.getValue().getValue().getValue(), kv.getKey()); - map.put(kv.getValue().getValue().getKey(), + map.put( + kv.getValue().getValue().getKey(), kv.getValue().withValue(kv.getValue().getValue().getValue())); previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); previousWindow = Optional.of(kv.getKey()); @@ -201,8 +203,7 @@ public PCollection expand(PCollection> input) { return this.applyInternal(input); } - private PCollection - applyInternal(PCollection> input) { + private PCollection applyInternal(PCollection> input) { try { return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */); } catch (NonDeterministicException e) { @@ -220,11 +221,10 @@ protected String getKindString() { } /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ - private PCollection - applyForSingletonFallback(PCollection> input) { + private PCollection applyForSingletonFallback( + PCollection> input) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); @SuppressWarnings({"rawtypes", "unchecked"}) KvCoder inputCoder = (KvCoder) input.getCoder(); @@ -246,54 +246,57 @@ protected String getKindString() { } /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the - * Dataflow runner in batch mode. + * Specialized implementation for {@link org.apache.beam.sdk.transforms.View.AsMultimap + * View.AsMultimap} for the Dataflow runner in batch mode. + * + *

Creates a set of {@code Ism} files sharded by the hash of the key's byte representation. + * Each record is structured as follows: * - *

Creates a set of {@code Ism} files sharded by the hash of the key's byte - * representation. Each record is structured as follows: *

    - *
  • Key 1: User key K
  • - *
  • Key 2: Window
  • - *
  • Key 3: Index offset for a given key and window.
  • - *
  • Value: Windowed value
  • + *
  • Key 1: User key K + *
  • Key 2: Window + *
  • Key 3: Index offset for a given key and window. + *
  • Value: Windowed value *
* *

Alongside the data records, there are the following metadata records: + * *

    - *
  • Key 1: Metadata Key
  • - *
  • Key 2: Window
  • - *
  • Key 3: Index [0, size of map]
  • - *
  • Value: variable length long byte representation of size of map if index is 0, - * otherwise the byte representation of a key
  • + *
  • Key 1: Metadata Key + *
  • Key 2: Window + *
  • Key 3: Index [0, size of map] + *
  • Value: variable length long byte representation of size of map if index is 0, otherwise + * the byte representation of a key *
- * The {@code [META, Window, 0]} record stores the number of unique keys per window, while - * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. - * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} - * and iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in - * {@code [1, size of map]}. * - *

Note that in the case of a non-deterministic key coder, we fallback to using - * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing - * a warning to users to specify a deterministic key coder. + *

The {@code [META, Window, 0]} record stores the number of unique keys per window, while + * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. + * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} and + * iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in {@code [1, + * size of map]}. + * + *

Note that in the case of a non-deterministic key coder, we fallback to using {@link + * org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing a warning to users + * to specify a deterministic key coder. */ - static class BatchViewAsMultimap - extends PTransform>, PCollection> { + static class BatchViewAsMultimap extends PTransform>, PCollection> { /** - * A {@link PTransform} that groups elements by the hash of window's byte representation - * if the input {@link PCollection} is not within the global window. Otherwise by the hash - * of the window and key's byte representation. This {@link PTransform} also sorts - * the values by the combination of the window and key's byte representations. + * A {@link PTransform} that groups elements by the hash of window's byte representation if the + * input {@link PCollection} is not within the global window. Otherwise by the hash of the + * window and key's byte representation. This {@link PTransform} also sorts the values by the + * combination of the window and key's byte representations. */ private static class GroupByKeyHashAndSortByKeyAndWindow - extends PTransform>, - PCollection, WindowedValue>>>>> { + extends PTransform< + PCollection>, + PCollection, WindowedValue>>>>> { @SystemDoFnInternal private static class GroupByKeyHashAndSortByKeyAndWindowDoFn extends DoFn, KV, WindowedValue>>> { private final IsmRecordCoder coder; + private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder coder) { this.coder = coder; } @@ -304,38 +307,38 @@ public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws W window = (W) untypedWindow; c.output( - KV.of(coder.hash(ImmutableList.of(c.element().getKey())), - KV.of(KV.of(c.element().getKey(), window), + KV.of( + coder.hash(ImmutableList.of(c.element().getKey())), + KV.of( + KV.of(c.element().getKey(), window), WindowedValue.of( - c.element().getValue(), - c.timestamp(), - untypedWindow, - c.pane())))); + c.element().getValue(), c.timestamp(), untypedWindow, c.pane())))); } } private final IsmRecordCoder coder; + public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder coder) { this.coder = coder; } @Override - public PCollection, WindowedValue>>>> - expand(PCollection> input) { + public PCollection, WindowedValue>>>> expand( + PCollection> input) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); @SuppressWarnings("unchecked") KvCoder inputCoder = (KvCoder) input.getCoder(); PCollection, WindowedValue>>> keyedByHash; - keyedByHash = input.apply( - ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn(coder))); + keyedByHash = + input.apply(ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn(coder))); keyedByHash.setCoder( KvCoder.of( VarIntCoder.of(), - KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), windowCoder), + KvCoder.of( + KvCoder.of(inputCoder.getKeyCoder(), windowCoder), FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)))); return keyedByHash.apply(new GroupByKeyAndSortValuesOnly<>()); @@ -343,13 +346,14 @@ public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder coder) { } /** - * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows - * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have: + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows and + * keys to locate window and key boundaries. The main output {@link IsmRecord}s have: + * *

    - *
  • Key 1: Window
  • - *
  • Key 2: User key K
  • - *
  • Key 3: Index offset for a given key and window.
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Window + *
  • Key 2: User key K + *
  • Key 3: Index offset for a given key and window. + *
  • Value: Windowed value *
* *

Additionally, we output all the unique keys per window seen to {@code outputForEntrySet} @@ -359,8 +363,8 @@ public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder coder) { * throw an {@link IllegalStateException} if more than one key per window is found. */ static class ToIsmRecordForMapLikeDoFn - extends DoFn, WindowedValue>>>, - IsmRecord>> { + extends DoFn< + KV, WindowedValue>>>, IsmRecord>> { private final TupleTag>> outputForSize; private final TupleTag>> outputForEntrySet; @@ -368,6 +372,7 @@ static class ToIsmRecordForMapLikeDoFn private final Coder keyCoder; private final IsmRecordCoder> ismCoder; private final boolean uniqueKeysExpected; + ToIsmRecordForMapLikeDoFn( TupleTag>> outputForSize, TupleTag>> outputForEntrySet, @@ -391,15 +396,13 @@ public void processElement(ProcessContext c) throws Exception { Iterator, WindowedValue>> iterator = c.element().getValue().iterator(); KV, WindowedValue> currentValue = iterator.next(); - Object currentKeyStructuralValue = - keyCoder.structuralValue(currentValue.getKey().getKey()); + Object currentKeyStructuralValue = keyCoder.structuralValue(currentValue.getKey().getKey()); Object currentWindowStructuralValue = windowCoder.structuralValue(currentValue.getKey().getValue()); while (iterator.hasNext()) { KV, WindowedValue> nextValue = iterator.next(); - Object nextKeyStructuralValue = - keyCoder.structuralValue(nextValue.getKey().getKey()); + Object nextKeyStructuralValue = keyCoder.structuralValue(nextValue.getKey().getKey()); Object nextWindowStructuralValue = windowCoder.structuralValue(nextValue.getKey().getValue()); @@ -418,7 +421,7 @@ public void processElement(ProcessContext c) throws Exception { nextKeyIndex = 0; nextUniqueKeyCounter = 1; - } else if (!currentKeyStructuralValue.equals(nextKeyStructuralValue)){ + } else if (!currentKeyStructuralValue.equals(nextKeyStructuralValue)) { // It is a new key within the same window so output the key for the entry set, // reset the key index and increase the count of unique keys seen within this window. outputMetadataRecordForEntrySet(c, currentValue); @@ -432,12 +435,13 @@ public void processElement(ProcessContext c) throws Exception { nextKeyIndex = currentKeyIndex + 1; nextUniqueKeyCounter = currentUniqueKeyCounter; } else { - throw new IllegalStateException(String.format( - "Unique keys are expected but found key %s with values %s and %s in window %s.", - currentValue.getKey().getKey(), - currentValue.getValue().getValue(), - nextValue.getValue().getValue(), - currentValue.getKey().getValue())); + throw new IllegalStateException( + String.format( + "Unique keys are expected but found key %s with values %s and %s in window %s.", + currentValue.getKey().getKey(), + currentValue.getValue().getValue(), + nextValue.getValue().getValue(), + currentValue.getKey().getValue())); } currentValue = nextValue; @@ -457,12 +461,10 @@ public void processElement(ProcessContext c) throws Exception { /** This outputs the data record. */ private void outputDataRecord( ProcessContext c, KV, WindowedValue> value, long keyIndex) { - IsmRecord> ismRecord = IsmRecord.of( - ImmutableList.of( - value.getKey().getKey(), - value.getKey().getValue(), - keyIndex), - value.getValue()); + IsmRecord> ismRecord = + IsmRecord.of( + ImmutableList.of(value.getKey().getKey(), value.getKey().getValue(), keyIndex), + value.getValue()); c.output(ismRecord); } @@ -471,37 +473,43 @@ private void outputDataRecord( */ private void outputMetadataRecordForSize( ProcessContext c, KV, WindowedValue> value, long uniqueKeyCount) { - c.output(outputForSize, - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), - value.getKey().getValue())), + c.output( + outputForSize, + KV.of( + ismCoder.hash( + ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), uniqueKeyCount))); } /** This outputs records which will be used to construct the entry set. */ private void outputMetadataRecordForEntrySet( ProcessContext c, KV, WindowedValue> value) { - c.output(outputForEntrySet, - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), - value.getKey().getValue())), + c.output( + outputForEntrySet, + KV.of( + ismCoder.hash( + ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), value.getKey().getKey()))); } } /** * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of: + * *

    - *
  • Key 1: META key
  • - *
  • Key 2: window
  • - *
  • Key 3: 0L (constant)
  • - *
  • Value: sum of values for window
  • + *
  • Key 1: META key + *
  • Key 2: window + *
  • Key 3: 0L (constant) + *
  • Value: sum of values for window *
* - *

This {@link DoFn} is meant to be used to compute the number of unique keys - * per window for map and multimap side inputs. + *

This {@link DoFn} is meant to be used to compute the number of unique keys per window for + * map and multimap side inputs. */ static class ToIsmMetadataRecordForSizeDoFn extends DoFn>>, IsmRecord>> { private final Coder windowCoder; + ToIsmMetadataRecordForSizeDoFn(Coder windowCoder) { this.windowCoder = windowCoder; } @@ -540,21 +548,23 @@ public void processElement(ProcessContext c) throws Exception { /** * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: + * *

    - *
  • Key 1: META key
  • - *
  • Key 2: window
  • - *
  • Key 3: index offset (1-based index)
  • - *
  • Value: key
  • + *
  • Key 1: META key + *
  • Key 2: window + *
  • Key 3: index offset (1-based index) + *
  • Value: key *
* - *

This {@link DoFn} is meant to be used to output index to key records - * per window for map and multimap side inputs. + *

This {@link DoFn} is meant to be used to output index to key records per window for map + * and multimap side inputs. */ static class ToIsmMetadataRecordForKeyDoFn extends DoFn>>, IsmRecord>> { private final Coder keyCoder; private final Coder windowCoder; + ToIsmMetadataRecordForKeyDoFn(Coder keyCoder, Coder windowCoder) { this.keyCoder = keyCoder; this.windowCoder = windowCoder; @@ -595,34 +605,33 @@ public void processElement(ProcessContext c) throws Exception { } /** - * A {@link DoFn} which partitions sets of elements by window boundaries. Within each - * partition, the set of elements is transformed into a {@link TransformedMap}. - * The transformed {@code Map>} is backed by a - * {@code Map>>} and contains a function - * {@code Iterable> -> Iterable}. + * A {@link DoFn} which partitions sets of elements by window boundaries. Within each partition, + * the set of elements is transformed into a {@link TransformedMap}. The transformed {@code + * Map>} is backed by a {@code Map>>} and contains a + * function {@code Iterable> -> Iterable}. * *

Outputs {@link IsmRecord}s having: + * *

    - *
  • Key 1: Window
  • - *
  • Value: Transformed map containing a transform that removes the encapsulation - * of the window around each value, - * {@code Map>> -> Map>}.
  • + *
  • Key 1: Window + *
  • Value: Transformed map containing a transform that removes the encapsulation of the + * window around each value, {@code Map>> -> Map>}. *
*/ static class ToMultimapDoFn - extends DoFn>>>>, - IsmRecord>, - Iterable>>>> { + extends DoFn< + KV>>>>, + IsmRecord>, Iterable>>>> { private final Coder windowCoder; + ToMultimapDoFn(Coder windowCoder) { this.windowCoder = windowCoder; } @ProcessElement - public void processElement(ProcessContext c) - throws Exception { + public void processElement(ProcessContext c) throws Exception { Optional previousWindowStructuralValue = Optional.absent(); Optional previousWindow = Optional.absent(); Multimap> multimap = HashMultimap.create(); @@ -643,7 +652,8 @@ public void processElement(ProcessContext c) multimap = HashMultimap.create(); } - multimap.put(kv.getValue().getValue().getKey(), + multimap.put( + kv.getValue().getValue().getKey(), kv.getValue().withValue(kv.getValue().getValue().getValue())); previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); previousWindow = Optional.of(kv.getKey()); @@ -677,8 +687,7 @@ public PCollection expand(PCollection> input) { return this.applyInternal(input); } - private PCollection - applyInternal(PCollection> input) { + private PCollection applyInternal(PCollection> input) { try { return applyForMapLike(runner, input, view, false /* unique keys not expected */); } catch (NonDeterministicException e) { @@ -691,11 +700,10 @@ public PCollection expand(PCollection> input) { } /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ - private PCollection - applyForSingletonFallback(PCollection> input) { + private PCollection applyForSingletonFallback( + PCollection> input) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); @SuppressWarnings({"rawtypes", "unchecked"}) KvCoder inputCoder = (KvCoder) input.getCoder(); @@ -725,11 +733,11 @@ private static PCollection applyForMap DataflowRunner runner, PCollection> input, PCollectionView view, - boolean uniqueKeysExpected) throws NonDeterministicException { + boolean uniqueKeysExpected) + throws NonDeterministicException { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); @SuppressWarnings({"rawtypes", "unchecked"}) KvCoder inputCoder = (KvCoder) input.getCoder(); @@ -776,8 +784,7 @@ private static PCollection applyForMap // for each window. PCollection>> outputForSize = outputTuple.get(outputForSizeTag); outputForSize.setCoder( - KvCoder.of(VarIntCoder.of(), - KvCoder.of(windowCoder, VarLongCoder.of()))); + KvCoder.of(VarIntCoder.of(), KvCoder.of(windowCoder, VarLongCoder.of()))); PCollection>> windowMapSizeMetadata = outputForSize .apply("GBKaSVForSize", new GroupByKeyAndSortValuesOnly<>()) @@ -786,11 +793,9 @@ private static PCollection applyForMap // Set the coder on the metadata output destined to build the entry set and process the // entries producing a [META, Window, Index] record per window key pair storing the key. - PCollection>> outputForEntrySet = - outputTuple.get(outputForEntrySetTag); + PCollection>> outputForEntrySet = outputTuple.get(outputForEntrySetTag); outputForEntrySet.setCoder( - KvCoder.of(VarIntCoder.of(), - KvCoder.of(windowCoder, inputCoder.getKeyCoder()))); + KvCoder.of(VarIntCoder.of(), KvCoder.of(windowCoder, inputCoder.getKeyCoder()))); PCollection>> windowMapKeysMetadata = outputForEntrySet .apply("GBKaSVForKeys", new GroupByKeyAndSortValuesOnly<>()) @@ -806,8 +811,9 @@ private static PCollection applyForMap runner.addPCollectionRequiringIndexedFormat(windowMapKeysMetadata); PCollectionList>> outputs = - PCollectionList.of(ImmutableList.of( - perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata)); + PCollectionList.of( + ImmutableList.of( + perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata)); PCollection>> flattenedOutputs = Pipeline.applyTransform(outputs, Flatten.pCollections()); @@ -828,41 +834,38 @@ static IsmRecordCoder> coderForMapLike( 1, // We use only the key for hashing when producing value records 2, // Since the key is not present, we add the window to the hash when // producing metadata records - ImmutableList.of( - MetadataKeyCoder.of(keyCoder), - windowCoder, - BigEndianLongCoder.of()), + ImmutableList.of(MetadataKeyCoder.of(keyCoder), windowCoder, BigEndianLongCoder.of()), FullWindowedValueCoder.of(valueCoder, windowCoder)); } } /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the - * Dataflow runner in batch mode. + * Specialized implementation for {@link org.apache.beam.sdk.transforms.View.AsSingleton + * View.AsSingleton} for the Dataflow runner in batch mode. + * + *

Creates a set of files in the {@link IsmFormat} sharded by the hash of the windows byte + * representation and with records having: * - *

Creates a set of files in the {@link IsmFormat} sharded by the hash of the windows - * byte representation and with records having: *

    - *
  • Key 1: Window
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Window + *
  • Value: Windowed value *
*/ - static class BatchViewAsSingleton - extends PTransform, PCollection> { + static class BatchViewAsSingleton extends PTransform, PCollection> { /** * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows: + * *
    *
  • Key 1: Window *
  • Value: Windowed value *
*/ static class IsmRecordForSingularValuePerWindowDoFn - extends DoFn>>>, - IsmRecord>> { + extends DoFn>>>, IsmRecord>> { private final Coder windowCoder; + IsmRecordForSingularValuePerWindowDoFn(Coder windowCoder) { this.windowCoder = windowCoder; } @@ -913,8 +916,8 @@ public BatchViewAsSingleton( public PCollection expand(PCollection input) { input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout)); @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = + (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); return BatchViewAsSingleton.applyForSingleton( runner, @@ -924,25 +927,23 @@ public PCollection expand(PCollection input) { view); } - static PCollection - applyForSingleton( + static PCollection applyForSingleton( DataflowRunner runner, PCollection input, - DoFn>>>, - IsmRecord>> doFn, + DoFn>>>, IsmRecord>> doFn, Coder defaultValueCoder, PCollectionView view) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); IsmRecordCoder> ismCoder = coderForSingleton(windowCoder, defaultValueCoder); - PCollection>> reifiedPerWindowAndSorted = input - .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey(ismCoder)) - .apply(ParDo.of(doFn)); + PCollection>> reifiedPerWindowAndSorted = + input + .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey(ismCoder)) + .apply(ParDo.of(doFn)); reifiedPerWindowAndSorted.setCoder(ismCoder); runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); @@ -966,34 +967,34 @@ static IsmRecordCoder> coderForSingleton( } /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the - * Dataflow runner in batch mode. + * Specialized implementation for {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} + * for the Dataflow runner in batch mode. * *

Creates a set of {@code Ism} files sharded by the hash of the window's byte representation * and with records having: + * *

    - *
  • Key 1: Window
  • - *
  • Key 2: Index offset within window
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Window + *
  • Key 2: Index offset within window + *
  • Value: Windowed value *
*/ - static class BatchViewAsList - extends PTransform, PCollection> { + static class BatchViewAsList extends PTransform, PCollection> { /** * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the * global window. Each {@link IsmRecord} has + * *
    - *
  • Key 1: Global window
  • - *
  • Key 2: Index offset within window
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Global window + *
  • Key 2: Index offset within window + *
  • Value: Windowed value *
*/ @SystemDoFnInternal - static class ToIsmRecordForGlobalWindowDoFn - extends DoFn>> { + static class ToIsmRecordForGlobalWindowDoFn extends DoFn>> { long indexInBundle; + @StartBundle public void startBundle() throws Exception { indexInBundle = 0; @@ -1001,32 +1002,30 @@ public void startBundle() throws Exception { @ProcessElement public void processElement(ProcessContext c) throws Exception { - c.output(IsmRecord.of( - ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle), - WindowedValue.of( - c.element(), - c.timestamp(), - GlobalWindow.INSTANCE, - c.pane()))); + c.output( + IsmRecord.of( + ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle), + WindowedValue.of(c.element(), c.timestamp(), GlobalWindow.INSTANCE, c.pane()))); indexInBundle += 1; } } /** - * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows - * to locate the window boundaries. The {@link IsmRecord} has: + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows to + * locate the window boundaries. The {@link IsmRecord} has: + * *
    - *
  • Key 1: Window
  • - *
  • Key 2: Index offset within window
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Window + *
  • Key 2: Index offset within window + *
  • Value: Windowed value *
*/ @SystemDoFnInternal static class ToIsmRecordForNonGlobalWindowDoFn - extends DoFn>>>, - IsmRecord>> { + extends DoFn>>>, IsmRecord>> { private final Coder windowCoder; + ToIsmRecordForNonGlobalWindowDoFn(Coder windowCoder) { this.windowCoder = windowCoder; } @@ -1043,9 +1042,8 @@ public void processElement(ProcessContext c) throws Exception { // Reset i since we have a new window. elementsInWindow = 0; } - c.output(IsmRecord.of( - ImmutableList.of(value.getKey(), elementsInWindow), - value.getValue())); + c.output( + IsmRecord.of(ImmutableList.of(value.getKey(), elementsInWindow), value.getValue())); previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); elementsInWindow += 1; } @@ -1054,9 +1052,7 @@ public void processElement(ProcessContext c) throws Exception { private final DataflowRunner runner; private final PCollectionView> view; - /** - * Builds an instance of this class from the overridden transform. - */ + /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() public BatchViewAsList(DataflowRunner runner, CreatePCollectionView> transform) { this.runner = runner; @@ -1069,13 +1065,10 @@ public PCollection expand(PCollection input) { } static PCollection applyForIterableLike( - DataflowRunner runner, - PCollection input, - PCollectionView view) { + DataflowRunner runner, PCollection input, PCollectionView view) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); IsmRecordCoder> ismCoder = coderForListLike(windowCoder, input.getCoder()); @@ -1123,20 +1116,19 @@ static IsmRecordCoder> coderForListLike( } /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the - * Dataflow runner in batch mode. + * Specialized implementation for {@link org.apache.beam.sdk.transforms.View.AsIterable + * View.AsIterable} for the Dataflow runner in batch mode. * *

Creates a set of {@code Ism} files sharded by the hash of the windows byte representation * and with records having: + * *

    - *
  • Key 1: Window
  • - *
  • Key 2: Index offset within window
  • - *
  • Value: Windowed value
  • + *
  • Key 1: Window + *
  • Key 2: Index offset within window + *
  • Value: Windowed value *
*/ - static class BatchViewAsIterable - extends PTransform, PCollection> { + static class BatchViewAsIterable extends PTransform, PCollection> { private final DataflowRunner runner; private final PCollectionView> view; @@ -1154,12 +1146,9 @@ public PCollection expand(PCollection input) { } } - - /** - * A {@link Function} which converts {@code WindowedValue} to {@code V}. - */ - private static class WindowedValueToValue implements - Function, V>, Serializable { + /** A {@link Function} which converts {@code WindowedValue} to {@code V}. */ + private static class WindowedValueToValue + implements Function, V>, Serializable { private static final WindowedValueToValue INSTANCE = new WindowedValueToValue<>(); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1176,8 +1165,8 @@ public V apply(WindowedValue input) { /** * A {@link Function} which converts {@code Iterable>} to {@code Iterable}. */ - private static class IterableWithWindowedValuesToIterable implements - Function>, Iterable>, Serializable { + private static class IterableWithWindowedValuesToIterable + implements Function>, Iterable>, Serializable { private static final IterableWithWindowedValuesToIterable INSTANCE = new IterableWithWindowedValuesToIterable<>(); @@ -1193,22 +1182,24 @@ public Iterable apply(Iterable> input) { } /** - * A {@link PTransform} that groups the values by a hash of the window's byte representation - * and sorts the values using the windows byte representation. + * A {@link PTransform} that groups the values by a hash of the window's byte representation and + * sorts the values using the windows byte representation. */ - private static class GroupByWindowHashAsKeyAndWindowAsSortKey extends - PTransform, PCollection>>>>> { + private static class GroupByWindowHashAsKeyAndWindowAsSortKey + extends PTransform< + PCollection, PCollection>>>>> { /** - * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for - * grouping by the hash of the window's byte representation and sorting the grouped values - * using the window's byte representation. + * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for grouping by + * the hash of the window's byte representation and sorting the grouped values using the + * window's byte representation. */ @SystemDoFnInternal private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn extends DoFn>>> { private final IsmRecordCoder ismCoderForHash; + private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder ismCoderForHash) { this.ismCoderForHash = ismCoderForHash; } @@ -1218,17 +1209,14 @@ public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws @SuppressWarnings("unchecked") W window = (W) untypedWindow; c.output( - KV.of(ismCoderForHash.hash(ImmutableList.of(window)), - KV.of(window, - WindowedValue.of( - c.element(), - c.timestamp(), - window, - c.pane())))); + KV.of( + ismCoderForHash.hash(ImmutableList.of(window)), + KV.of(window, WindowedValue.of(c.element(), c.timestamp(), window, c.pane())))); } } private final IsmRecordCoder ismCoderForHash; + private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmRecordCoder ismCoderForHash) { this.ismCoderForHash = ismCoderForHash; } @@ -1237,33 +1225,29 @@ private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmRecordCoder ismCoderForHa public PCollection>>>> expand( PCollection input) { @SuppressWarnings("unchecked") - Coder windowCoder = (Coder) - input.getWindowingStrategy().getWindowFn().windowCoder(); + Coder windowCoder = (Coder) input.getWindowingStrategy().getWindowFn().windowCoder(); PCollection>>> rval = - input.apply(ParDo.of( - new UseWindowHashAsKeyAndWindowAsSortKeyDoFn(ismCoderForHash))); + input.apply( + ParDo.of(new UseWindowHashAsKeyAndWindowAsSortKeyDoFn(ismCoderForHash))); rval.setCoder( KvCoder.of( VarIntCoder.of(), - KvCoder.of(windowCoder, - FullWindowedValueCoder.of(input.getCoder(), windowCoder)))); + KvCoder.of(windowCoder, FullWindowedValueCoder.of(input.getCoder(), windowCoder)))); return rval.apply(new GroupByKeyAndSortValuesOnly<>()); } } /** - * A {@link GroupByKey} transform for the {@link DataflowRunner} which sorts - * values using the secondary key {@code K2}. + * A {@link GroupByKey} transform for the {@link DataflowRunner} which sorts values using the + * secondary key {@code K2}. * - *

The {@link PCollection} created created by this {@link PTransform} will have values in - * the empty window. Care must be taken *afterwards* to either re-window - * (using {@link Window#into}) or only use {@link PTransform}s that do not depend on the - * values being within a window. + *

The {@link PCollection} created created by this {@link PTransform} will have values in the + * empty window. Care must be taken *afterwards* to either re-window (using {@link Window#into}) + * or only use {@link PTransform}s that do not depend on the values being within a window. */ static class GroupByKeyAndSortValuesOnly extends PTransform>>, PCollection>>>> { - GroupByKeyAndSortValuesOnly() { - } + GroupByKeyAndSortValuesOnly() {} @Override public PCollection>>> expand(PCollection>> input) { @@ -1277,13 +1261,11 @@ public PCollection>>> expand(PCollection} backed by a {@code Map} and a function that transforms - * {@code V1 -> V2}. + * A {@code Map} backed by a {@code Map} and a function that transforms {@code V1 -> + * V2}. */ - static class TransformedMap - extends ForwardingMap { + static class TransformedMap extends ForwardingMap { private final Function transform; private final Map originalMap; private final Map transformedMap; @@ -1300,11 +1282,8 @@ protected Map delegate() { } } - /** - * A {@link Coder} for {@link TransformedMap}s. - */ - static class TransformedMapCoder - extends StructuredCoder> { + /** A {@link Coder} for {@link TransformedMap}s. */ + static class TransformedMapCoder extends StructuredCoder> { private final Coder> transformCoder; private final Coder> originalMapCoder; @@ -1330,8 +1309,7 @@ public void encode(TransformedMap value, OutputStream outStream) public TransformedMap decode(InputStream inStream) throws CoderException, IOException { return new TransformedMap<>( - transformCoder.decode(inStream), - originalMapCoder.decode(inStream)); + transformCoder.decode(inStream), originalMapCoder.decode(inStream)); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 66ffcc2a534f..820a68b7a904 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -50,15 +50,9 @@ */ public abstract class WindowedValue { - /** - * Returns a {@code WindowedValue} with the given value, timestamp, - * and windows. - */ + /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( - T value, - Instant timestamp, - Collection windows, - PaneInfo pane) { + T value, Instant timestamp, Collection windows, PaneInfo pane) { checkNotNull(pane); if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { @@ -70,14 +64,9 @@ public static WindowedValue of( } } - /** - * Returns a {@code WindowedValue} with the given value, timestamp, and window. - */ + /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( - T value, - Instant timestamp, - BoundedWindow window, - PaneInfo pane) { + T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { checkNotNull(pane); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); @@ -107,8 +96,8 @@ public static WindowedValue valueInGlobalWindow(T value, PaneInfo pane) { } /** - * Returns a {@code WindowedValue} with the given value and timestamp, - * {@code GlobalWindow} and default pane. + * Returns a {@code WindowedValue} with the given value and timestamp, {@code GlobalWindow} and + * default pane. */ public static WindowedValue timestampedValueInGlobalWindow(T value, Instant timestamp) { if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { @@ -148,24 +137,16 @@ public static WindowedValue valueInEmptyWindows(T value, PaneInfo pane) { */ public abstract WindowedValue withValue(NewT value); - /** - * Returns the value of this {@code WindowedValue}. - */ + /** Returns the value of this {@code WindowedValue}. */ public abstract T getValue(); - /** - * Returns the timestamp of this {@code WindowedValue}. - */ + /** Returns the timestamp of this {@code WindowedValue}. */ public abstract Instant getTimestamp(); - /** - * Returns the windows of this {@code WindowedValue}. - */ + /** Returns the windows of this {@code WindowedValue}. */ public abstract Collection getWindows(); - /** - * Returns the pane of this {@code WindowedValue} in its window. - */ + /** Returns the pane of this {@code WindowedValue} in its window. */ public abstract PaneInfo getPane(); /** @@ -210,8 +191,8 @@ public int hashCode() { Collections.singletonList(GlobalWindow.INSTANCE); /** - * An abstract superclass for implementations of {@link WindowedValue} that stores the value - * and pane info. + * An abstract superclass for implementations of {@link WindowedValue} that stores the value and + * pane info. */ private abstract static class SimpleWindowedValue extends WindowedValue { private final T value; @@ -226,18 +207,15 @@ protected SimpleWindowedValue(T value, PaneInfo pane) { public PaneInfo getPane() { return pane; } + @Override public T getValue() { return value; } } - /** - * The abstract superclass of WindowedValue representations where - * timestamp == MIN. - */ - private abstract static class MinTimestampWindowedValue - extends SimpleWindowedValue { + /** The abstract superclass of WindowedValue representations where timestamp == MIN. */ + private abstract static class MinTimestampWindowedValue extends SimpleWindowedValue { public MinTimestampWindowedValue(T value, PaneInfo pane) { super(value, pane); } @@ -248,12 +226,8 @@ public Instant getTimestamp() { } } - /** - * The representation of a WindowedValue where timestamp == MIN and - * windows == {GlobalWindow}. - */ - private static class ValueInGlobalWindow - extends MinTimestampWindowedValue { + /** The representation of a WindowedValue where timestamp == MIN and windows == {GlobalWindow}. */ + private static class ValueInGlobalWindow extends MinTimestampWindowedValue { public ValueInGlobalWindow(T value, PaneInfo pane) { super(value, pane); } @@ -340,17 +314,11 @@ public String toString() { } } - /** - * The abstract superclass of WindowedValue representations where - * timestamp is arbitrary. - */ - private abstract static class TimestampedWindowedValue - extends SimpleWindowedValue { + /** The abstract superclass of WindowedValue representations where timestamp is arbitrary. */ + private abstract static class TimestampedWindowedValue extends SimpleWindowedValue { private final Instant timestamp; - public TimestampedWindowedValue(T value, - Instant timestamp, - PaneInfo pane) { + public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo pane) { super(value, pane); this.timestamp = checkNotNull(timestamp); } @@ -362,14 +330,11 @@ public Instant getTimestamp() { } /** - * The representation of a WindowedValue where timestamp {@code >} - * MIN and windows == {GlobalWindow}. + * The representation of a WindowedValue where timestamp {@code >} MIN and windows == + * {GlobalWindow}. */ - private static class TimestampedValueInGlobalWindow - extends TimestampedWindowedValue { - public TimestampedValueInGlobalWindow(T value, - Instant timestamp, - PaneInfo pane) { + private static class TimestampedValueInGlobalWindow extends TimestampedWindowedValue { + public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo pane) { super(value, timestamp, pane); } @@ -386,8 +351,7 @@ public Collection getWindows() { @Override public boolean equals(Object o) { if (o instanceof TimestampedValueInGlobalWindow) { - TimestampedValueInGlobalWindow that = - (TimestampedValueInGlobalWindow) o; + TimestampedValueInGlobalWindow that = (TimestampedValueInGlobalWindow) o; // Compare timestamps first as they are most likely to differ. // Also compare timestamps according to millis-since-epoch because otherwise expensive // comparisons are made on their Chronology objects. @@ -416,17 +380,14 @@ public String toString() { } /** - * The representation of a WindowedValue where timestamp is arbitrary and - * windows == a single non-Global window. + * The representation of a WindowedValue where timestamp is arbitrary and windows == a single + * non-Global window. */ - private static class TimestampedValueInSingleWindow - extends TimestampedWindowedValue { + private static class TimestampedValueInSingleWindow extends TimestampedWindowedValue { private final BoundedWindow window; - public TimestampedValueInSingleWindow(T value, - Instant timestamp, - BoundedWindow window, - PaneInfo pane) { + public TimestampedValueInSingleWindow( + T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { super(value, timestamp, pane); this.window = checkNotNull(window); } @@ -444,8 +405,7 @@ public Collection getWindows() { @Override public boolean equals(Object o) { if (o instanceof TimestampedValueInSingleWindow) { - TimestampedValueInSingleWindow that = - (TimestampedValueInSingleWindow) o; + TimestampedValueInSingleWindow that = (TimestampedValueInSingleWindow) o; // Compare timestamps first as they are most likely to differ. // Also compare timestamps according to millis-since-epoch because otherwise expensive // comparisons are made on their Chronology objects. @@ -475,19 +435,12 @@ public String toString() { } } - /** - * The representation of a WindowedValue, excluding the special - * cases captured above. - */ - private static class TimestampedValueInMultipleWindows - extends TimestampedWindowedValue { + /** The representation of a WindowedValue, excluding the special cases captured above. */ + private static class TimestampedValueInMultipleWindows extends TimestampedWindowedValue { private Collection windows; public TimestampedValueInMultipleWindows( - T value, - Instant timestamp, - Collection windows, - PaneInfo pane) { + T value, Instant timestamp, Collection windows, PaneInfo pane) { super(value, timestamp, pane); this.windows = checkNotNull(windows); } @@ -505,8 +458,7 @@ public Collection getWindows() { @Override public boolean equals(Object o) { if (o instanceof TimestampedValueInMultipleWindows) { - TimestampedValueInMultipleWindows that = - (TimestampedValueInMultipleWindows) o; + TimestampedValueInMultipleWindows that = (TimestampedValueInMultipleWindows) o; // Compare timestamps first as they are most likely to differ. // Also compare timestamps according to millis-since-epoch because otherwise expensive // comparisons are made on their Chronology objects. @@ -548,67 +500,54 @@ private void ensureWindowsAreASet() { } } - ///////////////////////////////////////////////////////////////////////////// /** - * Returns the {@code Coder} to use for a {@code WindowedValue}, - * using the given valueCoder and windowCoder. + * Returns the {@code Coder} to use for a {@code WindowedValue}, using the given valueCoder and + * windowCoder. */ public static FullWindowedValueCoder getFullCoder( - Coder valueCoder, - Coder windowCoder) { + Coder valueCoder, Coder windowCoder) { return FullWindowedValueCoder.of(valueCoder, windowCoder); } - /** - * Returns the {@code ValueOnlyCoder} from the given valueCoder. - */ + /** Returns the {@code ValueOnlyCoder} from the given valueCoder. */ public static ValueOnlyWindowedValueCoder getValueOnlyCoder(Coder valueCoder) { return ValueOnlyWindowedValueCoder.of(valueCoder); } - /** - * Abstract class for {@code WindowedValue} coder. - */ - public abstract static class WindowedValueCoder - extends StructuredCoder> { + /** Abstract class for {@code WindowedValue} coder. */ + public abstract static class WindowedValueCoder extends StructuredCoder> { final Coder valueCoder; WindowedValueCoder(Coder valueCoder) { this.valueCoder = checkNotNull(valueCoder); } - /** - * Returns the value coder. - */ + /** Returns the value coder. */ public Coder getValueCoder() { return valueCoder; } /** - * Returns a new {@code WindowedValueCoder} that is a copy of this one, - * but with a different value coder. + * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different + * value coder. */ public abstract WindowedValueCoder withValueCoder(Coder valueCoder); } - /** - * Coder for {@code WindowedValue}. - */ + /** Coder for {@code WindowedValue}. */ public static class FullWindowedValueCoder extends WindowedValueCoder { private final Coder windowCoder; // Precompute and cache the coder for a list of windows. private final Coder> windowsCoder; public static FullWindowedValueCoder of( - Coder valueCoder, - Coder windowCoder) { + Coder valueCoder, Coder windowCoder) { return new FullWindowedValueCoder<>(valueCoder, windowCoder); } - FullWindowedValueCoder(Coder valueCoder, - Coder windowCoder) { + FullWindowedValueCoder(Coder valueCoder, Coder windowCoder) { super(valueCoder); this.windowCoder = checkNotNull(windowCoder); // It's not possible to statically type-check correct use of the @@ -642,9 +581,7 @@ public void encode(WindowedValue windowedElem, OutputStream outStream) } @Override - public void encode(WindowedValue windowedElem, - OutputStream outStream, - Context context) + public void encode(WindowedValue windowedElem, OutputStream outStream, Context context) throws CoderException, IOException { InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowsCoder.encode(windowedElem.getWindows(), outStream); @@ -661,8 +598,7 @@ public WindowedValue decode(InputStream inStream) throws CoderException, IOEx public WindowedValue decode(InputStream inStream, Context context) throws CoderException, IOException { Instant timestamp = InstantCoder.of().decode(inStream); - Collection windows = - windowsCoder.decode(inStream); + Collection windows = windowsCoder.decode(inStream); PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); return WindowedValue.of(value, timestamp, windows, pane); @@ -677,8 +613,8 @@ public void verifyDeterministic() throws NonDeterministicException { } @Override - public void registerByteSizeObserver(WindowedValue value, - ElementByteSizeObserver observer) throws Exception { + public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObserver observer) + throws Exception { InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer); windowsCoder.registerByteSizeObserver(value.getWindows(), observer); PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer); @@ -688,8 +624,8 @@ public void registerByteSizeObserver(WindowedValue value, /** * {@inheritDoc}. * - * @return a singleton list containing the {@code valueCoder} of this - * {@link FullWindowedValueCoder}. + * @return a singleton list containing the {@code valueCoder} of this {@link + * FullWindowedValueCoder}. */ @Override public List> getCoderArguments() { @@ -707,12 +643,11 @@ public List> getComponents() { /** * Coder for {@code WindowedValue}. * - *

A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops - * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding. + *

A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops timestamp + * and windows for encoding, and uses defaults timestamp, and windows for decoding. */ public static class ValueOnlyWindowedValueCoder extends WindowedValueCoder { - public static ValueOnlyWindowedValueCoder of( - Coder valueCoder) { + public static ValueOnlyWindowedValueCoder of(Coder valueCoder) { return new ValueOnlyWindowedValueCoder<>(valueCoder); } @@ -752,14 +687,11 @@ public WindowedValue decode(InputStream inStream, Context context) @Override public void verifyDeterministic() throws NonDeterministicException { verifyDeterministic( - this, - "ValueOnlyWindowedValueCoder requires a deterministic valueCoder", - valueCoder); + this, "ValueOnlyWindowedValueCoder requires a deterministic valueCoder", valueCoder); } @Override - public void registerByteSizeObserver( - WindowedValue value, ElementByteSizeObserver observer) + public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObserver observer) throws Exception { valueCoder.registerByteSizeObserver(value.getValue(), observer); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index b2bb818907df..97836a644d6c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -46,12 +46,14 @@ public class WindowedValueTest { @Test public void testWindowedValueCoder() throws CoderException { Instant timestamp = new Instant(1234); - WindowedValue value = WindowedValue.of( - "abc", - new Instant(1234), - Arrays.asList(new IntervalWindow(timestamp, timestamp.plus(1000)), - new IntervalWindow(timestamp.plus(1000), timestamp.plus(2000))), - PaneInfo.NO_FIRING); + WindowedValue value = + WindowedValue.of( + "abc", + new Instant(1234), + Arrays.asList( + new IntervalWindow(timestamp, timestamp.plus(1000)), + new IntervalWindow(timestamp.plus(1000), timestamp.plus(2000))), + PaneInfo.NO_FIRING); Coder> windowedValueCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); @@ -67,8 +69,8 @@ public void testWindowedValueCoder() throws CoderException { @Test public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() { - CoderProperties.coderSerializable(WindowedValue.getFullCoder( - GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE)); + CoderProperties.coderSerializable( + WindowedValue.getFullCoder(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE)); } @Test diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java index b59d173dd66b..6caeec84e487 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java @@ -50,13 +50,14 @@ * } * *

For extra spiciness our implementation differs slightly from the above: + * *

    - *
  • We select both the average winning price and the category. - *
  • We don't bother joining with a static category table, since it's contents are never used. - *
  • We only consider bids which are above the auction's reserve price. - *
  • We accept the highest-price, earliest valid bid as the winner. - *
  • We calculate the averages oven a sliding window of size {@code windowSizeSec} and - * period {@code windowPeriodSec}. + *
  • We select both the average winning price and the category. + *
  • We don't bother joining with a static category table, since it's contents are never used. + *
  • We only consider bids which are above the auction's reserve price. + *
  • We accept the highest-price, earliest valid bid as the winner. + *
  • We calculate the averages oven a sliding window of size {@code windowSizeSec} and period + * {@code windowPeriodSec}. *
*/ public class Query4 extends NexmarkQuery { @@ -74,8 +75,8 @@ private PCollection applyTyped(PCollection events) { .apply(new WinningBids(name + ".WinningBids", configuration)); // Monitor winning bids - winningBids = winningBids.apply(name + ".WinningBidsMonitor", - winningBidsMonitor.getTransform()); + winningBids = + winningBids.apply(name + ".WinningBidsMonitor", winningBidsMonitor.getTransform()); return winningBids // Key the winning bid price by the auction category. diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java index f7bb38656cf1..0cc09bf9bea3 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java @@ -40,8 +40,8 @@ import org.joda.time.Duration; /** - * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the - * last 10 closed auctions by the same seller. In CQL syntax: + * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the last 10 + * closed auctions by the same seller. In CQL syntax: * *
{@code
  * SELECT Istream(AVG(Q.final), Q.seller)
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
index 5f11e4e6a106..5bc758bca73f 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
@@ -25,8 +25,8 @@
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but
- * handy for testing. See {@link WinningBids} for the details.
+ * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but handy
+ * for testing. See {@link WinningBids} for the details.
  */
 public class Query9 extends NexmarkQuery {
   public Query9(NexmarkConfiguration configuration) {
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index 7ccdc951aa1e..bde7c6dbdc84 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -69,12 +69,11 @@
  * GROUP BY A.id
  * }
* - *

We will also check that the winning bid is above the auction reserve. Note that - * we ignore the auction opening bid value since it has no impact on which bid eventually wins, - * if any. + *

We will also check that the winning bid is above the auction reserve. Note that we ignore the + * auction opening bid value since it has no impact on which bid eventually wins, if any. * - *

Our implementation will use a custom windowing function in order to bring bids and - * auctions together without requiring global state. + *

Our implementation will use a custom windowing function in order to bring bids and auctions + * together without requiring global state. */ public class WinningBids extends PTransform, PCollection> { /** Windows for open auctions and bids. */ @@ -83,9 +82,9 @@ private static class AuctionOrBidWindow extends IntervalWindow { public final long auction; /** - * True if this window represents an actual auction, and thus has a start/end - * time matching that of the auction. False if this window represents a bid, and - * thus has an unbounded start/end time. + * True if this window represents an actual auction, and thus has a start/end time matching that + * of the auction. False if this window represents a bid, and thus has an unbounded start/end + * time. */ public final boolean isAuctionWindow; @@ -109,10 +108,9 @@ public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) } /** - * Return a bid window for {@code bid}. It should later be merged into - * the corresponding auction window. However, it is possible this bid is for an already - * expired auction, or for an auction which the system has not yet seen. So we - * give the bid a bit of wiggle room in its interval. + * Return a bid window for {@code bid}. It should later be merged into the corresponding auction + * window. However, it is possible this bid is for an already expired auction, or for an auction + * which the system has not yet seen. So we give the bid a bit of wiggle room in its interval. */ public static AuctionOrBidWindow forBid( long expectedAuctionDurationMs, Instant timestamp, Bid bid) { @@ -137,11 +135,13 @@ public boolean isAuctionWindow() { @Override public String toString() { - return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", + return String.format( + "AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", start(), end(), auction, isAuctionWindow); } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } @@ -155,14 +155,13 @@ public String toString() { return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction); } - @Override public int hashCode() { + @Override + public int hashCode() { return Objects.hash(super.hashCode(), isAuctionWindow, auction); } } - /** - * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. - */ + /** Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. */ private static class AuctionOrBidWindowCoder extends CustomCoder { private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); private static final Coder SUPER_CODER = IntervalWindow.getCoder(); @@ -183,8 +182,7 @@ public void encode(AuctionOrBidWindow window, OutputStream outStream) } @Override - public AuctionOrBidWindow decode(InputStream inStream) - throws IOException, CoderException { + public AuctionOrBidWindow decode(InputStream inStream) throws IOException, CoderException { IntervalWindow superWindow = SUPER_CODER.decode(inStream); long auction = ID_CODER.decode(inStream); boolean isAuctionWindow = INT_CODER.decode(inStream) != 0; @@ -192,7 +190,8 @@ public AuctionOrBidWindow decode(InputStream inStream) superWindow.start(), superWindow.end(), auction, isAuctionWindow); } - @Override public void verifyDeterministic() throws NonDeterministicException {} + @Override + public void verifyDeterministic() throws NonDeterministicException {} @Override public Object structuralValue(AuctionOrBidWindow value) { @@ -214,8 +213,8 @@ public Collection assignWindows(AssignContext c) { Event event = c.element(); if (event.newAuction != null) { // Assign auctions to an auction window which expires at the auction's close. - return Collections - .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); + return Collections.singletonList( + AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); } else if (event.bid != null) { // Assign bids to a temporary bid window which will later be merged into the appropriate // auction window. @@ -281,27 +280,25 @@ public WindowMappingFn getDefaultWindowMappingFn() { } /** - * Below we will GBK auctions and bids on their auction ids. Then we will reduce those - * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at - * least one valid bid. We would like those output pairs to have a timestamp of the auction's - * expiry (since that's the earliest we know for sure we have the correct winner). We would - * also like to make that winning results are available to following stages at the auction's - * expiry. + * Below we will GBK auctions and bids on their auction ids. Then we will reduce those per id to + * emit {@code (auction, winning bid)} pairs for auctions which have expired with at least one + * valid bid. We would like those output pairs to have a timestamp of the auction's expiry + * (since that's the earliest we know for sure we have the correct winner). We would also like + * to make that winning results are available to following stages at the auction's expiry. * *

Each result of the GBK will have a timestamp of the min of the result of this object's * assignOutputTime over all records which end up in one of its iterables. Thus we get the * desired behavior if we ignore each record's timestamp and always return the auction window's * 'maxTimestamp', which will correspond to the auction's expiry. * - *

In contrast, if this object's assignOutputTime were to return 'inputTimestamp' - * (the usual implementation), then each GBK record will take as its timestamp the minimum of - * the timestamps of all bids and auctions within it, which will always be the auction's - * timestamp. An auction which expires well into the future would thus hold up the watermark - * of the GBK results until that auction expired. That in turn would hold up all winning pairs. + *

In contrast, if this object's assignOutputTime were to return 'inputTimestamp' (the usual + * implementation), then each GBK record will take as its timestamp the minimum of the + * timestamps of all bids and auctions within it, which will always be the auction's timestamp. + * An auction which expires well into the future would thus hold up the watermark of the GBK + * results until that auction expired. That in turn would hold up all winning pairs. */ @Override - public Instant getOutputTime( - Instant inputTimestamp, AuctionOrBidWindow window) { + public Instant getOutputTime(Instant inputTimestamp, AuctionOrBidWindow window) { return window.maxTimestamp(); } } @@ -311,9 +308,10 @@ public Instant getOutputTime( public WinningBids(String name, NexmarkConfiguration configuration) { super(name); // What's the expected auction time (when the system is running at the lowest event rate). - long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( - configuration.firstEventRate, configuration.nextEventRate, - configuration.rateUnit, configuration.numEventGenerators); + long[] interEventDelayUs = + configuration.rateShape.interEventDelayUs( + configuration.firstEventRate, configuration.nextEventRate, + configuration.rateUnit, configuration.numEventGenerators); long longestDelayUs = 0; for (long interEventDelayU : interEventDelayUs) { longestDelayUs = Math.max(longestDelayUs, interEventDelayU); @@ -321,7 +319,7 @@ public WinningBids(String name, NexmarkConfiguration configuration) { // Adjust for proportion of auction events amongst all events. longestDelayUs = (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; + / GeneratorConfig.AUCTION_PROPORTION; // Adjust for number of in-flight auctions. longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; @@ -338,8 +336,9 @@ public PCollection expand(PCollection events) { // Key auctions by their id. PCollection> auctionsById = - events.apply(NexmarkQuery.JUST_NEW_AUCTIONS) - .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); + events + .apply(NexmarkQuery.JUST_NEW_AUCTIONS) + .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); // Key bids by their auction id. PCollection> bidsByAuctionId = @@ -403,7 +402,8 @@ public int hashCode() { return Objects.hash(auctionOrBidWindowFn); } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } From 87f546b56728d8755908d0ff03063e05cd416c0c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 16 Oct 2017 20:50:39 -0700 Subject: [PATCH 2/5] Fix empty window assignments in Nexmark --- .../sdk/nexmark/queries/AuctionOrBid.java | 29 +++++++++++++++++++ .../beam/sdk/nexmark/queries/Query4.java | 2 ++ .../beam/sdk/nexmark/queries/Query6.java | 2 ++ .../beam/sdk/nexmark/queries/Query9.java | 5 +++- .../beam/sdk/nexmark/queries/WinningBids.java | 6 ++-- 5 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java new file mode 100644 index 000000000000..2c8b12fd2fc1 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark.queries; + +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** A predicate to filter for only auctions and bids. */ +public class AuctionOrBid implements SerializableFunction { + @Override + public Boolean apply(Event input) { + return input.bid != null || input.newAuction != null; + } +} diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java index 6caeec84e487..d3b1e233b092 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.nexmark.model.Event; import org.apache.beam.sdk.nexmark.model.KnownSize; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Mean; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; @@ -71,6 +72,7 @@ public Query4(NexmarkConfiguration configuration) { private PCollection applyTyped(PCollection events) { PCollection winningBids = events + .apply(Filter.by(new AuctionOrBid())) // Find the winning bid for each closed auction. .apply(new WinningBids(name + ".WinningBids", configuration)); diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java index 0cc09bf9bea3..eeae79acc92a 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.nexmark.model.SellerPrice; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -113,6 +114,7 @@ public Query6(NexmarkConfiguration configuration) { private PCollection applyTyped(PCollection events) { return events + .apply(Filter.by(new AuctionOrBid())) // Find the winning bid for each closed auction. .apply(new WinningBids(name + ".WinningBids", configuration)) diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java index 5bc758bca73f..af0f514a5c0a 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.nexmark.model.AuctionBid; import org.apache.beam.sdk.nexmark.model.Event; import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.values.PCollection; /** @@ -34,7 +35,9 @@ public Query9(NexmarkConfiguration configuration) { } private PCollection applyTyped(PCollection events) { - return events.apply(new WinningBids(name, configuration)); + return events + .apply(Filter.by(new AuctionOrBid())) + .apply(new WinningBids(name, configuration)); } @Override diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java index bde7c6dbdc84..fea096be18ae 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java @@ -221,8 +221,10 @@ public Collection assignWindows(AssignContext c) { return Collections.singletonList( AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); } else { - // Don't assign people to any window. They will thus be dropped. - return Collections.emptyList(); + throw new IllegalArgumentException( + String.format( + "%s can only assign windows to auctions and bids, but received %s", + getClass().getSimpleName(), c.element())); } } From 168b32f31f16ef7c46fd2b88c0cfef148c0a20ee Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 27 Oct 2017 11:24:59 -0700 Subject: [PATCH 3/5] Fix empty window assignment in FlattenEvaluatorFactoryTest --- .../beam/runners/direct/FlattenEvaluatorFactoryTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 3c0012639ee3..2aa4ab1e923d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -77,9 +77,9 @@ public void testFlattenInMemoryEvaluator() throws Exception { rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); leftSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024))); - leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING)); + leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING)); rightSideEvaluator.processElement( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); + WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); rightSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096))); @@ -99,12 +99,12 @@ public void testFlattenInMemoryEvaluator() throws Exception { flattenedLeftBundle.commit(Instant.now()).getElements(), containsInAnyOrder( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)), - WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING), + WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING), WindowedValue.valueInGlobalWindow(1))); assertThat( flattenedRightBundle.commit(Instant.now()).getElements(), containsInAnyOrder( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), + WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)), WindowedValue.valueInGlobalWindow(-1))); } From 93e7b65fc579fc5b6d5a38399c9223f30fba5061 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 27 Oct 2017 11:25:17 -0700 Subject: [PATCH 4/5] Switch DataflowRunner to its own private ValueInEmptyWindows --- .../runners/dataflow/BatchViewOverrides.java | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 727707fa729d..87b3437ca849 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.dataflow; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; import com.google.common.base.Function; +import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.collect.ForwardingMap; import com.google.common.collect.HashMultimap; @@ -34,11 +34,13 @@ import java.io.OutputStream; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; @@ -66,6 +68,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SystemDoFnInternal; @@ -80,6 +83,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Instant; /** * Dataflow batch overrides for {@link CreatePCollectionView}, specialized for different view types. @@ -1324,4 +1328,64 @@ public void verifyDeterministic() verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder); } } + + /** + * A hack to put a simple value (aka globally windowed) in a place where a WindowedValue is + * expected. + * + *

This is not actually valid for Beam elements, because values in no windows do not really + * exist and may be dropped at any time without further justification. + */ + private static WindowedValue valueInEmptyWindows(T value) { + return new ValueInEmptyWindows<>(value); + } + + private static class ValueInEmptyWindows extends WindowedValue { + + private final T value; + + private ValueInEmptyWindows(T value) { + this.value = value; + } + + @Override + public WindowedValue withValue(NewT value) { + return new ValueInEmptyWindows<>(value); + } + + @Override + public T getValue() { + return value; + } + + @Override + public Instant getTimestamp() { + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @Override + public Collection getWindows() { + return Collections.emptyList(); + } + + @Override + public PaneInfo getPane() { + return PaneInfo.NO_FIRING; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()).add("value", getValue()).toString(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ValueInEmptyWindows) { + ValueInEmptyWindows that = (ValueInEmptyWindows) o; + return Objects.equals(that.getValue(), this.getValue()); + } else { + return super.equals(o); + } + } + } } From de9adea9a0d2cde4d732f9d9ac27700fbd908df6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 27 Oct 2017 11:25:41 -0700 Subject: [PATCH 5/5] Remove deprecated valueInEmptyWindows --- .../apache/beam/sdk/util/WindowedValue.java | 77 +------------------ .../beam/sdk/util/WindowedValueTest.java | 15 ++-- 2 files changed, 12 insertions(+), 80 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 820a68b7a904..bfc30834c085 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.util; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; @@ -54,10 +55,9 @@ public abstract class WindowedValue { public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo pane) { checkNotNull(pane); + checkArgument(windows.size() > 0); - if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { - return valueInEmptyWindows(value, pane); - } else if (windows.size() == 1) { + if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), pane); } else { return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); @@ -107,30 +107,6 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta } } - /** - * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp - * and pane. - * - * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed to drop - * it at any point, and benign runner implementation details could cause silent data loss. - */ - @Deprecated - public static WindowedValue valueInEmptyWindows(T value) { - return new ValueInEmptyWindows<>(value, PaneInfo.NO_FIRING); - } - - /** - * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp - * and the specified pane. - * - * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed to drop - * it at any point, and benign runner implementation details could cause silent data loss. - */ - @Deprecated - public static WindowedValue valueInEmptyWindows(T value, PaneInfo pane) { - return new ValueInEmptyWindows<>(value, pane); - } - /** * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value, * which may have a new type {@code NewT}. @@ -267,53 +243,6 @@ public String toString() { } } - /** - * The representation of a WindowedValue where timestamp == MIN and windows == {}. - * - * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed to drop - * it at any point, and benign runner implementation details could cause silent data loss. - */ - @Deprecated - private static class ValueInEmptyWindows extends MinTimestampWindowedValue { - public ValueInEmptyWindows(T value, PaneInfo pane) { - super(value, pane); - } - - @Override - public WindowedValue withValue(NewT newValue) { - return new ValueInEmptyWindows<>(newValue, getPane()); - } - - @Override - public Collection getWindows() { - return Collections.emptyList(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof ValueInEmptyWindows) { - ValueInEmptyWindows that = (ValueInEmptyWindows) o; - return Objects.equals(that.getPane(), this.getPane()) - && Objects.equals(that.getValue(), this.getValue()); - } else { - return super.equals(o); - } - } - - @Override - public int hashCode() { - return Objects.hash(getValue(), getPane()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("value", getValue()) - .add("pane", getPane()) - .toString(); - } - } - /** The abstract superclass of WindowedValue representations where timestamp is arbitrary. */ private abstract static class TimestampedWindowedValue extends SimpleWindowedValue { private final Instant timestamp; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 97836a644d6c..6c2333863cdd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -36,13 +35,19 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.joda.time.Instant; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Test case for {@link WindowedValue}. */ @RunWith(JUnit4.class) public class WindowedValueTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testWindowedValueCoder() throws CoderException { Instant timestamp = new Instant(1234); @@ -79,11 +84,9 @@ public void testValueOnlyWindowedValueCoderIsSerializableWithWellKnownCoderType( } @Test - public void testExplodeWindowsInNoWindowsEmptyIterable() { - WindowedValue value = - WindowedValue.of("foo", Instant.now(), ImmutableList.of(), PaneInfo.NO_FIRING); - - assertThat(value.explodeWindows(), emptyIterable()); + public void testExplodeWindowsInNoWindowsCrash() { + thrown.expect(IllegalArgumentException.class); + WindowedValue.of("foo", Instant.now(), ImmutableList.of(), PaneInfo.NO_FIRING); } @Test