From ea7fd2ff6454e8d819a39bf49901074e49b5714e Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 14 Jun 2015 09:34:35 -0700
Subject: [PATCH] [SPARK-8354] [SQL] Fix off-by-factor-of-8 error when
 allocating scratch space in UnsafeFixedWidthAggregationMap

UnsafeFixedWidthAggregationMap contains an off-by-factor-of-8 error when
allocating row conversion scratch space: we take a size requirement,
measured in bytes, then allocate a long array of that length. This means
that we end up allocating 8x too much conversion space.

This patch fixes the issue by allocating a `byte[]` array instead. This
doesn't impose any new limitations on the maximum sizes of UnsafeRows,
since UnsafeRowConverter already used integers when calculating the size
requirements for rows.

Author: Josh Rosen

Closes #6809 from JoshRosen/sql-bytes-vs-words-fix and squashes the following commits:

6520339 [Josh Rosen] Updates to reflect fact that UnsafeRow max size is constrained by max byte[] size
---
 .../UnsafeFixedWidthAggregationMap.java  | 30 +++++++++----------
 .../expressions/UnsafeRowConverter.scala |  2 +-
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index b23e0efc83332..f7849ebebc573 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -39,7 +39,7 @@ public final class UnsafeFixedWidthAggregationMap {
    * An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
    * map, we copy this buffer and use it as the value.
    */
-  private final long[] emptyAggregationBuffer;
+  private final byte[] emptyAggregationBuffer;
 
   private final StructType aggregationBufferSchema;
 
@@ -63,10 +63,10 @@ public final class UnsafeFixedWidthAggregationMap {
   /**
    * Scratch space that is used when encoding grouping keys into UnsafeRow format.
    *
-   * By default, this is a 1MB array, but it will grow as necessary in case larger keys are
+   * By default, this is an 8 kb array, but it will grow as necessary in case larger keys are
    * encountered.
    */
-  private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
+  private byte[] groupingKeyConversionScratchSpace = new byte[1024 * 8];
 
   private final boolean enablePerfMetrics;
 
@@ -123,13 +123,13 @@ public UnsafeFixedWidthAggregationMap(
   }
 
   /**
-   * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
+   * Convert a Java object row into an UnsafeRow, allocating it into a new byte array.
    */
-  private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
+  private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
     final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
-    final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
-    final long writtenLength =
-      converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
+    final byte[] unsafeRow = new byte[converter.getSizeRequirement(javaRow)];
+    final int writtenLength =
+      converter.writeRow(javaRow, unsafeRow, PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
     return unsafeRow;
   }
@@ -143,34 +143,34 @@ public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
     // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
     if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
       // This new array will be initially zero, so there's no need to zero it out here
-      groupingKeyConversionScratchSpace = new long[groupingKeySize];
+      groupingKeyConversionScratchSpace = new byte[groupingKeySize];
     } else {
       // Zero out the buffer that's used to hold the current row. This is necessary in order
       // to ensure that rows hash properly, since garbage data from the previous row could
       // otherwise end up as padding in this row. As a performance optimization, we only zero out
       // the portion of the buffer that we'll actually write to.
-      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
+      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, (byte) 0);
     }
-    final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
+    final int actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
       groupingKey,
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET);
+      PlatformDependent.BYTE_ARRAY_OFFSET);
     assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
 
     // Probe our map using the serialized key
     final BytesToBytesMap.Location loc = map.lookup(
       groupingKeyConversionScratchSpace,
-      PlatformDependent.LONG_ARRAY_OFFSET,
+      PlatformDependent.BYTE_ARRAY_OFFSET,
       groupingKeySize);
     if (!loc.isDefined()) {
       // This is the first time that we've seen this grouping key, so we'll insert a copy of the
       // empty aggregation buffer into the map:
       loc.putNewKey(
         groupingKeyConversionScratchSpace,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         groupingKeySize,
         emptyAggregationBuffer,
-        PlatformDependent.LONG_ARRAY_OFFSET,
+        PlatformDependent.BYTE_ARRAY_OFFSET,
         emptyAggregationBuffer.length
       );
     }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index d771e454b5170..5c92f41c639fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
    * @param baseOffset the base offset of the destination address
    * @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
    */
-  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
+  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Int = {
     unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
     var fieldNumber = 0
     var appendCursor: Int = fixedLengthSize
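
For illustration only (not part of the patch; the class and variable names below are
hypothetical): `getSizeRequirement` reports a row's size in bytes, so using that count
as a `long[]` length reserves 8 bytes for every byte requested. A minimal, runnable
Java sketch of the arithmetic behind the fix:

public class ScratchSpaceSizingDemo {
  public static void main(String[] args) {
    // Suppose the converter reports that a row needs this many bytes:
    final int sizeRequirementInBytes = 1024;

    // Before the fix: the byte count was used directly as a long[] length.
    // Each long occupies 8 bytes, so this reserves 8192 bytes -- 8x too much.
    final long[] oversized = new long[sizeRequirementInBytes];
    System.out.println("long[] scratch space: " + (oversized.length * 8L) + " bytes");

    // After the fix: a byte[] of the same length holds exactly the bytes requested.
    final byte[] exact = new byte[sizeRequirementInBytes];
    System.out.println("byte[] scratch space: " + exact.length + " bytes");
  }
}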