Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
cec946e
Add field expression
Dec 21, 2016
0160654
Add field expression unit test
Dec 21, 2016
b441ef6
Add doGenCode and modify the function description
Dec 28, 2016
320fd67
Add field expression unit test cases
Dec 28, 2016
f340084
Modify function description
Dec 28, 2016
a864449
Add support for field children which is not List
Dec 29, 2016
04ccadc
Add field function API for DataFrame and unit test cases
Dec 29, 2016
657742f
Modify @since version
Dec 29, 2016
62e62dc
Add Python API
Dec 29, 2016
0edc0e0
Add Apache License
Dec 29, 2016
39a4181
Modify import order
Dec 30, 2016
16af01a
Move Field class def to ConditionalExpressions and modify the test
Jan 5, 2017
35be82e
Modify the line feed's location
Jan 5, 2017
45a58a7
Move Field function test to ColumnExpressionSuite
Jan 5, 2017
0eb9817
Add null support and corresponding test cases
Jan 5, 2017
e78698c
Remove blank lines
Jan 5, 2017
4fea138
Modify field function definition to require >= 2 params
Jan 5, 2017
7f0f24b
Add blank line
Jan 5, 2017
40b3ad2
Modify the typesetting and use syntax sugar to beautify the code
Jan 6, 2017
7d347c2
Remove Dataset API, simplify the test cases, and add some comments
Jan 8, 2017
46c3986
Tune the code
Jan 8, 2017
4761687
Improve the performance and add the tail-rec annotation
Jan 9, 2017
6954b27
Add description in comment for strNull match
Jan 9, 2017
c565e6c
Improve the performance by reducing the type check and giving up patt…
Jan 9, 2017
7f0a7e9
Improve the performance by only looping through columns where dataTyp…
Jan 9, 2017
e1914ea
Add explicit NULL support
Jan 9, 2017
321c373
Add more comments/docs for parameters with multi types
Jan 9, 2017
ce776ec
Remove unused lines and tune the code
Jan 10, 2017
c8ed3d4
Change size to length
Jan 11, 2017
1c61252
Remove unused import and tune the doc
Jan 17, 2017
76278fc
Tune the doc further
Jan 19, 2017
ab990fd
Optimization done and align with scala style requirement
Feb 6, 2017
d71245f
Switch off scalastyle check in test to allow non-ascii characters
Feb 7, 2017
e89ae0f
Remove whitespace
Feb 8, 2017
082189a
Modify type check failure hint
Feb 16, 2017
32fa22b
Support implicit cast
Mar 16, 2017
b5d1675
Not do implicit cast while all have the same type
Mar 16, 2017
4b63e94
Update docs
Mar 22, 2017
c75d786
Fix
Jul 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ object FunctionRegistry {
expression[Coalesce]("coalesce"),
expression[Explode]("explode"),
expressionGeneratorOuter[Explode]("explode_outer"),
expression[Field]("field"),
expression[Greatest]("greatest"),
expression[If]("if"),
expression[Inline]("inline"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,21 @@ object TypeCoercion {
}
e.withNewChildren(children)

case e: ImplicitCastInputTypesToSameType =>
  // Leave the expression untouched when every child already shares one type (vacuously
  // true for an empty child list). Otherwise coerce all children to a common type chosen
  // from the first child: DoubleType when it is numeric, StringType in every other case.
  val childTypes = e.children.map(_.dataType)
  if (childTypes.forall(_ == childTypes.head)) {
    e
  } else {
    val commonType: DataType =
      if (NumericType.acceptsType(childTypes.head)) DoubleType else StringType
    // A child that cannot be implicitly cast is replaced by a typed NULL literal.
    val coerced = e.children.map { child =>
      ImplicitTypeCasts.implicitCast(child, commonType)
        .getOrElse(Literal.create(null, commonType))
    }
    e.withNewChildren(coerced)
  }

case e: ExpectsInputTypes if e.inputTypes.nonEmpty =>
// Convert NullType into some specific target type for ExpectsInputTypes that don't do
// general implicit casting.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,12 @@ trait ExpectsInputTypes extends Expression {
trait ImplicitCastInputTypes extends ExpectsInputTypes {
// No other methods
}


/**
 * A mixin that tells the analyzer to implicitly cast all of an expression's inputs to one
 * common type using
 * [[org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts]].
 */
trait ImplicitCastInputTypesToSameType extends ExpectsInputTypes {
  // Marker trait only: it declares no other methods.
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.catalyst.expressions

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._

// scalastyle:off line.size.limit
Expand Down Expand Up @@ -313,3 +316,110 @@ object CaseKeyWhen {
CaseWhen(cases, elseValue)
}
}

/**
* A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
* It takes at least 2 parameters, and all parameters can be of any type.
* Implicit cast will be done when at least 2 parameters have different types, and it will be based
* on the first parameter's type.
* If the first parameter is of NumericType, all parameters will be implicitly cast to DoubleType,
* and those that can't be cast to DoubleType will be regarded as NULL.
* If the first parameter is of any other type, all parameters will be implicitly cast to StringType
* and the comparison will follow String's comparing rules.
* If the search expression is NULL, the return value is 0 because NULL fails equality comparison
* with any value.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
extended = """
Examples:
> SELECT _FUNC_(10, 9, 3, 10, 4);
3
> SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
4
> SELECT _FUNC_('999', 'a', 999.0, 999, '999');
3
""")
// scalastyle:on line.size.limit
case class Field(children: Seq[Expression]) extends Expression
with ImplicitCastInputTypesToSameType {

/** FIELD always yields an index: 0 stands for "not found", so the result is never NULL. */
override def nullable: Boolean = false

/** The call is foldable only when there is no argument that is not itself foldable. */
override def foldable: Boolean = !children.exists(child => !child.foldable)

/** Interpreted ordering for the search expression's type; `eval` uses it to test equality. */
private lazy val ordering = TypeUtils.getInterpretedOrdering(children.head.dataType)

/** Every argument, regardless of position, may be any of the listed types or NULL. */
override def inputTypes: Seq[AbstractDataType] = {
  val accepted = TypeCollection(
    BinaryType, BooleanType, DateType, NumericType, StringType, TimestampType, NullType)
  Seq.fill(children.length)(accepted)
}

/**
 * Runs the inherited per-argument type check first and, only when that succeeds,
 * additionally rejects calls that supply fewer than two arguments. A failure reported by
 * the inherited check is returned unchanged.
 */
override def checkInputDataTypes(): TypeCheckResult = super.checkInputDataTypes() match {
  case TypeCheckResult.TypeCheckSuccess if children.length <= 1 =>
    TypeCheckResult.TypeCheckFailure("FIELD requires at least 2 arguments")
  case outcome => outcome
}

/** The position of the match (or 0 when nothing matches) is reported as an integer. */
override def dataType: DataType = IntegerType

/**
 * Interpreted evaluation. Returns the position (counting from 1) of the first argument
 * after the search expression that equals it, or 0 when there is none. A NULL search
 * expression yields 0 without evaluating the remaining arguments, and a NULL candidate is
 * never treated as a match.
 */
override def eval(input: InternalRow): Any = {
  val needle = children.head.eval(input)
  val argCount = children.length

  // Candidates are evaluated one at a time, so nothing past the first hit is computed.
  @tailrec
  def search(pos: Int): Int =
    if (pos == argCount) {
      0
    } else {
      val candidate = children(pos).eval(input)
      val miss = candidate == null || !ordering.equiv(needle, candidate)
      if (miss) search(pos + 1) else pos
    }

  if (needle == null) 0 else search(pos = 1)
}

protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val evalChildren = children.map(_.genCode(ctx))
val target = evalChildren(0)
val targetDataType = children(0).dataType
val dataTypes = children.map(_.dataType)

// Emits the Java snippet for one candidate: run the child's code, then store the child's
// index in the result variable when the child equals the search expression. The snippet
// must end with the closing brace of the `if`, because genIfElseStructure appends a bare
// `else { ... }` directly after it to chain the remaining candidates.
def updateEval(evalWithIndex: (ExprCode, Int)): String = {
  val (eval, index) = evalWithIndex
  // Guard on isNull so that a NULL candidate can never match, mirroring the
  // `value != null` check in the interpreted eval path. Without the guard the comparison
  // would run on whatever the generated code left in `value` for a NULL child
  // (presumably a type default or a null reference), giving a spurious match or a null
  // dereference.
  s"""
    ${eval.code}
    if (!${eval.isNull} && ${ctx.genEqual(targetDataType, eval.value, target.value)}) {
      ${ev.value} = ${index};
    }
  """
}

def genIfElseStructure(code1: String, code2: String): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the name, it felt like this method would put code1 in the if block and code2 in the else block, but it turns out that's not the case. That floating else looks weird.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can change this function's name? But actually I can't think of a better name. : )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe we can use foldLeft to replace current approach to get rid of the floating else.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't understand, how to use foldLeft approach? I think we can only use foldRight or reduceRight, because the code for latter children should be nested inner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, right, if use foldLeft, there is still a floating else. We can only use foldRight to remove it.

Copy link
Author

@gczsjdy gczsjdy Feb 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to use foldRight to remove it?
My thought: If I understand your meaning of floating else right(could you please explain it a little bit?), foldRight and reduceRight both can't avoid floating else, because we need nested else in else block, like this:
if (xxx) else { if (xxx) else { ... } } , so if we avoid floating else in genIfElseStructure, else should be in updateEval, which will make the code unclear and complicated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it looks like:

${evalChildren.zip(dataTypes).zipWithIndex.tail.filter { x =>
  dataTypeMatchIndex.contains(x._2)
}.foldRight("") { (code: String, evalWithIndex: ((ExprCode, DataType), Int)) =>
  val ((eval, _), index) = evalWithIndex
  val condition = ctx.genEqual(targetDataType, eval.value, target.value)     
  s"""
    ${eval.code}
    if ($condition) {
      ${ev.value} = ${index};
    } else {
      $code
    }       
  """
}

You can do this with a function like you did before. It will have an empty "else" block at the end.

However, this doesn't affect the functionality; it only concerns how the code looks. I don't have a strong opinion about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what's already present in the code is OK. Given that there is no better option without adding more complexity, let's stick with it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Maybe the order of code and evalWithIndex parameters should be changed.
@tejasapatil I agree with your opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current code is ok.

s"""
${code1}
else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the floating else. As @tejasapatil said, looks weird.

${code2}
}
"""
}

ev.copy(code =
code"""
${target.code}
boolean ${ev.isNull} = false;
int ${ev.value} = 0;
if (!${target.isNull}) {
${evalChildren.zipWithIndex.tail.map(updateEval).reduceRight(genIfElseStructure)}
}
""".stripMargin)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
Expand Down Expand Up @@ -222,4 +224,52 @@ class ConditionalExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
CaseWhen(Seq((Literal.create(false, BooleanType), Literal(1))), Literal(-1)).genCode(ctx)
assert(ctx.inlinedMutableStates.size == 1)
}

test("field") {
  // scalastyle:off
  // non ascii characters are not allowed in the code, so we disable the scalastyle here.
  val str1 = Literal("花花世界")
  val str2 = Literal("a")
  val str3 = Literal("b")
  val strNull = Literal.create(null, StringType)

  val bool1 = Literal(true)
  val bool2 = Literal(false)

  val int1 = Literal(1)
  val int2 = Literal(2)
  val int3 = Literal(3)
  val intNull = Literal.create(null, IntegerType)

  val double1 = Literal(1.221)
  val double2 = Literal(1.222)
  val double3 = Literal(1.224)

  // valueOf replaces the deprecated field-by-field constructors; the test only needs
  // three distinct values per type, so the exact instants are irrelevant.
  val timeStamp1 = Literal(Timestamp.valueOf("2016-12-27 14:22:01"))
  val timeStamp2 = Literal(Timestamp.valueOf("1988-06-03 01:01:01"))
  val timeStamp3 = Literal(Timestamp.valueOf("1990-06-05 01:01:01"))

  val date1 = Literal(Date.valueOf("1949-01-01"))
  val date2 = Literal(Date.valueOf("1979-01-01"))
  val date3 = Literal(Date.valueOf("1989-01-01"))

  // The search expression is found: the position of its first occurrence is returned.
  checkEvaluation(Field(Seq(str1, str2, str3, str1)), 3)
  checkEvaluation(Field(Seq(str2, str2, str2, str1)), 1)
  checkEvaluation(Field(Seq(bool1, bool2, bool1, bool1)), 2)
  checkEvaluation(Field(Seq(int1, int2, int3, int1)), 3)
  checkEvaluation(Field(Seq(double2, double3, double1, double2)), 3)
  checkEvaluation(Field(Seq(timeStamp1, timeStamp2, timeStamp3, timeStamp1)), 3)
  checkEvaluation(Field(Seq(date1, date1, date2, date3)), 1)

  // The search expression is absent from the list: the result is 0.
  checkEvaluation(Field(Seq(int3, int1, int2)), 0)

  // A NULL search expression never matches anything: the result is 0.
  checkEvaluation(Field(Seq(strNull, str1, str2, str3)), 0)
  checkEvaluation(Field(Seq(intNull, int1, int2, int3)), 0)
  // scalastyle:on
}

test("case key when - internal pattern matching expects a List while apply takes a Seq") {
  // Builds a CaseKeyWhen from an IndexedSeq (not a List) of alternating when/then literals
  // and checks that each branch compares the key 12 against the corresponding when-value.
  val indexedSeq = IndexedSeq(Literal(1), Literal(42), Literal(42), Literal(1))
  val caseKeyWhen = CaseKeyWhen(Literal(12), indexedSeq)
  assert(caseKeyWhen.branches ==
    IndexedSeq((Literal(12) === Literal(1), Literal(42)),
      (Literal(12) === Literal(42), Literal(1))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,16 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
df.select(typedLit(("a", 2, 1.0))),
Row(Row("a", 2, 1.0)) :: Nil)
}

test("field") {
  // scalastyle:off
  // non ascii characters are not allowed in the code, so we disable the scalastyle here.
  val testData = Seq((1, 2, 3)).toDF()
  // Each entry pairs a SQL `field(...)` call with the single value it is expected to return.
  val expectations = Seq(
    "field('花花世界', 'a', 1.23, true, '花花世界')" -> 4,
    "field(null, 'a', 1.23, true, null)" -> 0,
    "field(4, 3.00, '4', true, 4)" -> 2,
    "field(3.00, '3', 3, timestamp(123), '3.00')" -> 1,
    "field('3.0', 3.00, 3, '3.00')" -> 0)
  expectations.foreach { case (sqlExpr, expected) =>
    checkAnswer(testData.selectExpr(sqlExpr), Row(expected))
  }
  // scalastyle:on
}
}