From b5b18acd6db7a655a394ac47d07d686bd5a5c5bf Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 21 Dec 2016 15:37:49 -0800 Subject: [PATCH 1/4] [BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting See [BEAM-469] for more information about why this is correct. --- .../beam/runners/core/KeyedWorkItemCoder.java | 4 +- .../core/UnboundedReadFromBoundedSource.java | 10 ++-- .../SingletonKeyedWorkItemCoder.java | 10 ++-- .../beam/runners/dataflow/DataflowRunner.java | 4 +- ...ataflowUnboundedReadFromBoundedSource.java | 10 ++-- .../runners/dataflow/internal/IsmFormat.java | 6 +-- .../beam/sdk/coders/BigDecimalCoder.java | 6 +-- .../org/apache/beam/sdk/coders/KvCoder.java | 22 ++++----- .../org/apache/beam/sdk/coders/MapCoder.java | 39 +++++++++++----- .../beam/sdk/io/PubsubUnboundedSink.java | 12 ++--- .../beam/sdk/testing/ValueInSingleWindow.java | 2 +- .../beam/sdk/transforms/CombineFns.java | 6 ++- .../org/apache/beam/sdk/transforms/Mean.java | 10 ++-- .../beam/sdk/transforms/join/CoGbkResult.java | 8 +++- .../transforms/windowing/IntervalWindow.java | 4 +- .../apache/beam/sdk/util/TimerInternals.java | 4 +- .../apache/beam/sdk/util/WindowedValue.java | 2 +- .../beam/sdk/coders/BigDecimalCoderTest.java | 46 +++++++++---------- .../apache/beam/sdk/coders/JAXBCoderTest.java | 4 +- .../apache/beam/sdk/coders/MapCoderTest.java | 2 +- .../beam/sdk/transforms/CombineTest.java | 6 +-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +- .../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +- .../sdk/io/kinesis/KinesisRecordCoder.java | 4 +- 24 files changed, 119 insertions(+), 110 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index 95be04732333d..dfd6a8d4ad1a2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -92,7 +92,7 @@ public void encode(KeyedWorkItem value, OutputStream outStream, Coder. Coder.Context nestedContext = context.nested(); keyCoder.encode(value.key(), outStream, nestedContext); timersCoder.encode(value.timersIterable(), outStream, nestedContext); - elemsCoder.encode(value.elementsIterable(), outStream, nestedContext); + elemsCoder.encode(value.elementsIterable(), outStream, context); } @Override @@ -101,7 +101,7 @@ public KeyedWorkItem decode(InputStream inStream, Coder.Context contex Coder.Context nestedContext = context.nested(); K key = keyCoder.decode(inStream, nestedContext); Iterable timers = timersCoder.decode(inStream, nestedContext); - Iterable> elems = elemsCoder.decode(inStream, nestedContext); + Iterable> elems = elemsCoder.decode(inStream, context); return KeyedWorkItems.workItem(key, timers, elems); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index f3f93e1bccec2..be1793c947d7a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -235,19 +235,17 @@ public static CheckpointCoder of( @Override public void encode(Checkpoint value, OutputStream outStream, Context context) throws CoderException, IOException { - Context nested = context.nested(); - elemsCoder.encode(value.residualElements, outStream, nested); - sourceCoder.encode(value.residualSource, outStream, nested); + elemsCoder.encode(value.residualElements, outStream, context.nested()); + sourceCoder.encode(value.residualSource, outStream, context); } @SuppressWarnings("unchecked") @Override public Checkpoint decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nested = context.nested(); return new Checkpoint<>( - elemsCoder.decode(inStream, nested), - sourceCoder.decode(inStream, nested)); + elemsCoder.decode(inStream, context.nested()), + sourceCoder.decode(inStream, context)); } @Override diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index ad306886b856e..d95ed7c624439 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -90,17 +90,15 @@ public void encode(SingletonKeyedWorkItem value, OutputStream outStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); - keyCoder.encode(value.key(), outStream, nestedContext); - valueCoder.encode(value.value, outStream, nestedContext); + keyCoder.encode(value.key(), outStream, context.nested()); + valueCoder.encode(value.value, outStream, context); } @Override public SingletonKeyedWorkItem decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - WindowedValue value = valueCoder.decode(inStream, nestedContext); + K key = keyCoder.decode(inStream, context.nested()); + WindowedValue value = valueCoder.decode(inStream, context); return new SingletonKeyedWorkItem<>(key, value); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1a15eaf232053..29c0058c5e5a9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1984,7 +1984,7 @@ public static TransformedMapCoder of( public void encode(TransformedMap value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { transformCoder.encode(value.transform, outStream, context.nested()); - originalMapCoder.encode(value.originalMap, outStream, context.nested()); + originalMapCoder.encode(value.originalMap, outStream, context); } @Override @@ -1992,7 +1992,7 @@ public TransformedMap decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return new TransformedMap<>( transformCoder.decode(inStream, context.nested()), - originalMapCoder.decode(inStream, context.nested())); + originalMapCoder.decode(inStream, context)); } @Override diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java index e1eedd8ae7749..65db8171e2373 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java @@ -254,19 +254,17 @@ public static CheckpointCoder of( @Override public void encode(Checkpoint value, OutputStream outStream, Context context) throws CoderException, IOException { - Context nested = context.nested(); - elemsCoder.encode(value.residualElements, outStream, nested); - sourceCoder.encode(value.residualSource, outStream, nested); + elemsCoder.encode(value.residualElements, outStream, context.nested()); + sourceCoder.encode(value.residualSource, outStream, context); } @SuppressWarnings("unchecked") @Override public Checkpoint decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nested = context.nested(); return new Checkpoint<>( - elemsCoder.decode(inStream, nested), - sourceCoder.decode(inStream, nested)); + elemsCoder.decode(inStream, context.nested()), + sourceCoder.decode(inStream, context)); } @Override diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 2f83ffda276a6..6a244b0f29937 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -647,15 +647,15 @@ public void encode(IsmShard value, OutputStream outStream, Coder.Context context value); VarIntCoder.of().encode(value.getId(), outStream, context.nested()); VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested()); - VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested()); + VarLongCoder.of().encode(value.getIndexOffset(), outStream, context); } @Override public IsmShard decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return IsmShard.of( - VarIntCoder.of().decode(inStream, context), - VarLongCoder.of().decode(inStream, context), + VarIntCoder.of().decode(inStream, context.nested()), + VarLongCoder.of().decode(inStream, context.nested()), VarLongCoder.of().decode(inStream, context)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java index e2628821c96d2..36c8265b667a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -54,14 +54,14 @@ public void encode(BigDecimal value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); integerCoder.encode(value.scale(), outStream, context.nested()); - bigIntegerCoder.encode(value.unscaledValue(), outStream, context.nested()); + bigIntegerCoder.encode(value.unscaledValue(), outStream, context); } @Override public BigDecimal decode(InputStream inStream, Context context) throws IOException, CoderException { int scale = integerCoder.decode(inStream, context.nested()); - BigInteger bigInteger = bigIntegerCoder.decode(inStream, context.nested()); + BigInteger bigInteger = bigIntegerCoder.decode(inStream, context); return new BigDecimal(bigInteger, scale); } @@ -96,6 +96,6 @@ public boolean isRegisterByteSizeObserverCheap(BigDecimal value, Context context protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); return integerCoder.getEncodedElementByteSize(value.scale(), context.nested()) - + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context.nested()); + + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index ad13226b05c43..c0d3aa46ffaf9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -83,17 +83,15 @@ public void encode(KV kv, OutputStream outStream, Context context) if (kv == null) { throw new CoderException("cannot encode a null KV"); } - Context nestedContext = context.nested(); - keyCoder.encode(kv.getKey(), outStream, nestedContext); - valueCoder.encode(kv.getValue(), outStream, nestedContext); + keyCoder.encode(kv.getKey(), outStream, context.nested()); + valueCoder.encode(kv.getValue(), outStream, context); } @Override public KV decode(InputStream inStream, Context context) throws IOException, CoderException { - Context nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - V value = valueCoder.decode(inStream, nestedContext); + K key = keyCoder.decode(inStream, context.nested()); + V value = valueCoder.decode(inStream, context); return KV.of(key, value); } @@ -135,10 +133,8 @@ public CloudObject asCloudObject() { */ @Override public boolean isRegisterByteSizeObserverCheap(KV kv, Context context) { - return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), - context.nested()) - && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), - context.nested()); + return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested()) + && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context); } /** @@ -152,9 +148,7 @@ public void registerByteSizeObserver( if (kv == null) { throw new CoderException("cannot encode a null KV"); } - keyCoder.registerByteSizeObserver( - kv.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver( - kv.getValue(), observer, context.nested()); + keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested()); + valueCoder.registerByteSizeObserver(kv.getValue(), observer, context); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index ebe705156ffbd..5c90a68d81472 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -81,10 +82,10 @@ public Coder getValueCoder() { ///////////////////////////////////////////////////////////////////////////// - Coder keyCoder; - Coder valueCoder; + private Coder keyCoder; + private Coder valueCoder; - MapCoder(Coder keyCoder, Coder valueCoder) { + private MapCoder(Coder keyCoder, Coder valueCoder) { this.keyCoder = keyCoder; this.valueCoder = valueCoder; } @@ -99,10 +100,17 @@ public void encode( throw new CoderException("cannot encode a null Map"); } DataOutputStream dataOutStream = new DataOutputStream(outStream); - dataOutStream.writeInt(map.size()); + int size = map.size(); + dataOutStream.writeInt(size); + int i = 0; for (Entry entry : map.entrySet()) { keyCoder.encode(entry.getKey(), outStream, context.nested()); - valueCoder.encode(entry.getValue(), outStream, context.nested()); + if (i < size - 1) { + valueCoder.encode(entry.getValue(), outStream, context.nested()); + } else { + valueCoder.encode(entry.getValue(), outStream, context); + } + ++i; } dataOutStream.flush(); } @@ -115,7 +123,12 @@ public Map decode(InputStream inStream, Context context) Map retval = Maps.newHashMapWithExpectedSize(size); for (int i = 0; i < size; ++i) { K key = keyCoder.decode(inStream, context.nested()); - V value = valueCoder.decode(inStream, context.nested()); + V value; + if (i < size - 1) { + value = valueCoder.decode(inStream, context.nested()); + } else { + value = valueCoder.decode(inStream, context); + } retval.put(key, value); } return retval; @@ -149,11 +162,15 @@ public void registerByteSizeObserver( Map map, ElementByteSizeObserver observer, Context context) throws Exception { observer.update(4L); - for (Entry entry : map.entrySet()) { - keyCoder.registerByteSizeObserver( - entry.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver( - entry.getValue(), observer, context.nested()); + Iterator> entries = map.entrySet().iterator(); + while (entries.hasNext()) { + Entry entry = entries.next(); + keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested()); + if (entries.hasNext()) { + valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested()); + } else { + valueCoder.registerByteSizeObserver(entry.getValue(), observer, context); + } } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 1992cb8f535a7..58414c62cf6d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -110,17 +110,17 @@ private static class OutgoingMessageCoder extends AtomicCoder { public void encode( OutgoingMessage value, OutputStream outStream, Context context) throws CoderException, IOException { - ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); - RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED); + ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested()); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested()); + RECORD_ID_CODER.encode(value.recordId, outStream, context); } @Override public OutgoingMessage decode( InputStream inStream, Context context) throws CoderException, IOException { - byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); - long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); - @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED); + byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested()); + long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested()); + @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context); return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java index 9ec030ff96b28..3ad7b6e499bdf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java @@ -109,7 +109,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro T value = valueCoder.decode(inStream, nestedContext); Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); BoundedWindow window = windowCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, context); return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index e4e1c50d8671d..fe7a53ac85b06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -995,9 +995,10 @@ public void encode(Object[] value, OutputStream outStream, Context context) throws CoderException, IOException { checkArgument(value.length == codersCount); Context nestedContext = context.nested(); - for (int i = 0; i < codersCount; ++i) { + for (int i = 0; i < codersCount - 1; ++i) { coders.get(i).encode(value[i], outStream, nestedContext); } + coders.get(codersCount - 1).encode(value[codersCount - 1], outStream, context); } @Override @@ -1005,9 +1006,10 @@ public Object[] decode(InputStream inStream, Context context) throws CoderException, IOException { Object[] ret = new Object[codersCount]; Context nestedContext = context.nested(); - for (int i = 0; i < codersCount; ++i) { + for (int i = 0; i < codersCount - 1; ++i) { ret[i] = coders.get(i).decode(inStream, nestedContext); } + ret[codersCount - 1] = coders.get(codersCount - 1).decode(inStream, context); return ret; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java index 1a0791f993094..9eea3a0e174d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java @@ -185,18 +185,16 @@ static class CountSumCoder @Override public void encode(CountSum value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - LONG_CODER.encode(value.count, outStream, nestedContext); - DOUBLE_CODER.encode(value.sum, outStream, nestedContext); + LONG_CODER.encode(value.count, outStream, context.nested()); + DOUBLE_CODER.encode(value.sum, outStream, context); } @Override public CountSum decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); return new CountSum<>( - LONG_CODER.decode(inStream, nestedContext), - DOUBLE_CODER.decode(inStream, nestedContext)); + LONG_CODER.decode(inStream, context.nested()), + DOUBLE_CODER.decode(inStream, context)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 10ba3c9489da7..810aae098294c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -268,9 +268,11 @@ public void encode( if (!schema.equals(value.getSchema())) { throw new CoderException("input schema does not match coder schema"); } - for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + int lastTag = schema.size() - 1; + for (int unionTag = 0; unionTag < lastTag; unionTag++) { tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED); } + tagListCoder(lastTag).encode(value.valueMap.get(lastTag), outStream, context); } @Override @@ -279,9 +281,11 @@ public CoGbkResult decode( Context context) throws CoderException, IOException { List> valueMap = new ArrayList<>(); - for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + int lastTag = schema.size() - 1; + for (int unionTag = 0; unionTag < lastTag; unionTag++) { valueMap.add(tagListCoder(unionTag).decode(inStream, Context.NESTED)); } + valueMap.add(tagListCoder(lastTag).decode(inStream, context)); return new CoGbkResult(schema, valueMap); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index af987966be922..fb0fc11d72043 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -185,14 +185,14 @@ public void encode(IntervalWindow window, Context context) throws IOException, CoderException { instantCoder.encode(window.end, outStream, context.nested()); - durationCoder.encode(new Duration(window.start, window.end), outStream, context.nested()); + durationCoder.encode(new Duration(window.start, window.end), outStream, context); } @Override public IntervalWindow decode(InputStream inStream, Context context) throws IOException, CoderException { Instant end = instantCoder.decode(inStream, context.nested()); - ReadableDuration duration = durationCoder.decode(inStream, context.nested()); + ReadableDuration duration = durationCoder.decode(inStream, context); return new IntervalWindow(end.minus(duration), end); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index 0bfcddc3197b1..719548675a6f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -258,7 +258,7 @@ public void encode(TimerData timer, OutputStream outStream, Context context) STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext); STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext); INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext); - STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext); + STRING_CODER.encode(timer.getDomain().name(), outStream, context); } @Override @@ -269,7 +269,7 @@ public TimerData decode(InputStream inStream, Context context) StateNamespace namespace = StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext); - TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext)); + TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context)); return TimerData.of(timerId, namespace, timestamp, domain); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index a0b4cf5260962..f7035d70e5632 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -671,7 +671,7 @@ public WindowedValue decode(InputStream inStream, Context context) Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); Collection windows = windowsCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, context); return WindowedValue.of(value, timestamp, windows, pane); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java index f5d56cbc11f0f..9db50c8f0f627 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java @@ -75,29 +75,29 @@ public void testEncodingId() throws Exception { */ private static final List TEST_ENCODINGS = ImmutableList.of( - "swi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFuM" - + "0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNjX9" - + "eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfkeZTF" - + "f0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx_2L" - + "k5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHAQ2m" - + "1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ", - "sgi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFu" - + "M0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNj" - + "X9eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfke" - + "ZTFf0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx" - + "_2Lk5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHA" - + "Q2m1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ", - "AQGX", - "AAH_", - "AAEA", - "AAEB", - "MBUJEk1IAgE1H9Gsru39PDZgUqT1NnU", - "AIEBAP________gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", - "AIEBCf_______7AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"); + "swg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT" + + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki" + + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38" + + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN" + + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V" + + "Bw7Jz3Vk", + "sgg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT" + + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki" + + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38" + + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN" + + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V" + + "Bw7Jz3Vk", + "AZc", + "AP8", + "AAA", + "AAE", + "MAkSTUgCATUf0ayu7f08NmBSpPU2dQ", + "AAD________4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + "AAAAAA", + "AAn_______-wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + "AAAAAA"); @Test public void testWireFormatEncode() throws Exception { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java index 36190f9523c64..537cb59a16d88 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java @@ -175,7 +175,7 @@ public void encode(TestType value, OutputStream outStream, Context context) Context subContext = context.nested(); VarIntCoder.of().encode(3, outStream, subContext); jaxbCoder.encode(value, outStream, subContext); - VarLongCoder.of().encode(22L, outStream, subContext); + VarLongCoder.of().encode(22L, outStream, context); } @Override @@ -184,7 +184,7 @@ public TestType decode(InputStream inStream, Context context) Context subContext = context.nested(); VarIntCoder.of().decode(inStream, subContext); TestType result = jaxbCoder.decode(inStream, subContext); - VarLongCoder.of().decode(inStream, subContext); + VarLongCoder.of().decode(inStream, context); return result; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java index dc4a8b527f708..1053c79e1d1ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java @@ -85,7 +85,7 @@ public void testEncodingId() throws Exception { */ private static final List TEST_ENCODINGS = Arrays.asList( "AAAAAA", - "AAAAAv____8PA2ZvbwEFaGVsbG8"); + "AAAAAv____8PA2ZvbwFoZWxsbw"); @Test public void testWireFormatEncode() throws Exception { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 0ac950264d51f..f7839282d7de8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -905,14 +905,14 @@ private class CountSumCoder extends CustomCoder { @Override public void encode(CountSum value, OutputStream outStream, Context context) throws CoderException, IOException { - LONG_CODER.encode(value.count, outStream, context); + LONG_CODER.encode(value.count, outStream, context.nested()); DOUBLE_CODER.encode(value.sum, outStream, context); } @Override public CountSum decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { - long count = LONG_CODER.decode(inStream, context); + long count = LONG_CODER.decode(inStream, context.nested()); double sum = DOUBLE_CODER.decode(inStream, context); return new CountSum(count, sum); } @@ -930,7 +930,7 @@ public boolean isRegisterByteSizeObserverCheap( public void registerByteSizeObserver( CountSum value, ElementByteSizeObserver observer, Context context) throws Exception { - LONG_CODER.registerByteSizeObserver(value.count, observer, context); + LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested()); DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7bb1e510e994c..e3444aca338f9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2778,7 +2778,7 @@ public void encode(TableRowInfo value, OutputStream outStream, Context context) throw new CoderException("cannot encode a null value"); } tableRowCoder.encode(value.tableRow, outStream, context.nested()); - idCoder.encode(value.uniqueId, outStream, context.nested()); + idCoder.encode(value.uniqueId, outStream, context); } @Override @@ -2786,7 +2786,7 @@ public TableRowInfo decode(InputStream inStream, Context context) throws IOException { return new TableRowInfo( tableRowCoder.decode(inStream, context.nested()), - idCoder.decode(inStream, context.nested())); + idCoder.decode(inStream, context)); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index 736a752f7741f..ea78f0934e851 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -66,7 +66,7 @@ public void encode(KafkaRecord value, OutputStream outStream, Context cont stringCoder.encode(value.getTopic(), outStream, nested); intCoder.encode(value.getPartition(), outStream, nested); longCoder.encode(value.getOffset(), outStream, nested); - kvCoder.encode(value.getKV(), outStream, nested); + kvCoder.encode(value.getKV(), outStream, context); } @Override @@ -77,7 +77,7 @@ public KafkaRecord decode(InputStream inStream, Context context) stringCoder.decode(inStream, nested), intCoder.decode(inStream, nested), longCoder.decode(inStream, nested), - kvCoder.decode(inStream, nested)); + kvCoder.decode(inStream, context)); } @Override diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java index fc087b50b6260..77fe127bcba6c 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java @@ -53,7 +53,7 @@ public void encode(KinesisRecord value, OutputStream outStream, Context context) VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested); INSTANT_CODER.encode(value.getReadTime(), outStream, nested); STRING_CODER.encode(value.getStreamName(), outStream, nested); - STRING_CODER.encode(value.getShardId(), outStream, nested); + STRING_CODER.encode(value.getShardId(), outStream, context); } @Override @@ -66,7 +66,7 @@ public KinesisRecord decode(InputStream inStream, Context context) throws IOExce long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested); Instant readTimestamp = INSTANT_CODER.decode(inStream, nested); String streamName = STRING_CODER.decode(inStream, nested); - String shardId = STRING_CODER.decode(inStream, nested); + String shardId = STRING_CODER.decode(inStream, context); return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, approximateArrivalTimestamp, readTimestamp, streamName, shardId ); From 4e08e733de3088be7feef0f7d243345b1aa60552 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 22 Dec 2016 16:54:40 -0800 Subject: [PATCH 2/4] fix == 0 bugs in list-based standard-coders --- .../beam/sdk/transforms/CombineFns.java | 18 +++++++++++------ .../beam/sdk/transforms/join/CoGbkResult.java | 20 +++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index fe7a53ac85b06..d8e7cdd03d0ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -995,10 +995,13 @@ public void encode(Object[] value, OutputStream outStream, Context context) throws CoderException, IOException { checkArgument(value.length == codersCount); Context nestedContext = context.nested(); - for (int i = 0; i < codersCount - 1; ++i) { - coders.get(i).encode(value[i], outStream, nestedContext); + for (int i = 0; i < codersCount; ++i) { + if (i < codersCount - 1) { + coders.get(i).encode(value[i], outStream, nestedContext); + } else { + coders.get(i).encode(value[i], outStream, context); + } } - coders.get(codersCount - 1).encode(value[codersCount - 1], outStream, context); } @Override @@ -1006,10 +1009,13 @@ public Object[] decode(InputStream inStream, Context context) throws CoderException, IOException { Object[] ret = new Object[codersCount]; Context nestedContext = context.nested(); - for (int i = 0; i < codersCount - 1; ++i) { - ret[i] = coders.get(i).decode(inStream, nestedContext); + for (int i = 0; i < codersCount; ++i) { + if (i < codersCount - 1) { + ret[i] = coders.get(i).decode(inStream, nestedContext); + } else { + ret[i] = coders.get(i).decode(inStream, context); + } } - ret[codersCount - 1] = coders.get(codersCount - 1).decode(inStream, context); return ret; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 810aae098294c..53b3a8217a9ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -268,11 +268,13 @@ public void encode( if (!schema.equals(value.getSchema())) { throw new CoderException("input schema does not match coder schema"); } - int lastTag = schema.size() - 1; - for (int unionTag = 0; unionTag < lastTag; unionTag++) { - tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED); + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + if (unionTag < schema.size() - 1) { + tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); + } else { + tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context); + } } - tagListCoder(lastTag).encode(value.valueMap.get(lastTag), outStream, context); } @Override @@ -281,11 +283,13 @@ public CoGbkResult decode( Context context) throws CoderException, IOException { List> valueMap = new ArrayList<>(); - int lastTag = schema.size() - 1; - for (int unionTag = 0; unionTag < lastTag; unionTag++) { - valueMap.add(tagListCoder(unionTag).decode(inStream, Context.NESTED)); + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + if (unionTag < schema.size() - 1) { + valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); + } else { + valueMap.add(tagListCoder(unionTag).decode(inStream, context)); + } } - valueMap.add(tagListCoder(lastTag).decode(inStream, context)); return new CoGbkResult(schema, valueMap); } From 78b0e5784a5e43db9c0ee2cf970163e2f6c111f7 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 28 Dec 2016 11:22:43 -0800 Subject: [PATCH 3/4] Rewrite loops using fewer ifs, reorder a few coders to put context-sensitive coders last --- .../org/apache/beam/sdk/coders/MapCoder.java | 56 +++++++++++-------- .../beam/sdk/testing/ValueInSingleWindow.java | 8 +-- .../beam/sdk/transforms/CombineFns.java | 24 ++++---- .../beam/sdk/transforms/join/CoGbkResult.java | 30 +++++----- .../apache/beam/sdk/util/WindowedValue.java | 13 +++-- .../apache/beam/sdk/coders/JAXBCoderTest.java | 12 ++-- 6 files changed, 80 insertions(+), 63 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index 5c90a68d81472..18542efbe252c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -100,19 +101,25 @@ public void encode( throw new CoderException("cannot encode a null Map"); } DataOutputStream dataOutStream = new DataOutputStream(outStream); + int size = map.size(); dataOutStream.writeInt(size); - int i = 0; - for (Entry entry : map.entrySet()) { + if (size == 0) { + return; + } + + // Since we handled size == 0 above, entry is guaranteed to exist after before and after loop + Iterator> iterator = map.entrySet().iterator(); + Entry entry = iterator.next(); + while (iterator.hasNext()) { keyCoder.encode(entry.getKey(), outStream, context.nested()); - if (i < size - 1) { - valueCoder.encode(entry.getValue(), outStream, context.nested()); - } else { - valueCoder.encode(entry.getValue(), outStream, context); - } - ++i; + valueCoder.encode(entry.getValue(), outStream, context.nested()); + entry = iterator.next(); } - dataOutStream.flush(); + + keyCoder.encode(entry.getKey(), outStream, context.nested()); + valueCoder.encode(entry.getValue(), outStream, context); + // no flush needed as DataOutputStream does not buffer } @Override @@ -120,17 +127,20 @@ public Map decode(InputStream inStream, Context context) throws IOException, CoderException { DataInputStream dataInStream = new DataInputStream(inStream); int size = dataInStream.readInt(); + if (size == 0) { + return Collections.emptyMap(); + } + Map retval = Maps.newHashMapWithExpectedSize(size); - for (int i = 0; i < size; ++i) { + for (int i = 0; i < size - 1; ++i) { K key = keyCoder.decode(inStream, context.nested()); - V value; - if (i < size - 1) { - value = valueCoder.decode(inStream, context.nested()); - } else { - value = valueCoder.decode(inStream, context); - } + V value = valueCoder.decode(inStream, context.nested()); retval.put(key, value); } + + K key = keyCoder.decode(inStream, context.nested()); + V value = valueCoder.decode(inStream, context); + retval.put(key, value); return retval; } @@ -162,15 +172,17 @@ public void registerByteSizeObserver( Map map, ElementByteSizeObserver observer, Context context) throws Exception { observer.update(4L); + if (map.isEmpty()){ + return; + } Iterator> entries = map.entrySet().iterator(); + Entry entry = entries.next(); while (entries.hasNext()) { - Entry entry = entries.next(); keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested()); - if (entries.hasNext()) { - valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested()); - } else { - valueCoder.registerByteSizeObserver(entry.getValue(), observer, context); - } + valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested()); + entry = entries.next(); } + keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested()); + valueCoder.registerByteSizeObserver(entry.getValue(), observer, context); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java index 3ad7b6e499bdf..b746f6d723f94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java @@ -97,19 +97,19 @@ public static Coder of( public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, Context context) throws IOException { Context nestedContext = context.nested(); - valueCoder.encode(windowedElem.getValue(), outStream, nestedContext); InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext); windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext); - PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context); + PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override public ValueInSingleWindow decode(InputStream inStream, Context context) throws IOException { Context nestedContext = context.nested(); - T value = valueCoder.decode(inStream, nestedContext); Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); BoundedWindow window = windowCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, context); + PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + T value = valueCoder.decode(inStream, context); return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index d8e7cdd03d0ba..5a8eb0fe6b1b5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -994,28 +994,28 @@ public static ComposedAccumulatorCoder of( public void encode(Object[] value, OutputStream outStream, Context context) throws CoderException, IOException { checkArgument(value.length == codersCount); + if (value.length == 0) { + return; + } Context nestedContext = context.nested(); - for (int i = 0; i < codersCount; ++i) { - if (i < codersCount - 1) { - coders.get(i).encode(value[i], outStream, nestedContext); - } else { - coders.get(i).encode(value[i], outStream, context); - } + for (int i = 0; i < codersCount - 1; ++i) { + coders.get(i).encode(value[i], outStream, nestedContext); } + coders.get(codersCount - 1).encode(value[codersCount - 1], outStream, context); } @Override public Object[] decode(InputStream inStream, Context context) throws CoderException, IOException { Object[] ret = new Object[codersCount]; + if (codersCount == 0) { + return ret; + } Context nestedContext = context.nested(); - for (int i = 0; i < codersCount; ++i) { - if (i < codersCount - 1) { - ret[i] = coders.get(i).decode(inStream, nestedContext); - } else { - ret[i] = coders.get(i).decode(inStream, context); - } + for (int i = 0; i < codersCount - 1; ++i) { + ret[i] = coders.get(i).decode(inStream, nestedContext); } + ret[codersCount - 1] = coders.get(codersCount - 1).decode(inStream, context); return ret; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 53b3a8217a9ee..05fc61f58e7a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; import java.io.IOException; import java.io.InputStream; @@ -268,13 +270,14 @@ public void encode( if (!schema.equals(value.getSchema())) { throw new CoderException("input schema does not match coder schema"); } - for (int unionTag = 0; unionTag < schema.size(); unionTag++) { - if (unionTag < schema.size() - 1) { - tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); - } else { - tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context); - } + if (schema.size() == 0) { + return; + } + int nested = schema.size() - 1; + for (int unionTag = 0; unionTag < nested; unionTag++) { + tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); } + tagListCoder(nested).encode(value.valueMap.get(nested), outStream, context); } @Override @@ -282,14 +285,15 @@ public CoGbkResult decode( InputStream inStream, Context context) throws CoderException, IOException { - List> valueMap = new ArrayList<>(); - for (int unionTag = 0; unionTag < schema.size(); unionTag++) { - if (unionTag < schema.size() - 1) { - valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); - } else { - valueMap.add(tagListCoder(unionTag).decode(inStream, context)); - } + if (schema.size() == 0) { + return new CoGbkResult(schema, ImmutableList.>of()); + } + int nested = schema.size() - 1; + List> valueMap = Lists.newArrayListWithExpectedSize(schema.size()); + for (int unionTag = 0; unionTag < nested; unionTag++) { + valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); } + valueMap.add(tagListCoder(nested).decode(inStream, context)); return new CoGbkResult(schema, valueMap); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index f7035d70e5632..1b3e6483ef177 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -656,22 +656,22 @@ public void encode(WindowedValue windowedElem, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - valueCoder.encode(windowedElem.getValue(), outStream, nestedContext); InstantCoder.of().encode( windowedElem.getTimestamp(), outStream, nestedContext); windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext); - PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context); + PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override public WindowedValue decode(InputStream inStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - T value = valueCoder.decode(inStream, nestedContext); Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); Collection windows = windowsCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, context); + PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + T value = valueCoder.decode(inStream, context); return WindowedValue.of(value, timestamp, windows, pane); } @@ -689,9 +689,10 @@ public void verifyDeterministic() throws NonDeterministicException { public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObserver observer, Context context) throws Exception { + InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested()); + windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested()); + PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, context.nested()); valueCoder.registerByteSizeObserver(value.getValue(), observer, context); - InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context); - windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java index 537cb59a16d88..c023278bc77ad 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java @@ -172,18 +172,18 @@ public TestCoder(JAXBCoder jaxbCoder) { @Override public void encode(TestType value, OutputStream outStream, Context context) throws CoderException, IOException { - Context subContext = context.nested(); - VarIntCoder.of().encode(3, outStream, subContext); - jaxbCoder.encode(value, outStream, subContext); + Context nestedContext = context.nested(); + VarIntCoder.of().encode(3, outStream, nestedContext); + jaxbCoder.encode(value, outStream, nestedContext); VarLongCoder.of().encode(22L, outStream, context); } @Override public TestType decode(InputStream inStream, Context context) throws CoderException, IOException { - Context subContext = context.nested(); - VarIntCoder.of().decode(inStream, subContext); - TestType result = jaxbCoder.decode(inStream, subContext); + Context nestedContext = context.nested(); + VarIntCoder.of().decode(inStream, nestedContext); + TestType result = jaxbCoder.decode(inStream, nestedContext); VarLongCoder.of().decode(inStream, context); return result; } From fb834cb35c5b725bf1ea22c4cb97facedde7b134 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 28 Dec 2016 15:29:24 -0800 Subject: [PATCH 4/4] fixup --- .../java/org/apache/beam/sdk/coders/MapCoder.java | 4 ++-- .../org/apache/beam/sdk/transforms/CombineFns.java | 10 ++++++---- .../apache/beam/sdk/transforms/join/CoGbkResult.java | 12 ++++++------ 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index 18542efbe252c..94099be89bd33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -108,7 +108,7 @@ public void encode( return; } - // Since we handled size == 0 above, entry is guaranteed to exist after before and after loop + // Since we handled size == 0 above, entry is guaranteed to exist before and after loop Iterator> iterator = map.entrySet().iterator(); Entry entry = iterator.next(); while (iterator.hasNext()) { @@ -172,7 +172,7 @@ public void registerByteSizeObserver( Map map, ElementByteSizeObserver observer, Context context) throws Exception { observer.update(4L); - if (map.isEmpty()){ + if (map.isEmpty()) { return; } Iterator> entries = map.entrySet().iterator(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 5a8eb0fe6b1b5..79b2ab8b65771 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -997,11 +997,12 @@ public void encode(Object[] value, OutputStream outStream, Context context) if (value.length == 0) { return; } + int lastIndex = codersCount - 1; Context nestedContext = context.nested(); - for (int i = 0; i < codersCount - 1; ++i) { + for (int i = 0; i < lastIndex; ++i) { coders.get(i).encode(value[i], outStream, nestedContext); } - coders.get(codersCount - 1).encode(value[codersCount - 1], outStream, context); + coders.get(lastIndex).encode(value[lastIndex], outStream, context); } @Override @@ -1011,11 +1012,12 @@ public Object[] decode(InputStream inStream, Context context) if (codersCount == 0) { return ret; } + int lastIndex = codersCount - 1; Context nestedContext = context.nested(); - for (int i = 0; i < codersCount - 1; ++i) { + for (int i = 0; i < lastIndex; ++i) { ret[i] = coders.get(i).decode(inStream, nestedContext); } - ret[codersCount - 1] = coders.get(codersCount - 1).decode(inStream, context); + ret[lastIndex] = coders.get(lastIndex).decode(inStream, context); return ret; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 05fc61f58e7a5..7b849e786adbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -273,11 +273,11 @@ public void encode( if (schema.size() == 0) { return; } - int nested = schema.size() - 1; - for (int unionTag = 0; unionTag < nested; unionTag++) { + int lastIndex = schema.size() - 1; + for (int unionTag = 0; unionTag < lastIndex; unionTag++) { tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); } - tagListCoder(nested).encode(value.valueMap.get(nested), outStream, context); + tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context); } @Override @@ -288,12 +288,12 @@ public CoGbkResult decode( if (schema.size() == 0) { return new CoGbkResult(schema, ImmutableList.>of()); } - int nested = schema.size() - 1; + int lastIndex = schema.size() - 1; List> valueMap = Lists.newArrayListWithExpectedSize(schema.size()); - for (int unionTag = 0; unionTag < nested; unionTag++) { + for (int unionTag = 0; unionTag < lastIndex; unionTag++) { valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); } - valueMap.add(tagListCoder(nested).decode(inStream, context)); + valueMap.add(tagListCoder(lastIndex).decode(inStream, context)); return new CoGbkResult(schema, valueMap); }