merge with master
kiszk committed Jan 20, 2017
1 parent 3eb4ebe commit b15d9d5
Showing 1 changed file with 0 additions and 164 deletions.
@@ -1,4 +1,3 @@
<<<<<<< fc1f6aa66c67e7d97ded9a9816ad0fc3c7439ca2
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
@@ -150,166 +149,3 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
  }

}
=======
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, OnHeapUnsafeColumnVector}
import org.apache.spark.sql.types.DataType


/**
 * Helper trait for abstracting scan functionality using
 * [[org.apache.spark.sql.execution.vectorized.ColumnarBatch]]es.
 */
private[sql] trait ColumnarBatchScan extends CodegenSupport {

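  // Ordinals of the columns to read from each batch; when null, every output column
  // is read in order (a non-null value supports column pruning).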
  val columnIndexes: Array[Int] = null

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

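  // Non-null when this scan reads from the in-memory table cache; it drives the
  // decompression and scan-statistics code paths below.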
  val inMemoryTableScan: InMemoryTableScanExec = null

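  // Whether to record how many partitions and batches this scan actually reads.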
  lazy val enableScanStatistics: Boolean =
    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean

  /**
   * Generate [[ColumnVector]] expressions for our parent to consume as rows.
   * This is called once per [[ColumnarBatch]].
   */
  private def genCodeColumnVector(
      ctx: CodegenContext,
      columnVar: String,
      ordinal: String,
      dataType: DataType,
      nullable: Boolean): ExprCode = {
    val javaType = ctx.javaType(dataType)
    val value = ctx.getValue(columnVar, dataType, ordinal)
    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
    val valueVar = ctx.freshName("value")
    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
      s"""
        boolean $isNullVar = $columnVar.isNullAt($ordinal);
        $javaType $valueVar = $isNullVar ? ${ctx.defaultValue(dataType)} : ($value);
      """
    } else {
      s"$javaType $valueVar = $value;"
    }).trim
    ExprCode(code, isNullVar, valueVar)
  }

  /**
   * Produce code to process the input iterator as [[ColumnarBatch]]es.
   * This produces an [[UnsafeRow]] for each row in each batch.
   */
  // TODO: return ColumnarBatch.Rows instead
  override protected def doProduce(ctx: CodegenContext): String = {
    val input = ctx.freshName("input")
    // PhysicalRDD always just has one input
    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")

    // metrics
    val numOutputRows = metricTerm(ctx, "numOutputRows")
    val scanTimeMetric = metricTerm(ctx, "scanTime")
    val scanTimeTotalNs = ctx.freshName("scanTime")
    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
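    // Optional statistics: when spark.sql.inMemoryTableScanStatistics.enable is set,
    // count the partitions and batches this scan actually reads.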
    val incReadBatches = if (!enableScanStatistics) "" else {
      val readPartitions = ctx.addReferenceObj("readPartitions", inMemoryTableScan.readPartitions)
      val readBatches = ctx.addReferenceObj("readBatches", inMemoryTableScan.readBatches)
      ctx.addMutableState("int", "initializeInMemoryTableScanStatistics",
        s"""
           |$readPartitions.setValue(0);
           |$readBatches.setValue(0);
           |if ($input.hasNext()) { $readPartitions.add(1); }
         """.stripMargin)
      s"$readBatches.add(1);"
    }

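    // Keep the current ColumnarBatch in generated mutable state so iteration can
    // resume across calls into the produced code.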
    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
    val batch = ctx.freshName("batch")
    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")

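    // When scanning the in-memory cache, generate a decompress() call for each
    // column vector before its values are read.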
    val generateDecompress = inMemoryTableScan != null
    val confVar = if (!generateDecompress) null else {
      val conf = inMemoryTableScan.sparkContext.conf
      ctx.addReferenceObj("conf", conf, classOf[SparkConf].getName)
    }
    val onHeapUnsafeColumnVectorCls = classOf[OnHeapUnsafeColumnVector].getName

    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
    val idx = ctx.freshName("batchIdx")
    ctx.addMutableState("int", idx, s"$idx = 0;")
    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
      val index = if (columnIndexes == null) i else columnIndexes(i)
      val decompress = if (!generateDecompress) ""
        else s" (($onHeapUnsafeColumnVectorCls)$name).decompress($confVar);"
      s"$name = $batch.column($index);$decompress"
    }

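    // Helper that advances to the next batch: pull it from the iterator, update the
    // metrics, and re-bind the per-column ColumnVector references.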
    val nextBatch = ctx.freshName("nextBatch")
    ctx.addNewFunction(nextBatch,
      s"""
         |private void $nextBatch() throws java.io.IOException {
         |  long getBatchStart = System.nanoTime();
         |  if ($input.hasNext()) {
         |    $batch = ($columnarBatchClz)$input.next();
         |    $incReadBatches
         |    $numOutputRows.add($batch.numRows());
         |    $idx = 0;
         |    ${columnAssigns.mkString("", "\n", "\n")}
         |  }
         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
         |}""".stripMargin)

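    // The produced body: loop over batches, and over the rows within the current
    // batch, handing each row's column values to the parent operator via consume().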
    ctx.currentVars = null
    val rowidx = ctx.freshName("rowIdx")
    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
    }
    s"""
       |if ($batch == null) {
       |  $nextBatch();
       |}
       |while ($batch != null) {
       |  int numRows = $batch.numRows();
       |  while ($idx < numRows) {
       |    int $rowidx = $idx++;
       |    ${consume(ctx, columnsBatchInput).trim}
       |    if (shouldStop()) return;
       |  }
       |  $batch = null;
       |  $nextBatch();
       |}
       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
       |$scanTimeTotalNs = 0;
     """.stripMargin
  }

}
>>>>>>> support UDT and column pruning
