From fea460f650177524397ea88f53d67f9df411a36c Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Thu, 10 Mar 2016 16:26:18 -0800
Subject: [PATCH 1/4] Filter null columns in ColumnarBatch

---
 .../execution/vectorized/ColumnarBatch.java | 34 +++++++++++++++++--
 .../spark/sql/execution/ExistingRDD.scala   |  8 +++--
 .../execution/WholeStageCodegenSuite.scala  |  6 ++++
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 09c001baaeafd..2ceaa2bf61145 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,9 +19,9 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Vector;
 
 import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@@ -58,6 +58,9 @@ public final class ColumnarBatch {
   // True if the row is filtered.
   private final boolean[] filteredRows;
 
+  // Column indices that cannot have null values.
+  private final Vector<Integer> nullFilteredColumns;
+
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
@@ -233,6 +236,21 @@ public final Object get(int ordinal, DataType dataType) {
     }
   }
 
+  /**
+   * Marks a given row as "filtered" if one of its attributes is part of a non-nullable column
+   *
+   * @return true if a given rowId can be filtered
+   */
+  public boolean shouldSkipRow(int rowId) {
+    for (int ordinal : nullFilteredColumns) {
+      if (columns[ordinal].getIsNull(rowId)) {
+        filteredRows[rowId] = true;
+        break;
+      }
+    }
+    return filteredRows[rowId];
+  }
+
   /**
    * Returns an iterator over the rows in this batch. This skips rows that are filtered out.
    */
@@ -244,7 +262,7 @@ public Iterator<Row> rowIterator() {
 
       @Override
       public boolean hasNext() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
+        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId] && shouldSkipRow(rowId)) {
           ++rowId;
         }
         return rowId < maxRows;
@@ -345,15 +363,25 @@ public ColumnarBatch.Row getRow(int rowId) {
    * in this batch will not include this row.
    */
   public final void markFiltered(int rowId) {
-    assert(filteredRows[rowId] == false);
+    assert(!filteredRows[rowId]);
     filteredRows[rowId] = true;
     ++numRowsFiltered;
   }
 
+  /**
+   * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding
+   * attribute is filtered out.
+   */
+  public final void filterNullsInColumn(int ordinal) {
+    assert(!nullFilteredColumns.contains(ordinal));
+    nullFilteredColumns.add(ordinal);
+  }
+
   private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
     this.columns = new ColumnVector[schema.size()];
+    this.nullFilteredColumns = new Vector<>();
     this.filteredRows = new boolean[maxRows];
 
     for (int i = 0; i < schema.fields().length; ++i) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e97c6be7f177a..87fd901e6de16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -199,6 +199,7 @@ private[sql] case class DataSourceScan(
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
     val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchRowClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row"
     val input = ctx.freshName("input")
     val idx = ctx.freshName("batchIdx")
     val batch = ctx.freshName("batch")
@@ -226,10 +227,12 @@ private[sql] case class DataSourceScan(
       |   private void $scanBatches() throws java.io.IOException {
       |     while (true) {
       |       int numRows = $batch.numRows();
+      |       java.util.Iterator<$columnarBatchRowClz> rowIterator = $batch.rowIterator();
       |       if ($idx == 0) $numOutputRows.add(numRows);
       |
-      |       while (!shouldStop() && $idx < numRows) {
-      |         InternalRow $row = $batch.getRow($idx++);
+      |       while (!shouldStop() && rowIterator.hasNext()) {
+      |         InternalRow $row = ($columnarBatchRowClz)rowIterator.next();
+      |         $idx++;
       |         ${consume(ctx, columns1).trim}
       |       }
       |       if (shouldStop()) return;
@@ -239,6 +242,7 @@ private[sql] case class DataSourceScan(
       |         break;
       |       }
       |       $batch = ($columnarBatchClz)$input.next();
+      |       // $batch.filterNullsInColumn()
       |       $idx = 0;
       |     }
       |   }""".stripMargin)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 716c367eae551..d9305a86ebee4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -27,6 +27,12 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
 
+  setupTestData()
+
+  test("test null filtering") {
+    println(sql("select count(*) from testData where value is not NULL AND key > 5").collect())
+  }
+
   test("range/filter should be combined") {
     val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
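A minimal usage sketch of the API added in this patch (not part of the patch itself): ColumnVectorUtils.toBatch, MemoryMode.ON_HEAP and the two-column schema are borrowed from the ColumnarBatchSuite test added later in this series, and setNumRows() is called the same way that test does. Note that in this first cut hasNext() only consults shouldSkipRow() once filteredRows[rowId] is already set, so the lazy null skip does not yet fire for unmarked rows; PATCH 2 below reworks this by filtering eagerly in setNumRows().

    import scala.collection.JavaConverters._

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    // "key" can never be null, "value" can.
    val schema = new StructType()
      .add("key", IntegerType, nullable = false)
      .add("value", StringType, nullable = true)
    val rows = Seq(Row.fromSeq(Seq(1, null)), Row.fromSeq(Seq(2, "2"))).iterator.asJava
    val batch = ColumnVectorUtils.toBatch(schema, MemoryMode.ON_HEAP, rows)
    batch.setNumRows(2)

    // Register column 1 ("value"): rows whose "value" is NULL are meant to be skipped
    // by rowIterator() (see the caveat about shouldSkipRow() above).
    batch.filterNullsInColumn(1)
    val it = batch.rowIterator()
    while (it.hasNext) {
      val row = it.next()
      // ... consume row; in the final form of this series, row.isNullAt(1) is always false here
    }
    batch.close()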
From 4fa94a221e0da8d27e1c1c35fbc06243711d34f3 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Mon, 14 Mar 2016 23:29:47 -0700
Subject: [PATCH 2/4] Parquet Read Benchmark

---
 .../execution/vectorized/ColumnarBatch.java |  33 +++---
 .../spark/sql/execution/ExistingRDD.scala   |   8 +-
 .../execution/WholeStageCodegenSuite.scala  |   6 --
 .../parquet/ParquetReadBenchmark.scala      | 102 ++++++++++++++++++
 4 files changed, 119 insertions(+), 30 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 2ceaa2bf61145..1cfc81139ff34 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -236,21 +236,6 @@ public final Object get(int ordinal, DataType dataType) {
     }
   }
 
-  /**
-   * Marks a given row as "filtered" if one of its attributes is part of a non-nullable column
-   *
-   * @return true if a given rowId can be filtered
-   */
-  public boolean shouldSkipRow(int rowId) {
-    for (int ordinal : nullFilteredColumns) {
-      if (columns[ordinal].getIsNull(rowId)) {
-        filteredRows[rowId] = true;
-        break;
-      }
-    }
-    return filteredRows[rowId];
-  }
-
   /**
    * Returns an iterator over the rows in this batch. This skips rows that are filtered out.
    */
@@ -262,7 +247,7 @@ public Iterator<Row> rowIterator() {
 
       @Override
       public boolean hasNext() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId] && shouldSkipRow(rowId)) {
+        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
           ++rowId;
         }
         return rowId < maxRows;
@@ -302,11 +287,23 @@ public void reset() {
   }
 
   /**
-   * Sets the number of rows that are valid.
+   * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
+   * more of their attributes are part of a non-nullable column.
    */
   public void setNumRows(int numRows) {
-    assert(numRows <= this.capacity);
+    assert (numRows <= this.capacity);
     this.numRows = numRows;
+
+    for (int ordinal : nullFilteredColumns) {
+      if (columns[ordinal].numNulls != 0) {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
+            filteredRows[rowId] = true;
+            ++numRowsFiltered;
+          }
+        }
+      }
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 87fd901e6de16..e97c6be7f177a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -199,7 +199,6 @@ private[sql] case class DataSourceScan(
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
     val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val columnarBatchRowClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row"
     val input = ctx.freshName("input")
     val idx = ctx.freshName("batchIdx")
     val batch = ctx.freshName("batch")
@@ -227,12 +226,10 @@ private[sql] case class DataSourceScan(
       |   private void $scanBatches() throws java.io.IOException {
       |     while (true) {
       |       int numRows = $batch.numRows();
-      |       java.util.Iterator<$columnarBatchRowClz> rowIterator = $batch.rowIterator();
       |       if ($idx == 0) $numOutputRows.add(numRows);
       |
-      |       while (!shouldStop() && rowIterator.hasNext()) {
-      |         InternalRow $row = ($columnarBatchRowClz)rowIterator.next();
-      |         $idx++;
+      |       while (!shouldStop() && $idx < numRows) {
+      |         InternalRow $row = $batch.getRow($idx++);
       |         ${consume(ctx, columns1).trim}
       |       }
       |       if (shouldStop()) return;
@@ -242,7 +239,6 @@ private[sql] case class DataSourceScan(
       |         break;
       |       }
       |       $batch = ($columnarBatchClz)$input.next();
-      |       // $batch.filterNullsInColumn()
       |       $idx = 0;
       |     }
       |   }""".stripMargin)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index d9305a86ebee4..716c367eae551 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -27,12 +27,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
 
-  setupTestData()
-
-  test("test null filtering") {
-    println(sql("select count(*) from testData where value is not NULL AND key > 5").collect())
-  }
-
   test("range/filter should be combined") {
     val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 38c3618a82ef9..f3fd3976c5a84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -299,10 +299,112 @@
     }
   }
 
+  def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
+    withTempPath { dir =>
+      withTempTable("t1", "tempTable") {
+        sqlContext.range(values).registerTempTable("t1")
+        sqlContext.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
+          s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
+          .write.parquet(dir.getCanonicalPath)
+        sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+
+        val benchmark = new Benchmark("String with Nulls Scan", values)
+
+        benchmark.addCase("SQL Parquet Vectorized") { iter =>
+          sqlContext.sql("select sum(length(c2)) from tempTable where c1 is " +
+            "not NULL and c2 is not NULL").collect()
+        }
+
+        val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
+        benchmark.addCase("PR Vectorized") { num =>
+          var sum = 0
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              while (reader.nextBatch()) {
+                val rowIterator = batch.rowIterator()
+                while (rowIterator.hasNext) {
+                  val row = rowIterator.next()
+                  val value = row.getUTF8String(0)
+                  if (!row.isNullAt(0) && !row.isNullAt(1)) sum += value.numBytes()
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              batch.filterNullsInColumn(0)
+              batch.filterNullsInColumn(1)
+              while (reader.nextBatch()) {
+                val rowIterator = batch.rowIterator()
+                while (rowIterator.hasNext) {
+                  sum += rowIterator.next().getUTF8String(0).numBytes()
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        /*
+        =======================
+        Fraction of NULLs: 0
+        =======================
+
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                    1164 / 1333          9.0         111.0       1.0X
+        PR Vectorized                              809 /  882         13.0          77.1       1.4X
+        PR Vectorized (Null Filtering)             723 /  800         14.5          69.0       1.6X
+
+        =======================
+        Fraction of NULLs: 0.5
+        =======================
+
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                     983 / 1001         10.7          93.8       1.0X
+        PR Vectorized                              699 /  728         15.0          66.7       1.4X
+        PR Vectorized (Null Filtering)             722 /  746         14.5          68.9       1.4X
+
+        =======================
+        Fraction of NULLs: 0.95
+        =======================
+
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                     332 /  343         31.6          31.6       1.0X
+        PR Vectorized                              177 /  180         59.1          16.9       1.9X
+        PR Vectorized (Null Filtering)             168 /  175         62.4          16.0       2.0X
+        */
+
+        benchmark.run()
+      }
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     intScanBenchmark(1024 * 1024 * 15)
     intStringScanBenchmark(1024 * 1024 * 10)
     stringDictionaryScanBenchmark(1024 * 1024 * 10)
     partitionTableScanBenchmark(1024 * 1024 * 15)
+    for (fractionOfNulls <- List(0.0, 0.50, 0.95)) {
+      stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
+    }
   }
 }
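A short sketch (not part of the patch) of the call ordering this rework implies for the vectorized Parquet path: columns must be registered via filterNullsInColumn() before a batch's rows are (re)populated, because the null scan now happens inside setNumRows(). The "PR Vectorized (Null Filtering)" benchmark case above relies on UnsafeRowParquetRecordReader.nextBatch() ending up in setNumRows() on the shared result batch; the snippet assumes the same imports and package as the benchmark object above, and `path` is a placeholder.

    val path: String = "/tmp/some-parquet-file"  // placeholder
    val reader = new UnsafeRowParquetRecordReader
    try {
      reader.initialize(path, ("c1" :: "c2" :: Nil).asJava)
      val batch = reader.resultBatch()
      batch.filterNullsInColumn(0)       // register once, up front
      batch.filterNullsInColumn(1)
      while (reader.nextBatch()) {       // repopulates the batch; setNumRows() marks NULL rows as filtered
        val it = batch.rowIterator()     // the iterator now skips rows with a NULL in c1 or c2
        while (it.hasNext) {
          val row = it.next()            // no isNullAt() checks needed for c1/c2 here
          // ... consume row ...
        }
      }
    } finally {
      reader.close()
    }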
From 2d1066fc853d7d7a9b9ed84af129ab17168f0753 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Wed, 16 Mar 2016 12:29:23 -0700
Subject: [PATCH 3/4] unit test in ColumnarBatchSuite

---
 .../execution/vectorized/ColumnarBatch.java | 12 +++---
 .../parquet/ParquetReadBenchmark.scala      | 36 +++++++------------
 .../vectorized/ColumnarBatchSuite.scala     | 29 +++++++++++++++
 3 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 1cfc81139ff34..f701872ffb974 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -16,12 +16,10 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.util.*;
 
 import org.apache.commons.lang.NotImplementedException;
+
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@@ -59,7 +57,7 @@ public final class ColumnarBatch {
   private final boolean[] filteredRows;
 
   // Column indices that cannot have null values.
-  private final Vector<Integer> nullFilteredColumns;
+  private final Set<Integer> nullFilteredColumns;
 
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
@@ -291,7 +289,7 @@ public void reset() {
    * more of their attributes are part of a non-nullable column.
    */
   public void setNumRows(int numRows) {
-    assert (numRows <= this.capacity);
+    assert(numRows <= this.capacity);
     this.numRows = numRows;
 
     for (int ordinal : nullFilteredColumns) {
@@ -378,7 +376,7 @@ private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
     this.columns = new ColumnVector[schema.size()];
-    this.nullFilteredColumns = new Vector<>();
+    this.nullFilteredColumns = new HashSet<>();
     this.filteredRows = new boolean[maxRows];
 
     for (int i = 0; i < schema.fields().length; ++i) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index f3fd3976c5a84..15bf00e6f47e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -359,38 +359,26 @@
         }
 
         /*
-        =======================
-        Fraction of NULLs: 0
-        =======================
-
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        String with Nulls Scan (0%):         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
-        SQL Parquet Vectorized                    1164 / 1333          9.0         111.0       1.0X
-        PR Vectorized                              809 /  882         13.0          77.1       1.4X
-        PR Vectorized (Null Filtering)             723 /  800         14.5          69.0       1.6X
-
-        =======================
-        Fraction of NULLs: 0.5
-        =======================
+        SQL Parquet Vectorized                    1229 / 1648          8.5         117.2       1.0X
+        PR Vectorized                              833 /  846         12.6          79.4       1.5X
+        PR Vectorized (Null Filtering)             732 /  782         14.3          69.8       1.7X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        String with Nulls Scan (50%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
-        SQL Parquet Vectorized                     983 / 1001         10.7          93.8       1.0X
-        PR Vectorized                              699 /  728         15.0          66.7       1.4X
-        PR Vectorized (Null Filtering)             722 /  746         14.5          68.9       1.4X
-
-        =======================
-        Fraction of NULLs: 0.95
-        =======================
+        SQL Parquet Vectorized                     995 / 1053         10.5          94.9       1.0X
+        PR Vectorized                              732 /  772         14.3          69.8       1.4X
+        PR Vectorized (Null Filtering)             725 /  790         14.5          69.1       1.4X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-        String with Nulls Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        String with Nulls Scan (95%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
-        SQL Parquet Vectorized                     332 /  343         31.6          31.6       1.0X
-        PR Vectorized                              177 /  180         59.1          16.9       1.9X
-        PR Vectorized (Null Filtering)             168 /  175         62.4          16.0       2.0X
+        SQL Parquet Vectorized                     326 /  333         32.2          31.1       1.0X
+        PR Vectorized                              190 /  200         55.1          18.2       1.7X
+        PR Vectorized (Null Filtering)             168 /  172         62.2          16.1       1.9X
         */
 
         benchmark.run()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index ed97f59ea1690..fa2c74431ab45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -727,4 +727,33 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Random nested schema") {
     testRandomRows(false, 30)
   }
+
+  test("null filtered columns") {
+    val NUM_ROWS = 10
+    val schema = new StructType()
+      .add("key", IntegerType, nullable = false)
+      .add("value", StringType, nullable = true)
+    for (numNulls <- List(0, NUM_ROWS / 2, NUM_ROWS)) {
+      val rows = mutable.ArrayBuffer.empty[Row]
+      for (i <- 0 until NUM_ROWS) {
+        val row = if (i < numNulls) Row.fromSeq(Seq(i, null)) else Row.fromSeq(Seq(i, i.toString))
+        rows += row
+      }
+      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+        batch.filterNullsInColumn(1)
+        batch.setNumRows(NUM_ROWS)
+        assert(batch.numRows() == NUM_ROWS)
+        val it = batch.rowIterator()
+        // Top numNulls rows should be filtered
+        var k = numNulls
+        while (it.hasNext) {
+          assert(it.next().getInt(0) == k)
+          k += 1
+        }
+        assert(k == NUM_ROWS)
+        batch.close()
+      }}
+    }
+  }
 }

From 0688cf84958552132aaa8ada960b9c4880b437e6 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Wed, 16 Mar 2016 14:22:00 -0700
Subject: [PATCH 4/4] Nong's comments

---
 .../org/apache/spark/sql/execution/vectorized/ColumnarBatch.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index f701872ffb974..c462ab1a13bb3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -368,7 +368,6 @@ public final void markFiltered(int rowId) {
    * attribute is filtered out.
    */
   public final void filterNullsInColumn(int ordinal) {
-    assert(!nullFilteredColumns.contains(ordinal));
     nullFilteredColumns.add(ordinal);
   }
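A closing sketch (not part of the series): with the Set-backed bookkeeping from PATCH 3 and the assert dropped here, registering the same column more than once becomes a harmless no-op, so a caller can re-register defensively, for example once per consumed batch; this is presumably what the commented-out $batch.filterNullsInColumn() hook in PATCH 1's generated scan loop would rely on. The helper name below is illustrative only.

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch

    // Hypothetical helper: safe to call on every batch, and safe to call twice for the
    // same ordinal, because nullFilteredColumns is now a Set and the duplicate-registration
    // assert was removed in PATCH 4.
    def registerNullFilters(batch: ColumnarBatch, ordinals: Seq[Int]): Unit = {
      ordinals.foreach(batch.filterNullsInColumn)
    }

    // e.g. registerNullFilters(batch, Seq(0, 1)) before the batch's rows are populated.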