Commit f8a9c6c: merging in master

JDrit committed Jul 27, 2015
2 parents e49da48 + 945d8bc

Showing 40 changed files with 1,456 additions and 2,180 deletions.
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions;

import org.apache.spark.annotation.DeveloperApi;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/**
 * ::DeveloperApi::
 * A function description type that can be recognized by FunctionRegistry and is used to
 * describe the usage of a function in human language.
 *
 * `usage()` gives a brief description of how the function is used.
 * `extended()` gives a verbose description, which is expected to include an example.
 *
 * In both `usage` and `extended`, the function name can be referred to as `_FUNC_`; it stands
 * for the name under which the function is registered in `FunctionRegistry`.
 */
@DeveloperApi
@Retention(RetentionPolicy.RUNTIME)
public @interface ExpressionDescription {
String usage() default "_FUNC_ is undocumented";
String extended() default "No example for _FUNC_.";
}
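
For illustration (not part of this commit): a caller that displays function help would substitute the registered name for the `_FUNC_` placeholder before showing the text. A minimal Scala sketch, assuming nothing beyond the JDK:

object UsageRenderer {
  // Replace the `_FUNC_` placeholder with the name the function was registered under.
  def render(template: String, registeredName: String): String =
    template.replace("_FUNC_", registeredName)
}

// UsageRenderer.render("_FUNC_(expr) - Returns the absolute value", "abs")
// yields "abs(expr) - Returns the absolute value"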
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions;

/**
 * Expression information, used to describe an expression.
 */
public class ExpressionInfo {
private String className;
private String usage;
private String name;
private String extended;

public String getClassName() {
return className;
}

public String getUsage() {
return usage;
}

public String getName() {
return name;
}

public String getExtended() {
return extended;
}

public ExpressionInfo(String className, String name, String usage, String extended) {
this.className = className;
this.name = name;
this.usage = usage;
this.extended = extended;
}

public ExpressionInfo(String className, String name) {
this(className, name, null, null);
}
}
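
A brief sketch of constructing the class above from Scala; the class names and strings here are purely illustrative:

// Full form: class name, registered name, usage, and extended description.
val upperInfo = new ExpressionInfo(
  "org.apache.spark.sql.catalyst.expressions.Upper",
  "upper",
  "upper(str) - Returns str with all characters changed to uppercase",
  "> SELECT upper('SparkSql');\n'SPARKSQL'")

// Short form: usage and extended default to null.
val bareInfo = new ExpressionInfo("com.example.MyExpression", "my_expr")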
@@ -30,48 +30,75 @@ import org.apache.spark.sql.catalyst.util.StringKeyHashMap
/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
trait FunctionRegistry {

- def registerFunction(name: String, builder: FunctionBuilder): Unit
+ final def registerFunction(name: String, builder: FunctionBuilder): Unit = {
+   registerFunction(name, new ExpressionInfo(builder.getClass.getCanonicalName, name), builder)
+ }
+
+ def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder): Unit

@throws[AnalysisException]("If function does not exist")
def lookupFunction(name: String, children: Seq[Expression]): Expression

+ /* List all of the registered function names. */
+ def listFunction(): Seq[String]
+
+ /* Get the class of the registered function by the specified name. */
+ def lookupFunction(name: String): Option[ExpressionInfo]
}

class SimpleFunctionRegistry extends FunctionRegistry {

- private val functionBuilders = StringKeyHashMap[FunctionBuilder](caseSensitive = false)
+ private val functionBuilders =
+   StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)

- override def registerFunction(name: String, builder: FunctionBuilder): Unit = {
-   functionBuilders.put(name, builder)
+ override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
+     : Unit = {
+   functionBuilders.put(name, (info, builder))
}

override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-   val func = functionBuilders.get(name).getOrElse {
+   val func = functionBuilders.get(name).map(_._2).getOrElse {
throw new AnalysisException(s"undefined function $name")
}
func(children)
}

+ override def listFunction(): Seq[String] = functionBuilders.iterator.map(_._1).toList.sorted
+
+ override def lookupFunction(name: String): Option[ExpressionInfo] = {
+   functionBuilders.get(name).map(_._1)
+ }
}

/**
* A trivial catalog that returns an error when a function is requested. Used for testing when all
* functions are already filled in and the analyzer needs only to resolve attribute references.
*/
object EmptyFunctionRegistry extends FunctionRegistry {
- override def registerFunction(name: String, builder: FunctionBuilder): Unit = {
+ override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
+     : Unit = {
throw new UnsupportedOperationException
}

override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
throw new UnsupportedOperationException
}

+ override def listFunction(): Seq[String] = {
+   throw new UnsupportedOperationException
+ }
+
+ override def lookupFunction(name: String): Option[ExpressionInfo] = {
+   throw new UnsupportedOperationException
+ }
}


object FunctionRegistry {

type FunctionBuilder = Seq[Expression] => Expression

- val expressions: Map[String, FunctionBuilder] = Map(
+ val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
// misc non-aggregate functions
expression[Abs]("abs"),
expression[CreateArray]("array"),
@@ -205,13 +232,13 @@ object FunctionRegistry {

val builtin: FunctionRegistry = {
val fr = new SimpleFunctionRegistry
-   expressions.foreach { case (name, builder) => fr.registerFunction(name, builder) }
+   expressions.foreach { case (name, (info, builder)) => fr.registerFunction(name, info, builder) }
fr
}

/** See usage above. */
def expression[T <: Expression](name: String)
-     (implicit tag: ClassTag[T]): (String, FunctionBuilder) = {
+     (implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = {

// See if we can find a constructor that accepts Seq[Expression]
val varargCtor = Try(tag.runtimeClass.getDeclaredConstructor(classOf[Seq[_]])).toOption
@@ -237,6 +264,15 @@
}
}
}
-   (name, builder)
+
+   val clazz = tag.runtimeClass
+   val df = clazz.getAnnotation(classOf[ExpressionDescription])
+   if (df != null) {
+     (name,
+       (new ExpressionInfo(clazz.getCanonicalName, name, df.usage(), df.extended()),
+       builder))
+   } else {
+     (name, (new ExpressionInfo(clazz.getCanonicalName, name), builder))
+   }
}
}
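
To make the new shape of the registry concrete, here is a hedged sketch of registering and then looking up a function with the API above; the builder body and the Literal argument are illustrative only:

// assuming: import org.apache.spark.sql.catalyst.expressions._
val registry = new SimpleFunctionRegistry

// The two-argument overload synthesizes a bare ExpressionInfo automatically.
registry.registerFunction("abs", (children: Seq[Expression]) => Abs(children.head))

// Construction by name and children works as before:
val expr: Expression = registry.lookupFunction("abs", Seq(Literal(-1)))

// New: metadata is available alongside the builder, e.g. for DESCRIBE FUNCTION.
registry.lookupFunction("abs").foreach(info => println(info.getUsage))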
@@ -65,6 +65,9 @@ case class UnaryPositive(child: Expression) extends UnaryExpression with Expects
/**
 * A function that gets the absolute value of a numeric value.
 */
+ @ExpressionDescription(
+   usage = "_FUNC_(expr) - Returns the absolute value of the numeric value",
+   extended = "> SELECT _FUNC_('-1');\n1")
case class Abs(child: Expression)
extends UnaryExpression with ExpectsInputTypes with CodegenFallback {

@@ -214,6 +214,9 @@ trait String2StringExpression extends ImplicitCastInputTypes {
/**
* A function that converts the characters of a string to uppercase.
*/
+ @ExpressionDescription(
+   usage = "_FUNC_(str) - Returns str with all characters changed to uppercase",
+   extended = "> SELECT _FUNC_('SparkSql');\n 'SPARKSQL'")
case class Upper(child: Expression)
extends UnaryExpression with String2StringExpression {

@@ -227,6 +230,9 @@ case class Upper(child: Expression)
/**
* A function that converts the characters of a string to lowercase.
*/
+ @ExpressionDescription(
+   usage = "_FUNC_(str) - Returns str with all characters changed to lowercase",
+   extended = "> SELECT _FUNC_('SparkSql');\n'sparksql'")
case class Lower(child: Expression) extends UnaryExpression with String2StringExpression {

override def convert(v: UTF8String): UTF8String = v.toLowerCase
@@ -184,7 +184,7 @@ object PartialAggregation {
* A pattern that finds joins with equality conditions that can be evaluated as equi-joins.
*/
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
- /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
+ /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
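
A hedged sketch of consuming the extractor with the corrected ordering; the enclosing function is hypothetical:

def describeJoin(plan: LogicalPlan): String = plan match {
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
    // leftKeys(i) is equated with rightKeys(i) in the join condition.
    s"$joinType equi-join on ${leftKeys.length} key pair(s)"
  case _ => "not an equi-join"
}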

@@ -186,12 +186,6 @@ case class WithWindowDefinition(
override def output: Seq[Attribute] = child.output
}

- case class WriteToFile(
-     path: String,
-     child: LogicalPlan) extends UnaryNode {
-   override def output: Seq[Attribute] = child.output
- }

/**
* @param order The ordering expressions
* @param global True means global sorting applies to the entire data set,
@@ -17,11 +17,37 @@

package org.apache.spark.sql.catalyst.plans.logical

- import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+ import org.apache.spark.sql.types.StringType

/**
* A logical node that represents a non-query command to be executed by the system. For example,
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
trait Command

+ /**
+  * Returned for the "DESCRIBE [EXTENDED] FUNCTION functionName" command.
+  * @param functionName The function to be described.
+  * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+  */
+ private[sql] case class DescribeFunction(
+     functionName: String,
+     isExtended: Boolean) extends LogicalPlan with Command {
+
+   override def children: Seq[LogicalPlan] = Seq.empty
+   override val output: Seq[Attribute] = Seq(
+     AttributeReference("function_desc", StringType, nullable = false)())
+ }
+
+ /**
+  * Returned for the "SHOW FUNCTIONS" command, which will list all of the
+  * registered functions.
+  */
+ private[sql] case class ShowFunctions(
+     db: Option[String], pattern: Option[String]) extends LogicalPlan with Command {
+   override def children: Seq[LogicalPlan] = Seq.empty
+   override val output: Seq[Attribute] = Seq(
+     AttributeReference("function", StringType, nullable = false)())
+ }
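
Once a SQL parser maps the corresponding statements to these plans, the commands become reachable from user code roughly as sketched below, assuming a SQLContext in scope named sqlContext; exact output formatting belongs to the execution layer:

// SHOW FUNCTIONS produces one row per registered function name.
sqlContext.sql("SHOW FUNCTIONS").show()

// DESCRIBE FUNCTION EXTENDED also surfaces the usage and example text
// drawn from an @ExpressionDescription annotation, when present.
sqlContext.sql("DESCRIBE FUNCTION EXTENDED upper").show()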
9 changes: 2 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -139,8 +139,7 @@ class DataFrame private[sql](
// happen right away to let these side effects take place eagerly.
case _: Command |
_: InsertIntoTable |
-        _: CreateTableUsingAsSelect |
-        _: WriteToFile =>
+        _: CreateTableUsingAsSelect =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
queryExecution.analyzed
@@ -1615,11 +1614,7 @@
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
-   if (sqlContext.conf.parquetUseDataSourceApi) {
-     write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
-   } else {
-     sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-   }
+   write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
}
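
For callers, the replacement named in the deprecation message is the data source API, which this change now uses unconditionally; for any DataFrame df:

// Preferred over the deprecated saveAsParquetFile(path):
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
// or, equivalently under the default save mode:
df.write.parquet(path)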

@@ -21,16 +21,16 @@ import java.util.Properties

import org.apache.hadoop.fs.Path

+ import org.apache.spark.{Logging, Partition}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+ import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
- import org.apache.spark.sql.parquet.ParquetRelation2
+ import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
- import org.apache.spark.{Logging, Partition}

/**
* :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}.toArray

sqlContext.baseRelationToDataFrame(
-     new ParquetRelation2(
+     new ParquetRelation(
globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
}
}
6 changes: 0 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -276,10 +276,6 @@
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")

- val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
-   defaultValue = Some(true),
-   doc = "<TODO>")

val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
key = "spark.sql.parquet.followParquetFormatSpec",
defaultValue = Some(false),
@@ -456,8 +452,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

- private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)

private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)