OffHeapColumnVector.java
@@ -22,6 +22,7 @@
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.bitset.BitSetMethods;


import org.apache.commons.lang.NotImplementedException;
@@ -41,7 +42,7 @@ protected OffHeapColumnVector(int capacity, DataType type) {
throw new NotImplementedException("Only little endian is supported.");
}

-this.nulls = Platform.allocateMemory(capacity);
+this.nulls = Platform.allocateMemory(((capacity + 63) / 64) * 8);
if (type instanceof IntegerType) {
this.data = Platform.allocateMemory(capacity * 4);
} else if (type instanceof DoubleType) {
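
The new allocation reserves one bit per row, rounded up to whole 64-bit words, rather than one byte per row. A quick sketch of the arithmetic (class and method names here are illustrative, not part of the patch):

public class NullsBitsetSizing {
  // One bit per row, rounded up to whole 8-byte words:
  // ((capacity + 63) / 64) words of 8 bytes each.
  static long nullsBytes(int capacity) {
    return ((capacity + 63) / 64) * 8L;
  }

  public static void main(String[] args) {
    for (int capacity : new int[] {1, 64, 65, 1024}) {
      System.out.println(capacity + " rows -> " + nullsBytes(capacity)
          + " bytes (the old byte[] layout needed " + capacity + " bytes)");
    }
  }
}

For 1024 rows that is 128 bytes instead of 1024; the bitset only costs more than the old layout below 8 rows.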
@@ -77,21 +78,20 @@ public final void close() {

@Override
public final void putNotNull(int rowId) {
-Platform.putByte(null, nulls + rowId, (byte) 0);
+BitSetMethods.unset(null, nulls, rowId);
}

@Override
public final void putNull(int rowId) {
-Platform.putByte(null, nulls + rowId, (byte) 1);
+BitSetMethods.set(null, nulls, rowId);
++numNulls;
anyNullsSet = true;
}

@Override
public final void putNulls(int rowId, int count) {
-long offset = nulls + rowId;
-for (int i = 0; i < count; ++i, ++offset) {
-Platform.putByte(null, offset, (byte) 1);
+for (int i = 0; i < count; ++i) {
+BitSetMethods.set(null, nulls, rowId + i);
}
anyNullsSet = true;
numNulls += count;
@@ -100,15 +100,14 @@ public final void putNulls(int rowId, int count) {
@Override
public final void putNotNulls(int rowId, int count) {
if (!anyNullsSet) return;
-long offset = nulls + rowId;
-for (int i = 0; i < count; ++i, ++offset) {
-Platform.putByte(null, offset, (byte) 0);
+for (int i = 0; i < count; ++i) {
+BitSetMethods.unset(null, nulls, rowId + i);
}
}

@Override
public final boolean getIsNull(int rowId) {
-return Platform.getByte(null, nulls + rowId) == 1;
+return BitSetMethods.isSet(null, nulls, rowId);
}

//
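Both BitSetMethods calls above locate bit rowId inside 64-bit word rowId / 64 at position rowId % 64. A minimal sketch of that word-and-mask indexing, modeled on a long[] so it runs without Unsafe (the real methods take an (Object base, long baseOffset, int index) triple, per their usage in this diff, and go through Platform.getLong/putLong):

public class BitIndexingSketch {
  static void set(long[] words, int i)   { words[i >>> 6] |=  (1L << (i & 63)); }
  static void unset(long[] words, int i) { words[i >>> 6] &= ~(1L << (i & 63)); }
  static boolean isSet(long[] words, int i) {
    return (words[i >>> 6] & (1L << (i & 63))) != 0;
  }

  public static void main(String[] args) {
    long[] nulls = new long[(100 + 63) / 64];  // null bits for a 100-row vector
    set(nulls, 70);
    System.out.println(isSet(nulls, 70) + " " + isSet(nulls, 71));  // true false
    unset(nulls, 70);
    System.out.println(isSet(nulls, 70));                           // false
  }
}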
OnHeapColumnVector.java
@@ -20,6 +20,7 @@
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.BitSet;

import java.nio.ByteBuffer;
import java.nio.DoubleBuffer;
@@ -33,8 +34,8 @@ public final class OnHeapColumnVector extends ColumnVector {
// The data stored in these arrays need to maintain binary compatible. We can
// directly pass this buffer to external components.

-// This is faster than a boolean array and we optimize this over memory footprint.
-private byte[] nulls;
+// We use a BitSet to reduce the memory footprint.
+private BitSet nulls;

// Array for each type. Only 1 is populated for any type.
private int[] intData;
@@ -49,7 +50,7 @@ protected OnHeapColumnVector(int capacity, DataType type) {
} else {
throw new RuntimeException("Unhandled " + type);
}
-this.nulls = new byte[capacity];
+this.nulls = new BitSet(capacity);
reset();
}

@@ -76,20 +77,20 @@ public final void close() {

@Override
public final void putNotNull(int rowId) {
-nulls[rowId] = (byte)0;
+nulls.unset(rowId);
}

@Override
public final void putNull(int rowId) {
-nulls[rowId] = (byte)1;
+nulls.set(rowId);
++numNulls;
anyNullsSet = true;
}

@Override
public final void putNulls(int rowId, int count) {
for (int i = 0; i < count; ++i) {
-nulls[rowId + i] = (byte)1;
+nulls.set(rowId + i);
}
anyNullsSet = true;
numNulls += count;
@@ -99,13 +100,13 @@ public final void putNulls(int rowId, int count) {
public final void putNotNulls(int rowId, int count) {
if (!anyNullsSet) return;
for (int i = 0; i < count; ++i) {
-nulls[rowId + i] = (byte)0;
+nulls.unset(rowId + i);
}
}

@Override
public final boolean getIsNull(int rowId) {
-return nulls[rowId] == 1;
+return nulls.get(rowId);
}

//
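On the heap the same switch goes through org.apache.spark.util.collection.BitSet, whose set/unset/get calls replace the byte-array stores. A stand-in for the null-tracking pattern, using java.util.BitSet (set/clear/get) so it runs on a stock JDK; note that unlike Spark's fixed-size BitSet, java.util.BitSet grows on demand:

import java.util.BitSet;

public class OnHeapNullsSketch {
  private final BitSet nulls;  // one bit per row instead of one byte
  private int numNulls;

  OnHeapNullsSketch(int capacity) { this.nulls = new BitSet(capacity); }

  void putNull(int rowId)      { nulls.set(rowId); ++numNulls; }
  void putNotNull(int rowId)   { nulls.clear(rowId); }
  boolean getIsNull(int rowId) { return nulls.get(rowId); }

  public static void main(String[] args) {
    OnHeapNullsSketch v = new OnHeapNullsSketch(16);
    v.putNull(3);
    System.out.println(v.getIsNull(3) + " " + v.getIsNull(4) + " nulls=" + v.numNulls);
  }
}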
ColumnarBatchSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.bitset.BitSetMethods

class ColumnarBatchSuite extends SparkFunSuite {
test("Null Apis") {
@@ -67,7 +68,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(v._1 == column.getIsNull(v._2))
if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.nullsNativeAddress()
-assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
+assert(v._1 == BitSetMethods.isSet(null, addr, v._2), "index=" + v._2)
}
}
column.close
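The off-heap assertion reads null bits straight out of the vector's native buffer. The same round trip in isolation looks like this (assumes the spark-unsafe module is on the classpath; the explicit zeroing stands in for the vector's reset path, which this diff does not show):

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.bitset.BitSetMethods;

public class OffHeapBitCheck {
  public static void main(String[] args) {
    int capacity = 100;
    long bytes = ((capacity + 63) / 64) * 8L;  // same sizing as the patch
    long addr = Platform.allocateMemory(bytes);
    // Freshly allocated raw memory is not zeroed, so clear every word first.
    for (long off = addr; off < addr + bytes; off += 8) {
      Platform.putLong(null, off, 0L);
    }
    BitSetMethods.set(null, addr, 42);                        // like putNull(42)
    System.out.println(BitSetMethods.isSet(null, addr, 42));  // true
    System.out.println(BitSetMethods.isSet(null, addr, 43));  // false
    Platform.freeMemory(addr);
  }
}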