[SPARK-23938][SQL] Add map_zip_with function #22017

Closed
wants to merge 16 commits

Conversation

mn-mikke
Contributor

@mn-mikke mn-mikke commented Aug 6, 2018

What changes were proposed in this pull request?

This PR adds a new SQL function called map_zip_with. It merges the two given maps into a single map by applying the given function to the pairs of values with the same key.
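For illustration, a minimal usage sketch in Spark SQL (the literals and lambda below are an assumed example, not taken from this PR); a key present in only one of the maps gets null for the missing value.

spark.sql(
  """SELECT map_zip_with(
    |  map(1, 'a', 2, 'b'),
    |  map(1, 'x'),
    |  (k, v1, v2) -> concat(v1, v2))""".stripMargin)
// Expected result: a single map where key 1 maps to "ax" and key 2 maps to null,
// because concat returns null when any of its arguments is null.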

How was this patch tested?

Added new tests into:

  • DataFrameFunctionsSuite.scala
  • HigherOrderFunctionsSuite.scala

@mn-mikke
Contributor Author

mn-mikke commented Aug 6, 2018

cc @ueshin @mgaido91 @hvanhovell

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94325 has finished for PR 22017 at commit ef56011.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrayDataMerger(elementType: DataType)
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike
  • abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes
  • case class MapZipWith(left: Expression, right: Expression, function: Expression)

}
}

private def getMapType(expr: Expression) = expr.dataType match {
Member

I'd like you to use the same util method; I suggested introducing object HigherOrderFunction at #21986 (comment).

nullable2: Boolean,
dt3: DataType,
nullable3: Boolean,
f: (Expression, Expression, Expression) => Expression): Expression = {
Member

nit: indent

case BinaryType => false
case _: AtomicType => true
case _ => false
@transient protected lazy val elementTypeSupportEquals = {
Contributor

nit: I think we can avoid the braces

val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
val values = new GenericArrayData(new Array[Any](keys.numElements()))
keys.foreach(keyType, (idx: Int, key: Any) => {
val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
Contributor

This approach is very inefficient. The computational complexity is very high (N^2 in the size of the biggest map). I think we can implement something more efficient here and also avoid the changes from the code refactoring. I'd propose to also get the index at which a key has been found in each map, so that we can access the values by index. In this way the overall complexity would be O(N).
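A rough sketch of the index-based idea (the names keys1, values1, keyType, leftValueType etc. are assumed context, not the final implementation): one pass per key array to record the index of each key, then one pass over the collected keys, fetching values by index instead of by key search.

val indexes = scala.collection.mutable.HashMap.empty[Any, (Option[Int], Option[Int])]
for (i <- 0 until keys1.numElements()) {
  indexes(keys1.get(i, keyType)) = (Some(i), None)
}
for (i <- 0 until keys2.numElements()) {
  val key = keys2.get(i, keyType)
  indexes(key) = (indexes.get(key).flatMap(_._1), Some(i))
}
indexes.foreach { case (key, (i1, i2)) =>
  // Look values up by the recorded indexes; missing keys simply yield null.
  val v1 = i1.map(values1.get(_, leftValueType)).orNull
  val v2 = i2.map(values2.get(_, rightValueType)).orNull
  // The lambda function would then be applied to (key, v1, v2).
}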

Contributor Author

Thanks for mentioning this! I'm not happy with the current complexity either. I've assumed that the implementation of maps will change into something with O(1) element access in the future. By then, the complexity would be O(N) for types supporting equals as well, and we would save a portion of duplicated code.

If you think that maps will remain like this for a long time, I really like your suggestion with indexes.

@ueshin What's your view on that?

Contributor

I think there is no plan to have a different map implementation and anyway there is a lot of code which depends on having the array based version of MapData. Regarding the duplicated code, to be honest, I think that avoiding the refactoring introduced by that would also make this PR cleaner...

Contributor Author

Ok, I will change it. Thanks a lot!

}

test("map_zip_with function - invalid")
{
Contributor

nit: please move the brace to the end of the previous line (there are other places where this should be done, please update them too)

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94362 has finished for PR 22017 at commit ec583eb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrayDataUnion(elementType: DataType) extends ((ArrayData, ArrayData) => ArrayData)
  • case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike


abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
object GetMapValueUtil
{
Member

nit: the brace should be on the previous line.


override def functions: Seq[Expression] = function :: Nil

override def nullable: Boolean = left.nullable || right.nullable
Member

left.nullable && right.nullable? Because if one side is an empty map, NULL will be passed as the value for each key on the other side.

Contributor Author
@mn-mikke mn-mikke Aug 7, 2018

The nullable flag is rather related to the cases when the whole map is null. The case that you are referring to is handled by the valueContainsNull flag of MapType (see line 496).
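For illustration, a sketch of how the two flags typically appear on the expression (the dataType line is an assumption, not quoted from this PR; the nullable line matches the diff above):

// Whether the whole result map can be null: true if either input map can be null.
override def nullable: Boolean = left.nullable || right.nullable
// Whether individual values inside the result map can be null is carried by
// valueContainsNull, driven here by the lambda function's nullability.
override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)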

@SparkQA

SparkQA commented Aug 8, 2018

Test build #94394 has finished for PR 22017 at commit 12ad8b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 8, 2018

Test build #94393 has finished for PR 22017 at commit 89a3da4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
Contributor

why Array[Option[Int]] instead of (Option[Int], Option[Int])? Moreover, I can't understand why we need this at all. As we have the HashMap, we can just add the indexes there and return it as an array...

Contributor Author

If we changed it to (Option[Int], Option[Int]), wouldn't we need two similar i loops instead of one?

My motivation for also using the ArrayBuffer is to preserve the order of keys. A random order would break map comparison in tests. Maybe you will come up with a better idea of how to compare maps in tests :-)

Contributor
@mgaido91 mgaido91 Aug 8, 2018

If we changed it to (Option[Int], Option[Int]), wouldn't we need two similar i loops instead of one?

I really don't think so, it would be the same as now I think

well, maybe we can fix map comparison in tests... :)

Contributor Author

I really don't think so, it would be the same as now I think

Let's assume that indexes is a tuple for now. indexes(z).isEmpty could be replaced with indexes.productElement(z).isEmpty, but how to replace indexes(z) = Some(i)? Since a tuple is immutable, I don't see how to replace the i-th element with the copy function. Maybe we could implement a dedicated class to hold the indexes, but is it worth doing that?

Contributor

since the HashMap is mutable, you can just: hashMap += key -> newTuple

Contributor

The array-based solution is indeed 20% faster according to my benchmark, but I think it is not critical, as I ran the benchmark performing the operation 1,000,000 times and the absolute difference was 2 ms. So I prefer the cleaner solution (that is, using tuples). @ueshin what do you think?

Member

I generally prefer the cleaner solution, but actually I'd prefer the previous approach in this case for 2 reasons:

  • We shouldn't ignore a 20% performance difference.
  • I'm not sure we can modify the comparison of MapType in ExpressionEvalHelper here. We might need another pr to make sure the modification is valid.

We still need to add comments on what the arrays are for and the reason, though.

Contributor Author

@mgaido91 Are you comfortable with reverting back to the previous version?

Contributor

@mn-mikke we can use LinkedHashMap in order to preserve key order.

Contributor Author

Like this idea, thanks!
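For reference, a tiny plain-Scala sketch (not code from this PR) of why LinkedHashMap helps: it keeps insertion order for iteration while still offering efficient key lookups.

import scala.collection.mutable

val m = mutable.LinkedHashMap.empty[Int, String]
m += 3 -> "c"
m += 1 -> "a"
m += 2 -> "b"
// Iteration follows insertion order, so the key order stays deterministic for the tests.
assert(m.keys.toSeq == Seq(3, 1, 2))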

private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
val keys = Array(keys1, keys2)
Contributor

maybe better to do something like for (arr <- Seq(keys1, keys2))?

j += 1
}
if (!found) {
assertSizeOfArrayBuffer(arrayBuffer.size)
Contributor

shall we check this only once at the end in order to avoid the overhead at each iteration?

Contributor Author

The purpose of this line is to avoid an OutOfMemoryError when the max array size is exceeded and to throw something more accurate instead. Maybe I'm missing something, but wouldn't we break that if we checked this only once at the end? The max size could be exceeded in any iteration.

Contributor

I see, because you are using an ArrayBuffer... makes sense, thanks

arrayBuffer
}

private def getValue(valueData: ArrayData, eType: DataType, index: Option[Int]) = index match {
Contributor

do we really need this? It can be index.map(valueData.get(_, eType)).getOrElse(null) and we are using it only in one place (twice, but in the same place)...

@SparkQA

SparkQA commented Aug 9, 2018

Test build #94458 has finished for PR 22017 at commit 38ce4e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@transient lazy val functionForEval: Expression = functionsForEval.head

@transient lazy val (keyType, leftValueType, _) =
Member

keyType should be TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType)?

Contributor Author

You are right, thanks!

WDYT about introducing a coercion rule handling different key types? It might be handy for cases like (IntType, LongType).
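For example (a hypothetical illustration of the motivation, not a test from this PR), zipping a map with int keys and a map with bigint keys only makes sense if a coercion rule widens both key types to a common type:

// Without a key-type coercion rule, the differing key types (int vs bigint) would not match;
// with such a rule, both keys could be widened to bigint before map_zip_with is resolved.
spark.sql(
  """SELECT map_zip_with(
    |  map(1, 'a'),
    |  map(CAST(1 AS BIGINT), 'b'),
    |  (k, v1, v2) -> concat(v1, v2))""".stripMargin)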

Member

Sounds good. Thanks!

var i = 0
while (i < keys1.numElements) {
val key = keys1.get(i, keyType)
if(!hashMap.contains(key)) hashMap.put(key, (Some(i), None))
Member

nit: let's use braces:

if (!hashMap.contains(key)) {
  hashMap.put(key, (Some(i), None))
}

if (unsafeRow != expectedRow) {
val field = StructField("field", expression.dataType)
val dataType = StructType(field :: field :: Nil)
if (!checkResult(unsafeRow, expectedRow, dataType)) {
Member

What's this for?

Contributor Author

UnsafeRows are compared based on equality of backing arrays. This approach doesn't work well when ignoring the order of keys in the unsafe representation of maps.

@SparkQA

SparkQA commented Aug 9, 2018

Test build #94508 has finished for PR 22017 at commit 5d2a78e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94526 has finished for PR 22017 at commit 3c849cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Aug 10, 2018

LGTM.
@mgaido91 Do you have any other comments on this?

Contributor
@mgaido91 mgaido91 left a comment

LGTM too, apart from one question

HigherOrderFunction.mapKeyValueArgumentType(right.dataType)

@transient lazy val keyType =
TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
Contributor

why do we need this? We are enforcing that the two maps have the same key type, can't we just get one?

Contributor Author

Even though there is a coercion rule for unification of key types, the key types may differ in nullability flags if they are complex. In theory, we could use == and findTightestCommonType in the coercion rule since there is no codegen to be optimized for null checks. But unfortunately, bind gets called once before the execution of coercion rules, so findTightestCommonType is important for setting up a correct input type for the lambda function.

Maybe we could play with the order of analysis rules, but I'm not sure about all the consequences. @ueshin, could you shed some light on analysis rule ordering?

Member

Yeah, the current analysis rules order might cause some problem. Let me think about it for a while.

Member
@ueshin ueshin Aug 11, 2018

I submitted a pr to fix the analysis rules order so that argument types are fixed before bind: #22075.
Btw, we should use findCommonTypeDifferentOnlyInNullFlags for this because after the type coercion, the difference between the two key types must be only in nullabilities. Sorry for confusing you.

Contributor Author

IMHO, if checkInputDataTypes was executed before bind, findTightestCommonType could play the same role. But yeah, findCommonTypeDifferentOnlyInNullFlags will be semantically more accurate. Thanks!

Member

After #22075, checkArgumentDataType() introduced in it will be executed before bind, so the key types should be "sameType" and we will be able to use findCommonTypeDifferentOnlyInNullFlags. We still need checkInputDataTypes to be executed after bind to check the whole data types are valid.

Contributor Author

Oh, I see. We also need to check the output data type of the lambda function for expressions like ArrayFilter.

Member

Good catch! Yeah, we need it.

/**
* Similar to [[findTightestCommonType]] but with string promotion.
*/
def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
Member

Why except Decimals?

Contributor Author

If we have maps with decimals of different precision as keys, Cast will fail in the analysis phase since it can't cast a key to a nullable type (potential loss of precision). IMHO, the type mismatch exception from this function will be more accurate. WDYT?
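A hypothetical illustration of this case (not from the test suite): two decimal key types whose common wider type would lose precision, so the cast of a key would have to be nullable.

// decimal(36, 35) and decimal(36, 1) widen to a decimal that cannot hold both exactly,
// so a potentially null-producing cast on a map key would be required, which is invalid.
spark.sql(
  """SELECT map_zip_with(
    |  map(CAST(1 AS DECIMAL(36, 35)), 'a'),
    |  map(CAST(1 AS DECIMAL(36, 1)), 'b'),
    |  (k, v1, v2) -> concat(v1, v2))""".stripMargin)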

Member

Ah, I see, good catch! But it led me to another issue. We can't choose types that could possibly become null as a map key. Instead of adding the method, how about modifying findTypeForComplex to something like:

private def findTypeForComplex(
      t1: DataType,
      t2: DataType,
      findTypeFunc: (DataType, DataType) => Option[DataType]): Option[DataType] = (t1, t2) match {
  ...
    case (MapType(kt1, vt1, valueContainsNull1), MapType(kt2, vt2, valueContainsNull2)) =>
      findTypeFunc(kt1, kt2)
        .filter(kt => !Cast.forceNullable(kt1, kt) && !Cast.forceNullable(kt2, kt))
        .flatMap { kt =>
          findTypeFunc(vt1, vt2).map { vt =>
            MapType(kt, vt, valueContainsNull1 || valueContainsNull2)
          }
        }
  ...
}

We might need to have another pr to discuss this.

cc @cloud-fan @gatorsmile

Member

On second thought, do we really need those? It seems like the current coercion rules don't introduce casts that could possibly produce null?

Member

ok, I see that this is a matter of findTypeForComplex. I'll submit another pr later. Maybe we can go back to findWiderTypeForTwo in TypeCoercion and findCommonTypeDifferentOnlyInNullFlag for keyType.

Member

I submitted a pr #22086.

Contributor Author

Thanks for both your PRs! I will submit changes once they get in.

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94570 has finished for PR 22017 at commit 595161f.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94585 has finished for PR 22017 at commit 595161f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Aug 13, 2018

Test build #94685 has finished for PR 22017 at commit 595161f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 14, 2018

Test build #94712 has finished for PR 22017 at commit 2b7e9e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member
@ueshin ueshin left a comment

LGTM.

case class MapZipWith(left: Expression, right: Expression, function: Expression)
extends HigherOrderFunction with CodegenFallback {

@transient lazy val functionForEval: Expression = functionsForEval.head
Member

nit: shall we use def here to follow the comment #21954 (comment)?

}

// Nothing to check since the data type of the lambda function can be anything.
override def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
Member

I'd call checkArgumentDataTypes() here again.

Row(null)))
}

test("map_zip_with function - map of complex types") {
Member

nit: non-primitive instead of complex?

Contributor
@mgaido91 mgaido91 left a comment

LGTM

@transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType

@transient lazy val keyType =
TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
Contributor

shouldn't the null flag be false for both of them?

Contributor Author

If leftKeyType is ArrayType(IntegerType, false) and rightKeyType is ArrayType(IntegerType, true) for instance, the coercion rule is not executed since leftKeyType.sameType(rightKeyType) == true.

An array with nulls seems to be a valid key:

scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
+---------------------------------------+
|map(array(1, 2, CAST(NULL AS INT)), 12)|
+---------------------------------------+
|                        [[1, 2,] -> 12]|
+---------------------------------------+

Contributor

I see, thanks

@SparkQA

SparkQA commented Aug 14, 2018

Test build #94737 has finished for PR 22017 at commit bcd4e0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Aug 14, 2018

Thanks! merging to master.

@asfgit asfgit closed this in 42263fd Aug 14, 2018
asfgit pushed a commit that referenced this pull request Aug 17, 2018
…should be true.

## What changes were proposed in this pull request?

This is a follow-up pr of #22017 which added the `map_zip_with` function.
In the test, when creating a lambda function, we use the `valueContainsNull` values for the nullabilities of the value arguments, but we should've used `true`, the same as the `bind` method does, because the values might be `null` if the keys don't match.

## How was this patch tested?

Added small tests and existing tests.

Closes #22126 from ueshin/issues/SPARK-23938/fix_tests.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
asfgit pushed a commit that referenced this pull request Oct 25, 2018
## What changes were proposed in this pull request?

- Revert [SPARK-23935][SQL] Adding map_entries function: #21236
- Revert [SPARK-23937][SQL] Add map_filter SQL function: #21986
- Revert [SPARK-23940][SQL] Add transform_values SQL function: #22045
- Revert [SPARK-23939][SQL] Add transform_keys function: #22013
- Revert [SPARK-23938][SQL] Add map_zip_with function: #22017
- Revert the changes of map_entries in [SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_entries to SparkR: #21434

## How was this patch tested?
The existing tests.

Closes #22827 from gatorsmile/revertMap2.4.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>