From 373073c0f7330098cce463c2b43fc1d2f4479bdf Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 19 Jan 2016 13:46:59 -0500
Subject: [PATCH] Use bit vectors to represent null fields to reduce memory footprint

Use BitSet for OnHeapColumnVector
Use BitSetMethods for OffHeapColumnVector
---
 .../vectorized/OffHeapColumnVector.java       | 19 +++++++++----------
 .../vectorized/OnHeapColumnVector.java        | 17 +++++++++--------
 .../vectorized/ColumnarBatchSuite.scala       |  3 ++-
 3 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6180dd308e5e3..cb74bb05728a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -22,6 +22,7 @@
 import org.apache.spark.sql.types.DoubleType;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 
 import org.apache.commons.lang.NotImplementedException;
 
@@ -41,7 +42,7 @@ protected OffHeapColumnVector(int capacity, DataType type) {
       throw new NotImplementedException("Only little endian is supported.");
     }
 
-    this.nulls = Platform.allocateMemory(capacity);
+    this.nulls = Platform.allocateMemory(((capacity + 63) / 64) * 8);
     if (type instanceof IntegerType) {
       this.data = Platform.allocateMemory(capacity * 4);
     } else if (type instanceof DoubleType) {
@@ -77,21 +78,20 @@ public final void close() {
 
   @Override
   public final void putNotNull(int rowId) {
-    Platform.putByte(null, nulls + rowId, (byte) 0);
+    BitSetMethods.unset(null, nulls, rowId);
   }
 
   @Override
   public final void putNull(int rowId) {
-    Platform.putByte(null, nulls + rowId, (byte) 1);
+    BitSetMethods.set(null, nulls, rowId);
     ++numNulls;
     anyNullsSet = true;
   }
 
   @Override
   public final void putNulls(int rowId, int count) {
-    long offset = nulls + rowId;
-    for (int i = 0; i < count; ++i, ++offset) {
-      Platform.putByte(null, offset, (byte) 1);
+    for (int i = 0; i < count; ++i) {
+      BitSetMethods.set(null, nulls, rowId + i);
     }
     anyNullsSet = true;
     numNulls += count;
@@ -100,15 +100,14 @@ public final void putNulls(int rowId, int count) {
   @Override
   public final void putNotNulls(int rowId, int count) {
     if (!anyNullsSet) return;
-    long offset = nulls + rowId;
-    for (int i = 0; i < count; ++i, ++offset) {
-      Platform.putByte(null, offset, (byte) 0);
+    for (int i = 0; i < count; ++i) {
+      BitSetMethods.unset(null, nulls, rowId + i);
     }
   }
 
   @Override
   public final boolean getIsNull(int rowId) {
-    return Platform.getByte(null, nulls + rowId) == 1;
+    return BitSetMethods.isSet(null, nulls, rowId);
   }
 
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 76d9956c3842f..e10613e2c9233 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -20,6 +20,7 @@
 import org.apache.spark.sql.types.DoubleType;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.BitSet;
 
 import java.nio.ByteBuffer;
 import java.nio.DoubleBuffer;
@@ -33,8 +34,8 @@ public final class OnHeapColumnVector extends ColumnVector {
 
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
-  // This is faster than a boolean array and we optimize this over memory footprint.
-  private byte[] nulls;
+  // We use a BitSet to reduce the memory footprint.
+  private BitSet nulls;
 
   // Array for each type. Only 1 is populated for any type.
   private int[] intData;
@@ -49,7 +50,7 @@ protected OnHeapColumnVector(int capacity, DataType type) {
     } else {
       throw new RuntimeException("Unhandled " + type);
     }
-    this.nulls = new byte[capacity];
+    this.nulls = new BitSet(capacity);
     reset();
   }
 
@@ -76,12 +77,12 @@ public final void close() {
 
   @Override
   public final void putNotNull(int rowId) {
-    nulls[rowId] = (byte)0;
+    nulls.unset(rowId);
   }
 
   @Override
   public final void putNull(int rowId) {
-    nulls[rowId] = (byte)1;
+    nulls.set(rowId);
     ++numNulls;
     anyNullsSet = true;
   }
@@ -89,7 +90,7 @@ public final void putNull(int rowId) {
   @Override
   public final void putNulls(int rowId, int count) {
     for (int i = 0; i < count; ++i) {
-      nulls[rowId + i] = (byte)1;
+      nulls.set(rowId + i);
     }
     anyNullsSet = true;
     numNulls += count;
@@ -99,13 +100,13 @@ public final void putNulls(int rowId, int count) {
   public final void putNotNulls(int rowId, int count) {
     if (!anyNullsSet) return;
     for (int i = 0; i < count; ++i) {
-      nulls[rowId + i] = (byte)0;
+      nulls.unset(rowId + i);
     }
   }
 
   @Override
   public final boolean getIsNull(int rowId) {
-    return nulls[rowId] == 1;
+    return nulls.get(rowId);
   }
 
   //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index d5e517c7f56be..562ab962893be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.bitset.BitSetMethods
 
 class ColumnarBatchSuite extends SparkFunSuite {
   test("Null Apis") {
@@ -67,7 +68,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(v._1 == column.getIsNull(v._2))
       if (memMode == MemoryMode.OFF_HEAP) {
         val addr = column.nullsNativeAddress()
-        assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
+        assert(v._1 == BitSetMethods.isSet(null, addr, v._2), "index=" + v._2)
       }
     }
     column.close
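
Note for reviewers (not part of the patch): the memory win comes from tracking one
bit per row instead of one byte per row, so the null buffer shrinks from `capacity`
bytes to `((capacity + 63) / 64) * 8` bytes, roughly an 8x reduction (for example,
128 bytes instead of 1024 for a 1024-row batch). The sketch below is a minimal,
self-contained illustration of the word-aligned bookkeeping that BitSet and
BitSetMethods perform; the class and method names are illustrative only, not
Spark's actual API.

    // Hypothetical sketch of bit-per-row null tracking over 64-bit words.
    public final class NullBitVector {
      private final long[] words;

      public NullBitVector(int capacity) {
        // One bit per row, rounded up to whole 64-bit words; this matches
        // the ((capacity + 63) / 64) * 8-byte sizing used in the patch.
        this.words = new long[(capacity + 63) / 64];
      }

      // Mark rowId as null: set bit (rowId % 64) of word (rowId / 64).
      public void set(int rowId) {
        words[rowId >>> 6] |= 1L << (rowId & 63);
      }

      // Mark rowId as not null: clear the corresponding bit.
      public void unset(int rowId) {
        words[rowId >>> 6] &= ~(1L << (rowId & 63));
      }

      // True if rowId is currently marked null.
      public boolean isSet(int rowId) {
        return (words[rowId >>> 6] & (1L << (rowId & 63))) != 0;
      }
    }

The trade-off mentioned in the removed comment still exists: a byte store is a
single write, while the bit variant is a read-modify-write of a shared word. The
old code optimized speed over footprint; this patch flips that choice in favor of
an 8x smaller null buffer.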