From 903de0191870278237fc59f087a4e62216c0bd65 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 14 Aug 2015 12:59:42 -0700 Subject: [PATCH 1/2] [SPARK-9984][SQL] Create local physical operator interface. --- .../sql/execution/local/FilterNode.scala | 47 ++++++++++ .../spark/sql/execution/local/LocalNode.scala | 89 +++++++++++++++++++ .../sql/execution/local/ProjectNode.scala | 42 +++++++++ .../sql/execution/local/SeqScanNode.scala | 49 ++++++++++ 4 files changed, 227 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala new file mode 100644 index 0000000000000..a485a1a1d7ae4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala @@ -0,0 +1,47 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
+
+/**
+ * A local operator that exposes only those rows of `child` for which `condition` evaluates
+ * to true.
+ *
+ * The predicate is generated in `open()`, so `open()` must be called before `next()`.
+ */
+case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {
+
+  // Generated in open(); null until then.
+  private[this] var predicate: (InternalRow) => Boolean = _
+
+  /** Rows are handed through unchanged, so the output attributes are the child's. */
+  override def output: Seq[Attribute] = child.output
+
+  /** Opens the child and generates the predicate for `condition` over the child's output. */
+  override def open(): Unit = {
+    child.open()
+    predicate = GeneratePredicate.generate(condition, child.output)
+  }
+
+  /**
+   * Advances the child until it is positioned on a row that satisfies the predicate.
+   *
+   * @return true if such a row was found, false once the child has no more rows
+   */
+  override def next(): Boolean = {
+    var found = false
+    // `!found` has to be tested first. With `child.next() && !found` the child is advanced
+    // one more time after a match, so `get()` would return the row following the match and
+    // that following row would never be tested against the predicate.
+    while (!found && child.next()) {
+      found = predicate.apply(child.get())
+    }
+    found
+  }
+
+  /** Returns the child's current row, i.e. the row the last successful `next()` stopped on. */
+  override def get(): InternalRow = child.get()
+
+  /** Closes the child. */
+  override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
new file mode 100644
index 0000000000000..e02ec8aa18069
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A local physical operator, in the form of an iterator.
+ *
+ * The protocol is `open()`, then repeated `next()` / `get()`, then `close()`:
+ * `open()` must be called before the iterator is consumed and `close()` after it.
+ */
+abstract class LocalNode extends TreeNode[LocalNode] {
+
+  /**
+   * The attributes of this node's output. `collect()` builds the schema used to convert
+   * rows from these attributes.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * Initializes the iterator state. Must be called before calling `next()`.
+   *
+   * Implementations of this must also call the `open()` function of its children.
+   */
+  def open(): Unit
+
+  /**
+   * Advances the iterator to the next tuple.
+   *
+   * @return true if the iterator is now positioned on a tuple that `get()` can return,
+   *         false if there are no more tuples
+   */
+  def next(): Boolean
+
+  /**
+   * Returns the current tuple, i.e. the one the last successful `next()` advanced to.
+   */
+  def get(): InternalRow
+
+  /**
+   * Closes the iterator and releases all resources.
+   *
+   * Implementations of this must also call the `close()` function of its children.
+   */
+  def close(): Unit
+
+  /**
+   * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
+   *
+   * This opens the node, drains it, and closes it again. Once `open()` has returned,
+   * `close()` is always called, even if draining fails with an exception.
+   */
+  def collect(): Seq[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
+    val result = new ArrayBuffer[Row]
+    open()
+    try {
+      while (next()) {
+        result += converter.apply(get()).asInstanceOf[Row]
+      }
+    } finally {
+      // Without this, an exception from next(), get() or the converter would skip close()
+      // and leave the operator tree open.
+      close()
+    }
+    result
+  }
+}
+
+
+/** A [[LocalNode]] that has no children. */
+abstract class LeafLocalNode extends LocalNode {
+  override def children: Seq[LocalNode] = Seq.empty
+}
+
+
+/** A [[LocalNode]] that has exactly one child, exposed as `child`. */
+abstract class UnaryLocalNode extends LocalNode {
+
+  def child: LocalNode
+
+  override def children: Seq[LocalNode] = Seq(child)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
new file mode 100644
index 0000000000000..e574d1473cdcb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression}
+
+
+/**
+ * A local operator that applies the projection built from `projectList` to the current row
+ * of `child`.
+ *
+ * The projection is created in `open()`, so `open()` must be called before `get()`.
+ */
+case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {
+
+  // Created in open(); null until then.
+  private[this] var projection: UnsafeProjection = _
+
+  /** One output attribute per entry of `projectList`, in the same order. */
+  override def output: Seq[Attribute] = projectList.map(expr => expr.toAttribute)
+
+  /** Creates the projection over the child's output, then opens the child. */
+  override def open(): Unit = {
+    projection = UnsafeProjection.create(projectList, child.output)
+    child.open()
+  }
+
+  /** A projection produces one row per input row, so advancing is delegated to the child. */
+  override def next(): Boolean = child.next()
+
+  /** Applies the projection to the child's current row; it is re-applied on every call. */
+  override def get(): InternalRow = projection(child.get())
+
+  /** Closes the child. */
+  override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
new file mode 100644
index 0000000000000..994de8afa9a02
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
@@ -0,0 +1,49 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * A leaf operator that iterates over an in-memory Scala Seq of rows.
+ *
+ * `open()` takes a fresh iterator over `data`, so `open()` must be called before `next()`.
+ */
+case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {
+
+  // Both fields are null until open() / the first successful next() assigns them.
+  private[this] var rows: Iterator[InternalRow] = _
+  private[this] var current: InternalRow = _
+
+  /** Positions the scan before the first element of `data`. */
+  override def open(): Unit = {
+    rows = data.iterator
+  }
+
+  /**
+   * Moves to the next element of `data`, if any.
+   *
+   * @return true if a row was fetched, false if `data` is exhausted (the current row is
+   *         left untouched in that case)
+   */
+  override def next(): Boolean = {
+    val hasRow = rows.hasNext
+    if (hasRow) {
+      current = rows.next()
+    }
+    hasRow
+  }
+
+  /** Returns the row fetched by the last successful `next()`. */
+  override def get(): InternalRow = current
+
+  /** Nothing to release: the scan holds no resources beyond the in-memory `data`. */
+  override def close(): Unit = ()
+}
From 1f2291c6b49aba65279b7ffba5e2a0de33ed2077 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 14 Aug 2015 13:02:29 -0700
Subject: [PATCH 2/2] Removed Scala ArrayBuffer import.

---
 .../org/apache/spark/sql/execution/local/LocalNode.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index e02ec8aa18069..341c81438e6d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.execution.local
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.StructType
 
-
 /**
  * A local physical operator, in the form of an iterator.
* @@ -65,7 +62,7 @@ abstract class LocalNode extends TreeNode[LocalNode] { */ def collect(): Seq[Row] = { val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output)) - val result = new ArrayBuffer[Row] + val result = new scala.collection.mutable.ArrayBuffer[Row] open() while (next()) { result += converter.apply(get()).asInstanceOf[Row]