@@ -514,22 +514,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     val rowClass = classOf[GenericRowWithSchema].getName
     val values = ctx.freshName("values")
-    val schemaField = ctx.addReferenceObj("schema", schema)
-    s"""
-      boolean ${ev.isNull} = false;
-      final Object[] $values = new Object[${children.size}];
-    """ +
-      children.zipWithIndex.map { case (e, i) =>
-        val eval = e.gen(ctx)
-        eval.code + s"""
+    ctx.addMutableState("Object[]", values, "")
+
+    val childrenCodes = children.zipWithIndex.map { case (e, i) =>
+      val eval = e.gen(ctx)
+      eval.code + s"""
         if (${eval.isNull}) {
           $values[$i] = null;
         } else {
           $values[$i] = ${eval.value};
         }
        """
-      }.mkString("\n") +
-      s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);"
+    }
+    val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes)
+    val schemaField = ctx.addReferenceObj("schema", schema)
+    s"""
+      boolean ${ev.isNull} = false;
+      $values = new Object[${children.size}];
+      $childrenCode
+      final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);
+    """
   }
 }
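Note on this hunk: the old version concatenated every child's null-check snippet into one string, so a row with many columns could push the generated method past the JVM's 64 KB bytecode-per-method limit. ctx.splitExpressions chunks the snippets into separate generated methods, which is also why values is promoted from a local to a mutable instance field via ctx.addMutableState: a local would not be visible across those methods. Below is a minimal sketch of the splitting idea; SplitSketch, apply_N, and perMethod are illustrative names, not Spark's actual implementation.

// Minimal sketch of code splitting, assuming hypothetical names; Spark's real
// CodegenContext.splitExpressions additionally handles fresh naming and
// registering the helper methods on the generated class.
object SplitSketch {
  // Group generated snippets into bounded helper methods; return the helper
  // declarations plus the code that invokes them in order on the input row.
  def split(row: String, snippets: Seq[String], perMethod: Int = 16): (String, String) = {
    val helpers = snippets.grouped(perMethod).zipWithIndex.map { case (chunk, i) =>
      val name = s"apply_$i" // hypothetical; Spark generates fresh names
      val decl =
        s"""private void $name(InternalRow $row) {
           |  ${chunk.mkString("\n  ")}
           |}""".stripMargin
      (decl, s"$name($row);")
    }.toSeq
    (helpers.map(_._1).mkString("\n"), helpers.map(_._2).mkString("\n"))
  }
}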

@@ -31,7 +31,8 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -99,20 +100,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase
    */
   private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
 
-  /**
-   * Tries to initialize the reader for this split. Returns true if this reader supports reading
-   * this split and false otherwise.
-   */
-  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    try {
-      initialize(inputSplit, taskAttemptContext);
-      return true;
-    } catch (UnsupportedOperationException e) {
-      return false;
-    }
-  }
-
   /**
    * Implementation of RecordReader API.
    */
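With tryInitialize removed, a caller that still needs the boolean fallback presumably catches UnsupportedOperationException around initialize itself. A sketch of that call-site pattern; reader, split, and hadoopAttemptContext are assumed names, not taken from this diff.

// Call-site sketch of the fallback that tryInitialize used to encapsulate;
// the surrounding names (reader, split, hadoopAttemptContext) are assumptions.
val vectorized =
  try {
    reader.initialize(split, hadoopAttemptContext)
    true
  } catch {
    case _: UnsupportedOperationException =>
      false // schema unsupported by the vectorized path; fall back to rows
  }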
@@ -222,7 +209,7 @@ public ColumnarBatch resultBatch() {
     return columnarBatch;
   }
 
-  /**
+  /*
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
@@ -120,7 +120,7 @@ class SQLContext private[sql](
    */
   @transient
   protected[sql] lazy val sessionState: SessionState = new SessionState(self)
-  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[spark] def conf: SQLConf = sessionState.conf
 
   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
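The only change here is the access qualifier: protected[sql] limits conf to code in the org.apache.spark.sql package, while protected[spark] opens it to everything under org.apache.spark. A toy illustration of the qualifier semantics; Ctx, narrow, wide, and the scheduler caller are made-up names, not Spark code.

// Toy illustration of Scala's package-qualified access modifiers.
package org.apache.spark {
  package sql {
    class Ctx {
      protected[sql] def narrow: Int = 1   // visible only inside org.apache.spark.sql
      protected[spark] def wide: Int = 2   // visible anywhere inside org.apache.spark
    }
  }
  package scheduler {
    object Caller {
      def use(c: sql.Ctx): Int = c.wide    // compiles; c.narrow would not here
    }
  }
}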