From ec583eb29ba6fdb79d0b85cbecb3f709e6648b25 Mon Sep 17 00:00:00 2001 From: Marek Novotny Date: Tue, 7 Aug 2018 11:16:59 +0200 Subject: [PATCH] [SPARK-23938][SQL] Merging master into the feature branch and resolving conflicts. --- .../expressions/collectionOperations.scala | 44 ++++++++++++------- .../expressions/higherOrderFunctions.scala | 10 +++-- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 755cb95d59eba..60c830d070906 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3756,24 +3756,14 @@ object ArraySetLike { } /** - * Returns an array of the elements in the union of x and y, without duplicates + * The class performs union operation with two [[ArrayData]] objects. */ -@ExpressionDescription( - usage = """ - _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, - without duplicates. 
- """, - examples = """ - Examples: - > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); - array(1, 2, 3, 5) - """, - since = "2.4.0") -case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike - with ComplexTypeMergingExpression { +class ArrayDataUnion(elementType: DataType) extends ((ArrayData, ArrayData) => ArrayData) { - @transient lazy val evalUnion: (ArrayData, ArrayData) => ArrayData = { - if (elementTypeSupportEquals) { + private lazy val ordering: Ordering[Any] = TypeUtils.getInterpretedOrdering(elementType) + + private lazy val evalFunc: (ArrayData, ArrayData) => ArrayData = { + if (ArraySetLike.typeSupportsEquals(elementType)) { (array1, array2) => val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] val hs = new OpenHashSet[Any] @@ -3834,6 +3824,28 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike } } + def apply(array1: ArrayData, array2: ArrayData): ArrayData = evalFunc(array1, array2) +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ + Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + + @transient lazy val evalUnion = new ArrayDataUnion(elementType) + override def nullSafeEval(input1: Any, input2: Any): Any = { val array1 = input1.asInstanceOf[ArrayData] val array2 = input2.asInstanceOf[ArrayData] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index cb7eef39aaa31..84d8156018e22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -392,7 +392,9 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression) @transient lazy val MapType(_, rightValueType, _) = getMapType(right) - @transient lazy val merger = new ArrayDataMerger(keyType) + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) override def inputs: Seq[Expression] = left :: right :: Nil @@ -445,11 +447,11 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression) private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = { val mapData1 = value1.asInstanceOf[MapData] val mapData2 = value2.asInstanceOf[MapData] - val keys = merger.merge(mapData1.keyArray(), mapData2.keyArray()) + val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray()) val values = new GenericArrayData(new Array[Any](keys.numElements())) keys.foreach(keyType, (idx: Int, key: Any) => { - val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, merger.ordering) - val v2 = 
GetMapValueUtil.getValueEval(mapData2, key, keyType, rightValueType, merger.ordering) + val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering) + val v2 = GetMapValueUtil.getValueEval(mapData2, key, keyType, rightValueType, ordering) keyVar.value.set(key) value1Var.value.set(v1) value2Var.value.set(v2)