From 1fc048385fb0fea93eef85f614586448a3ea7c2a Mon Sep 17 00:00:00 2001 From: Pete Robbins Date: Thu, 14 Apr 2016 14:50:34 +0100 Subject: [PATCH 1/5] Support columnar in memory representation on Big Endian platforms --- .../parquet/VectorizedPlainValuesReader.java | 16 ++++++ .../vectorized/OffHeapColumnVector.java | 56 +++++++++++++++---- .../vectorized/OnHeapColumnVector.java | 43 ++++++++++---- .../vectorized/ColumnarBatchSuite.scala | 9 +++ 4 files changed, 104 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 2672e0453b392..1931e9f0e0f7c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.spark.sql.execution.vectorized.ColumnVector; import org.apache.spark.unsafe.Platform; @@ -31,6 +33,8 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori private byte[] buffer; private int offset; private int bitOffset; // Only used for booleans. + + private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); public VectorizedPlainValuesReader() { } @@ -103,6 +107,9 @@ public final boolean readBoolean() { @Override public final int readInteger() { int v = Platform.getInt(buffer, offset); + if (bigEndianPlatform) { + v = java.lang.Integer.reverseBytes(v); + } offset += 4; return v; } @@ -110,6 +117,9 @@ public final int readInteger() { @Override public final long readLong() { long v = Platform.getLong(buffer, offset); + if (bigEndianPlatform) { + v = java.lang.Long.reverseBytes(v); + } offset += 8; return v; } @@ -122,6 +132,9 @@ public final byte readByte() { @Override public final float readFloat() { float v = Platform.getFloat(buffer, offset); + if (bigEndianPlatform) { + v = ByteBuffer.allocate(4).putFloat(v).order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + } offset += 4; return v; } @@ -129,6 +142,9 @@ public final float readFloat() { @Override public final double readDouble() { double v = Platform.getDouble(buffer, offset); + if (bigEndianPlatform) { + v = ByteBuffer.allocate(8).putDouble(v).order(ByteOrder.LITTLE_ENDIAN).getDouble(0); + } offset += 8; return v; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index b1901411351a2..7a6ed72ef6cfa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.vectorized; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import org.apache.commons.lang.NotImplementedException; @@ -28,6 +29,9 @@ * Column data backed using offheap memory. */ public final class OffHeapColumnVector extends ColumnVector { + + private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); + // The data stored in these two allocations need to maintain binary compatible. We can // directly pass this buffer to external components. private long nulls; @@ -39,9 +43,7 @@ public final class OffHeapColumnVector extends ColumnVector { protected OffHeapColumnVector(int capacity, DataType type) { super(capacity, type, MemoryMode.OFF_HEAP); - if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) { - throw new NotImplementedException("Only little endian is supported."); - } + nulls = 0; data = 0; lengthData = 0; @@ -221,8 +223,16 @@ public void putInts(int rowId, int count, int[] src, int srcIndex) { @Override public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET, - null, data + 4 * rowId, count * 4); + if (!bigEndianPlatform) { + Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET, + null, data + 4 * rowId, count * 4); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + long offset = data + 4 * rowId; + for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) { + Platform.putInt(null, offset, java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset))); + } + } } @Override @@ -259,8 +269,16 @@ public void putLongs(int rowId, int count, long[] src, int srcIndex) { @Override public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET, - null, data + 8 * rowId, count * 8); + if (!bigEndianPlatform) { + Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET, + null, data + 8 * rowId, count * 8); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + long offset = data + 8 * rowId; + for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) { + Platform.putLong(null, offset, java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset))); + } + } } @Override @@ -297,8 +315,17 @@ public void putFloats(int rowId, int count, float[] src, int srcIndex) { @Override public void putFloats(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, - null, data + rowId * 4, count * 4); + if (!bigEndianPlatform) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, + null, data + rowId * 4, count * 4); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + long offset = data + 4 * rowId; + for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) { + Platform.putFloat(null, offset, ByteBuffer.allocate(4).putFloat(Platform.getFloat(src, srcOffset)) + .order(ByteOrder.LITTLE_ENDIAN).getFloat(0)); + } + } } @Override @@ -336,8 +363,17 @@ public void putDoubles(int rowId, int count, double[] src, int srcIndex) { @Override public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, + if (!bigEndianPlatform) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId * 8, count * 8); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + long offset = data + 8 * rowId; + for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) { + Platform.putDouble(null, offset, ByteBuffer.allocate(8).putDouble(Platform.getDouble(src, srcOffset)) + .order(ByteOrder.LITTLE_ENDIAN).getDouble(0)); + } + } } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index e97276800daa8..83e95db2f242f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.vectorized; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; import org.apache.spark.memory.MemoryMode; @@ -27,6 +29,9 @@ * and a java array for the values. */ public final class OnHeapColumnVector extends ColumnVector { + + private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); + // The data stored in these arrays need to maintain binary compatible. We can // directly pass this buffer to external components. @@ -211,10 +216,11 @@ public void putInts(int rowId, int count, int[] src, int srcIndex) { @Override public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; - for (int i = 0; i < count; ++i) { + for (int i = 0; i < count; ++i, srcOffset += 4) { intData[i + rowId] = Platform.getInt(src, srcOffset); - srcIndex += 4; - srcOffset += 4; + if (bigEndianPlatform) { + intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + rowId]); + } } } @@ -251,10 +257,11 @@ public void putLongs(int rowId, int count, long[] src, int srcIndex) { @Override public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; - for (int i = 0; i < count; ++i) { + for (int i = 0; i < count; ++i, srcOffset += 8) { longData[i + rowId] = Platform.getLong(src, srcOffset); - srcIndex += 8; - srcOffset += 8; + if (bigEndianPlatform) { + longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]); + } } } @@ -286,8 +293,16 @@ public void putFloats(int rowId, int count, float[] src, int srcIndex) { @Override public void putFloats(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, - floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4); + if (!bigEndianPlatform) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData, + Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + for (int i = 0; i < count; ++i, srcOffset += 4) { + floatData[i + rowId] = ByteBuffer.allocate(4).putFloat(Platform.getFloat(src, srcOffset)) + .order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + } + } } @Override @@ -320,8 +335,16 @@ public void putDoubles(int rowId, int count, double[] src, int srcIndex) { @Override public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { - Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData, - Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8); + if (!bigEndianPlatform) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData, + Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8); + } else { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + for (int i = 0; i < count; ++i, srcOffset += 8) { + doubleData[i + rowId] = ByteBuffer.allocate(8).putDouble(Platform.getDouble(src, srcOffset)) + .order(ByteOrder.LITTLE_ENDIAN).getDouble(0); + } + } } @Override diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 31b63f2ce13d5..f99e0aa7f1f9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.vectorized import java.nio.charset.StandardCharsets +import java.nio.ByteBuffer +import java.nio.ByteOrder import scala.collection.JavaConverters._ import scala.collection.mutable @@ -280,6 +282,13 @@ class ColumnarBatchSuite extends SparkFunSuite { Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234) Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123) + if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { + // Ensure array contains Liitle Endian doubles + var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) + Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0)) + Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8)) + } + column.putDoubles(idx, 1, buffer, 8) column.putDoubles(idx + 1, 1, buffer, 0) reference += 1.123 From 3eb481d8c30639c5b9a219e4891ccaccf73075b0 Mon Sep 17 00:00:00 2001 From: Pete Robbins Date: Thu, 14 Apr 2016 20:24:48 +0100 Subject: [PATCH 2/5] Use ByteBuffer.wrap instead of allocate --- .../parquet/VectorizedPlainValuesReader.java | 16 ++++++++++------ .../vectorized/OffHeapColumnVector.java | 8 ++++---- .../execution/vectorized/OnHeapColumnVector.java | 8 ++++---- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 1931e9f0e0f7c..e9eec443c149d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -131,9 +131,11 @@ public final byte readByte() { @Override public final float readFloat() { - float v = Platform.getFloat(buffer, offset); - if (bigEndianPlatform) { - v = ByteBuffer.allocate(4).putFloat(v).order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + float v; + if (!bigEndianPlatform) { + v = Platform.getFloat(buffer, offset); + } else { + v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getFloat(offset); } offset += 4; return v; @@ -141,9 +143,11 @@ public final float readFloat() { @Override public final double readDouble() { - double v = Platform.getDouble(buffer, offset); - if (bigEndianPlatform) { - v = ByteBuffer.allocate(8).putDouble(v).order(ByteOrder.LITTLE_ENDIAN).getDouble(0); + double v; + if (!bigEndianPlatform) { + v = Platform.getDouble(buffer, offset); + } else { + v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble(offset); } offset += 8; return v; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 7a6ed72ef6cfa..174a93bbcbe98 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -319,11 +319,11 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) { Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId * 4, count * 4); } else { + ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; long offset = data + 4 * rowId; for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) { - Platform.putFloat(null, offset, ByteBuffer.allocate(4).putFloat(Platform.getFloat(src, srcOffset)) - .order(ByteOrder.LITTLE_ENDIAN).getFloat(0)); + Platform.putFloat(null, offset, bb.getFloat(srcOffset)); } } } @@ -367,11 +367,11 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId * 8, count * 8); } else { + ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; long offset = data + 8 * rowId; for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) { - Platform.putDouble(null, offset, ByteBuffer.allocate(8).putDouble(Platform.getDouble(src, srcOffset)) - .order(ByteOrder.LITTLE_ENDIAN).getDouble(0)); + Platform.putDouble(null, offset, bb.getDouble(srcOffset)); } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 83e95db2f242f..fd408b50ffec1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -297,10 +297,10 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) { Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4); } else { + ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; for (int i = 0; i < count; ++i, srcOffset += 4) { - floatData[i + rowId] = ByteBuffer.allocate(4).putFloat(Platform.getFloat(src, srcOffset)) - .order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + floatData[i + rowId] = bb.getFloat(srcOffset); } } } @@ -339,10 +339,10 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8); } else { + ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; for (int i = 0; i < count; ++i, srcOffset += 8) { - doubleData[i + rowId] = ByteBuffer.allocate(8).putDouble(Platform.getDouble(src, srcOffset)) - .order(ByteOrder.LITTLE_ENDIAN).getDouble(0); + doubleData[i + rowId] = bb.getDouble(srcOffset); } } } From 69fc667266c5efe97f796c6b4e8d14470168867d Mon Sep 17 00:00:00 2001 From: Pete Robbins Date: Fri, 15 Apr 2016 11:10:09 +0100 Subject: [PATCH 3/5] Fix offsets --- .../parquet/VectorizedPlainValuesReader.java | 4 ++-- .../sql/execution/vectorized/OffHeapColumnVector.java | 10 ++++------ .../sql/execution/vectorized/OnHeapColumnVector.java | 10 ++++------ 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index e9eec443c149d..3b3364d53a242 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -135,7 +135,7 @@ public final float readFloat() { if (!bigEndianPlatform) { v = Platform.getFloat(buffer, offset); } else { - v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getFloat(offset); + v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getFloat(offset - Platform.BYTE_ARRAY_OFFSET); } offset += 4; return v; @@ -147,7 +147,7 @@ public final double readDouble() { if (!bigEndianPlatform) { v = Platform.getDouble(buffer, offset); } else { - v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble(offset); + v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble(offset - Platform.BYTE_ARRAY_OFFSET); } offset += 8; return v; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 174a93bbcbe98..b8dd16227ec17 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -320,10 +320,9 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) { null, data + rowId * 4, count * 4); } else { ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); - int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; long offset = data + 4 * rowId; - for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) { - Platform.putFloat(null, offset, bb.getFloat(srcOffset)); + for (int i = 0; i < count; ++i, offset += 4) { + Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i))); } } } @@ -368,10 +367,9 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { null, data + rowId * 8, count * 8); } else { ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); - int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; long offset = data + 8 * rowId; - for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) { - Platform.putDouble(null, offset, bb.getDouble(srcOffset)); + for (int i = 0; i < count; ++i, offset += 8) { + Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i))); } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index fd408b50ffec1..b1ffe4c21049b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -298,9 +298,8 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) { Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4); } else { ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); - int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; - for (int i = 0; i < count; ++i, srcOffset += 4) { - floatData[i + rowId] = bb.getFloat(srcOffset); + for (int i = 0; i < count; ++i) { + floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i)); } } } @@ -340,9 +339,8 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8); } else { ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN); - int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; - for (int i = 0; i < count; ++i, srcOffset += 8) { - doubleData[i + rowId] = bb.getDouble(srcOffset); + for (int i = 0; i < count; ++i) { + doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i)); } } } From a1f06106d321ca40bef6dcd7865484fd79976b08 Mon Sep 17 00:00:00 2001 From: Pete Robbins Date: Fri, 15 Apr 2016 12:55:21 +0100 Subject: [PATCH 4/5] Wrap byte array once --- .../datasources/parquet/VectorizedPlainValuesReader.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 3b3364d53a242..6523520b92a42 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -33,6 +33,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori private byte[] buffer; private int offset; private int bitOffset; // Only used for booleans. + private ByteBuffer byteBuffer; // used to wrap the byte array buffer private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); @@ -43,6 +44,9 @@ public VectorizedPlainValuesReader() { public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException { this.buffer = bytes; this.offset = offset + Platform.BYTE_ARRAY_OFFSET; + if (bigEndianPlatform) { + byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + } } @Override @@ -135,7 +139,7 @@ public final float readFloat() { if (!bigEndianPlatform) { v = Platform.getFloat(buffer, offset); } else { - v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getFloat(offset - Platform.BYTE_ARRAY_OFFSET); + v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET); } offset += 4; return v; @@ -147,7 +151,7 @@ public final double readDouble() { if (!bigEndianPlatform) { v = Platform.getDouble(buffer, offset); } else { - v = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble(offset - Platform.BYTE_ARRAY_OFFSET); + v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET); } offset += 8; return v; From a652865e9f59ca4cf4fc596141ae0511284462b4 Mon Sep 17 00:00:00 2001 From: Pete Robbins Date: Fri, 15 Apr 2016 13:06:37 +0100 Subject: [PATCH 5/5] remove trailing spaces --- .../datasources/parquet/VectorizedPlainValuesReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 6523520b92a42..9475c853a03ff 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -149,7 +149,7 @@ public final float readFloat() { public final double readDouble() { double v; if (!bigEndianPlatform) { - v = Platform.getDouble(buffer, offset); + v = Platform.getDouble(buffer, offset); } else { v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET); }