Skip to content

Commit

Permalink
[SPARK-13996] [SQL] Add more not null attributes for Filter codegen
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-13996

Filter codegen finds the attributes not null by checking IsNotNull(a) expression with a condition if child.output.contains(a). However, the current approach to checking it is not comprehensive. We can improve it.

E.g., for this plan:

    val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
    val schema = new StructType().add("k", IntegerType).add("v", StringType)
    val smallDF = sqlContext.createDataFrame(rdd, schema)
    val df = smallDF.filter("isnotnull(k + 1)")

The code snippet generated without this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       /* isnotnull((input[0, int] + 1)) */
    /* 044 */       /* (input[0, int] + 1) */
    /* 045 */       boolean filter_isNull3 = true;
    /* 046 */       int filter_value3 = -1;
    /* 047 */
    /* 048 */       if (!filter_isNull) {
    /* 049 */         filter_isNull3 = false; // resultCode could change nullability.
    /* 050 */         filter_value3 = filter_value + 1;
    /* 051 */
    /* 052 */       }
    /* 053 */       if (!(!(filter_isNull3))) continue;
    /* 054 */
    /* 055 */       filter_metricValue.add(1);

With this patch:

    /* 031 */   protected void processNext() throws java.io.IOException {
    /* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
    /* 033 */
    /* 034 */     /*** PRODUCE: INPUT */
    /* 035 */
    /* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
    /* 039 */       /* input[0, int] */
    /* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
    /* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
    /* 042 */
    /* 043 */       if (filter_isNull) continue;
    /* 044 */
    /* 045 */       filter_metricValue.add(1);

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11810 from viirya/add-more-not-null-attrs.
  • Loading branch information
viirya authored and davies committed Apr 3, 2016
1 parent 1cf7018 commit c2f25b1
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,20 @@ case class Filter(condition: Expression, child: SparkPlan)

// Split out all the IsNotNulls from condition.
private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true
case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
case _ => false
}

// The columns that will filtered out by `IsNotNull` could be considered as not nullable.
private val notNullAttributes = notNullPreds.flatMap(_.references)
private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)

// Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
// all the variables at the beginning to take advantage of short circuiting.
override def usedInputs: AttributeSet = AttributeSet.empty

override def output: Seq[Attribute] = {
child.output.map { a =>
if (a.nullable && notNullAttributes.exists(_.semanticEquals(a))) {
if (a.nullable && notNullAttributes.contains(a.exprId)) {
a.withNullability(false)
} else {
a
Expand Down Expand Up @@ -179,7 +179,7 @@ case class Filter(condition: Expression, child: SparkPlan)
// Reset the isNull to false for the not-null columns, then the followed operators could
// generate better code (remove dead branches).
val resultVars = input.zipWithIndex.map { case (ev, i) =>
if (notNullAttributes.exists(_.semanticEquals(child.output(i)))) {
if (notNullAttributes.contains(child.output(i).exprId)) {
ev.isNull = "false"
}
ev
Expand Down

0 comments on commit c2f25b1

Please sign in to comment.