Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
juliuszsompolski committed Nov 26, 2018
1 parent ea93f10 commit 170073e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,8 @@ case class LocalTableScanExec(
taken
}

// Does not need to create an UnsafeProjection - input is already always UnsafeRows
override protected val createUnsafeProjection: Boolean = false

override def inputRDD: RDD[InternalRow] = rdd
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ trait InputRDDCodegen extends CodegenSupport {

def inputRDD: RDD[InternalRow]

// If the input is an RDD of InternalRow which are potentially not UnsafeRow,
// and there is no parent to consume it, it needs an UnsafeProjection.
protected val createUnsafeProjection: Boolean = (parent == null)

override def inputRDDs(): Seq[RDD[InternalRow]] = {
inputRDD :: Nil
}
Expand All @@ -431,10 +435,29 @@ trait InputRDDCodegen extends CodegenSupport {
val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
forceInline = true)
val row = ctx.freshName("row")

val outputVars = if (createUnsafeProjection) {
// creating the vars will make the parent consume add an unsafe projection.
ctx.INPUT_ROW = row
ctx.currentVars = null
output.zipWithIndex.map { case (a, i) =>
BoundReference(i, a.dataType, a.nullable).genCode(ctx)
}
} else {
null
}

val numOutputRowsCode = if (metrics.contains("numOutputRows")) {
val numOutputRows = metricTerm(ctx, "numOutputRows")
s"$numOutputRows.add(1);"
} else {
""
}
s"""
| while ($limitNotReachedCond $input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next();
| ${consume(ctx, null, row).trim}
| ${numOutputRowsCode}
| ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
| ${shouldStopCheckCode}
| }
""".stripMargin
Expand Down

0 comments on commit 170073e

Please sign in to comment.