From 835754344e041f8d8264c1309eb10b5a3e26f508 Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 23 Jun 2017 20:11:40 +0800 Subject: [PATCH 1/4] Fail the putNull method when containsNull=false. --- .../execution/vectorized/ColumnVector.java | 26 +++++++++++++---- .../vectorized/OffHeapColumnVector.java | 7 +++-- .../vectorized/OnHeapColumnVector.java | 7 +++-- .../vectorized/ColumnarBatchSuite.scala | 29 +++++++++++++++++++ 4 files changed, 60 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 0c027f80d48cc..d74c61cf98c38 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -62,10 +62,18 @@ public abstract class ColumnVector implements AutoCloseable { * in number of elements, not number of bytes. */ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) { + return allocate(capacity, type, mode, true); + } + + public static ColumnVector allocate( + int capacity, + DataType type, + MemoryMode mode, + Boolean containsNull) { if (mode == MemoryMode.OFF_HEAP) { - return new OffHeapColumnVector(capacity, type); + return new OffHeapColumnVector(capacity, type, containsNull); } else { - return new OnHeapColumnVector(capacity, type); + return new OnHeapColumnVector(capacity, type, containsNull); } } @@ -958,6 +966,11 @@ public final int appendStruct(boolean isNull) { */ protected final DataType type; + /** + * Indicates if values can be `null`. + */ + protected boolean containsNull; + /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. */ @@ -1049,22 +1062,25 @@ public ColumnVector getDictionaryIds() { * Sets up the common state and also handles creating the child columns if this is a nested * type. 
*/ - protected ColumnVector(int capacity, DataType type, MemoryMode memMode) { + protected ColumnVector(int capacity, DataType type, MemoryMode memMode, Boolean containsNull) { this.capacity = capacity; this.type = type; + this.containsNull = containsNull; if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType || DecimalType.isByteArrayDecimalType(type)) { DataType childType; int childCapacity = capacity; + this.childColumns = new ColumnVector[1]; if (type instanceof ArrayType) { childType = ((ArrayType)type).elementType(); + this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode, + ((ArrayType) type).containsNull()); } else { childType = DataTypes.ByteType; childCapacity *= DEFAULT_ARRAY_LENGTH; + this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode); } - this.childColumns = new ColumnVector[1]; - this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode); this.resultArray = new Array(this.childColumns[0]); this.resultStruct = null; } else if (type instanceof StructType) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 2d1f3da8e7463..1cb8879bacb4b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -40,8 +40,8 @@ public final class OffHeapColumnVector extends ColumnVector { private long lengthData; private long offsetData; - protected OffHeapColumnVector(int capacity, DataType type) { - super(capacity, type, MemoryMode.OFF_HEAP); + protected OffHeapColumnVector(int capacity, DataType type, Boolean containsNull) { + super(capacity, type, MemoryMode.OFF_HEAP, containsNull); nulls = 0; data = 0; @@ -85,6 +85,9 @@ public void putNotNull(int rowId) { @Override public 
void putNull(int rowId) { + if(!containsNull) { + throw new RuntimeException("Not allowed to put null in this column."); + } Platform.putByte(null, nulls + rowId, (byte) 1); ++numNulls; anyNullsSet = true; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 506434364be48..a2e456ea1b78e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -51,8 +51,8 @@ public final class OnHeapColumnVector extends ColumnVector { private int[] arrayLengths; private int[] arrayOffsets; - protected OnHeapColumnVector(int capacity, DataType type) { - super(capacity, type, MemoryMode.ON_HEAP); + protected OnHeapColumnVector(int capacity, DataType type, Boolean containsNull) { + super(capacity, type, MemoryMode.ON_HEAP, containsNull); reserveInternal(capacity); reset(); } @@ -81,6 +81,9 @@ public void putNotNull(int rowId) { @Override public void putNull(int rowId) { + if(!containsNull) { + throw new RuntimeException("Not allowed to put null in this column."); + } nulls[rowId] = (byte)1; ++numNulls; anyNullsSet = true; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index ccf7aa7022a2a..624f766b4541e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -758,6 +758,35 @@ class ColumnarBatchSuite extends SparkFunSuite { }} } + test("Putting null should fail when null is forbidden in array.") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + val column = ColumnVector.allocate(10, 
new ArrayType(IntegerType, false), memMode) + val data = column.arrayData(); + data.putInt(0, 0) + data.putInt(1, 1) + assert(data.getInt(0) === 0) + assert(data.getInt(1) === 1) + val ex = intercept[RuntimeException] { + data.putNull(2) + } + assert(ex.getMessage.contains("Not allowed to put null in this column.")) + } + } + + test("Putting null should fail when null is forbidden in column.") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + val column = ColumnVector.allocate(10, IntegerType, memMode, false) + column.putInt(0, 0) + column.putInt(1, 1) + assert(column.getInt(0) === 0) + assert(column.getInt(1) === 1) + val ex = intercept[RuntimeException] { + column.putNull(2) + } + assert(ex.getMessage.contains("Not allowed to put null in this column.")) + } + } + test("Struct Column") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val schema = new StructType().add("int", IntegerType).add("double", DoubleType) From 58e27140909106f45925d38465c2612c3e1cdb8f Mon Sep 17 00:00:00 2001 From: jinxing Date: Mon, 26 Jun 2017 08:27:37 +0800 Subject: [PATCH 2/4] fix code style --- .../apache/spark/sql/execution/vectorized/ColumnVector.java | 4 ++-- .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 4 ++-- .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index d74c61cf98c38..efe33f7656479 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -69,7 +69,7 @@ public static ColumnVector allocate( int capacity, DataType type, MemoryMode mode, - Boolean containsNull) { + boolean containsNull) { if (mode == MemoryMode.OFF_HEAP) { return new 
OffHeapColumnVector(capacity, type, containsNull); } else { @@ -1062,7 +1062,7 @@ public ColumnVector getDictionaryIds() { * Sets up the common state and also handles creating the child columns if this is a nested * type. */ - protected ColumnVector(int capacity, DataType type, MemoryMode memMode, Boolean containsNull) { + protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean containsNull) { this.capacity = capacity; this.type = type; this.containsNull = containsNull; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 1cb8879bacb4b..504bfebbac2ec 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -40,7 +40,7 @@ public final class OffHeapColumnVector extends ColumnVector { private long lengthData; private long offsetData; - protected OffHeapColumnVector(int capacity, DataType type, Boolean containsNull) { + protected OffHeapColumnVector(int capacity, DataType type, boolean containsNull) { super(capacity, type, MemoryMode.OFF_HEAP, containsNull); nulls = 0; @@ -85,7 +85,7 @@ public void putNotNull(int rowId) { @Override public void putNull(int rowId) { - if(!containsNull) { + if (!containsNull) { throw new RuntimeException("Not allowed to put null in this column."); } Platform.putByte(null, nulls + rowId, (byte) 1); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index a2e456ea1b78e..fb26f4c2bed41 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -51,7 
+51,7 @@ public final class OnHeapColumnVector extends ColumnVector { private int[] arrayLengths; private int[] arrayOffsets; - protected OnHeapColumnVector(int capacity, DataType type, Boolean containsNull) { + protected OnHeapColumnVector(int capacity, DataType type, boolean containsNull) { super(capacity, type, MemoryMode.ON_HEAP, containsNull); reserveInternal(capacity); reset(); @@ -81,7 +81,7 @@ public void putNotNull(int rowId) { @Override public void putNull(int rowId) { - if(!containsNull) { + if (!containsNull) { throw new RuntimeException("Not allowed to put null in this column."); } nulls[rowId] = (byte)1; From 1752a1abd1efa3118f06bbca9ab8cf46dd79de8a Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 29 Jun 2017 09:06:10 +0800 Subject: [PATCH 3/4] resolve cloud-fan's comments --- .../apache/spark/sql/execution/vectorized/ColumnVector.java | 4 ++++ .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 3 +++ .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 3 +++ 3 files changed, 10 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index efe33f7656479..e36f6f0c93337 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -1076,6 +1076,10 @@ protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean childType = ((ArrayType)type).elementType(); this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode, ((ArrayType) type).containsNull()); + } else if (type instanceof BinaryType || type instanceof StringType) { + childType = DataTypes.ByteType; + childCapacity *= DEFAULT_ARRAY_LENGTH; + this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode, false); } else { childType = DataTypes.ByteType; childCapacity 
*= DEFAULT_ARRAY_LENGTH; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 504bfebbac2ec..0ab0a8b02034a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -95,6 +95,9 @@ public void putNull(int rowId) { @Override public void putNulls(int rowId, int count) { + if (!containsNull) { + throw new RuntimeException("Not allowed to put nulls in this column."); + } long offset = nulls + rowId; for (int i = 0; i < count; ++i, ++offset) { Platform.putByte(null, offset, (byte) 1); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index fb26f4c2bed41..c13e632caa1c6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -91,6 +91,9 @@ public void putNull(int rowId) { @Override public void putNulls(int rowId, int count) { + if (!containsNull) { + throw new RuntimeException("Not allowed to put nulls in this column."); + } for (int i = 0; i < count; ++i) { nulls[rowId + i] = (byte)1; } From 32bc6fd4ec3ec1e388faa17624553a685a974b7f Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 11 Jul 2017 11:00:11 +0800 Subject: [PATCH 4/4] test putNulls. 
--- .../spark/sql/execution/vectorized/ColumnVector.java | 2 +- .../sql/execution/vectorized/ColumnarBatchSuite.scala | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index e36f6f0c93337..f12f9056e02ae 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -1079,7 +1079,7 @@ protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean } else if (type instanceof BinaryType || type instanceof StringType) { childType = DataTypes.ByteType; childCapacity *= DEFAULT_ARRAY_LENGTH; - this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode, false); + this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode, containsNull = false); } else { childType = DataTypes.ByteType; childCapacity *= DEFAULT_ARRAY_LENGTH; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 624f766b4541e..618f7569d4fc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -758,7 +758,7 @@ class ColumnarBatchSuite extends SparkFunSuite { }} } - test("Putting null should fail when null is forbidden in array.") { + test("Putting null(s) should fail when null is forbidden in array.") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => val column = ColumnVector.allocate(10, new ArrayType(IntegerType, false), memMode) val data = column.arrayData(); @@ -766,10 +766,14 @@ class ColumnarBatchSuite extends 
SparkFunSuite { data.putInt(1, 1) assert(data.getInt(0) === 0) assert(data.getInt(1) === 1) - val ex = intercept[RuntimeException] { + val ex0 = intercept[RuntimeException] { data.putNull(2) } - assert(ex.getMessage.contains("Not allowed to put null in this column.")) + assert(ex0.getMessage.contains("Not allowed to put null in this column.")) + val ex1 = intercept[RuntimeException] { + data.putNulls(2, 2) + } + assert(ex1.getMessage.contains("Not allowed to put nulls in this column.")) } }