Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


Expand Down Expand Up @@ -99,11 +100,22 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

case (t1 @ ArrayType(elementType1, nullable1), t2 @ ArrayType(elementType2, nullable2))
if t1.sameType(t2) =>
val dataType = findTightestCommonType(elementType1, elementType2).get
Some(ArrayType(dataType, nullable1 || nullable2))

case (t1 @ MapType(keyType1, valueType1, nullable1),
t2 @ MapType(keyType2, valueType2, nullable2)) if t1.sameType(t2) =>
val keyType = findTightestCommonType(keyType1, keyType2).get
val valueType = findTightestCommonType(valueType1, valueType2).get
Some(MapType(keyType, valueType, nullable1 || nullable2))

case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
// Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
// except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
// - Different names: use f1.name
// - Names differing in case: use f1.name
// - Different nullabilities: `nullable` is true iff one of them is nullable.
val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
Expand Down Expand Up @@ -148,6 +160,61 @@ object TypeCoercion {
case (l, r) => None
}

/**
* Case 2 type widening over complex types. `widerTypeFunc` is a function that finds the wider
* type over point types. The `widerTypeFunc` specifies behavior over whether types should be
* promoted to StringType.
*/
private def findWiderTypeForTwoComplex(
t1: DataType,
t2: DataType,
widerTypeFunc: (DataType, DataType) => Option[DataType]): Option[DataType] = {
(t1, t2) match {
case (_, _) if t1 == t2 => Some(t1)
case (NullType, _) => Some(t1)
case (_, NullType) => Some(t1)

case (ArrayType(elementType1, nullable1), ArrayType(elementType2, nullable2)) =>
val dataType = widerTypeFunc.apply(elementType1, elementType2)

dataType.map(ArrayType(_, nullable1 || nullable2))

case (MapType(keyType1, valueType1, nullable1), MapType(keyType2, valueType2, nullable2)) =>
val keyType = widerTypeFunc.apply(keyType1, keyType2)
val valueType = widerTypeFunc.apply(valueType1, valueType2)

if (keyType.nonEmpty && valueType.nonEmpty) {
Some(MapType(keyType.get, valueType.get, nullable1 || nullable2))
} else {
None
}

case (StructType(fields1), StructType(fields2)) =>
val fieldTypes = fields1.zip(fields2).map { case (f1, f2) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this requires that fields with the same name must also be in the same position. Is this assumption correct?

Copy link
Author

@bdrillard bdrillard Jan 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgaido91 That's seems to be the assumption already made in findTightestCommonType. The sameType function on DataType also requires structfields are ordered the same, else it returns false.

The difference here is that we don't require the structfields strictly have the same type, so we can support widening to LongType, StringType, etc. But we do require the fields 1. have the same order, and 2. have the same name (either with strict case, or ignoring case).

// In order to match Case 2 widening of types, we do not require field data types be the
// same type, but fields having different names are considered heterogeneous
if ((SQLConf.get.caseSensitiveAnalysis && f1.name.equals(f2.name))
|| (!SQLConf.get.caseSensitiveAnalysis && f1.name.equalsIgnoreCase(f2.name))) {
widerTypeFunc(f1.dataType, f2.dataType)
} else {
None
}
}

if (fieldTypes.forall(_.nonEmpty)) {
val structFields = fields1.zip(fields2).zip(fieldTypes).map { case ((f1, f2), t) =>
StructField(f1.name, t.get, nullable = f1.nullable || f2.nullable)
}

Some(StructType(structFields))
} else {
None
}

case _ => None
}
}

/**
* Case 2 type widening (see the classdoc comment above for TypeCoercion).
*
Expand All @@ -158,11 +225,7 @@ object TypeCoercion {
findTightestCommonType(t1, t2)
.orElse(findWiderTypeForDecimal(t1, t2))
.orElse(stringPromotion(t1, t2))
.orElse((t1, t2) match {
case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) =>
findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2))
case _ => None
})
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we now check for ArrayTypes (including MapTypes) in findTightestCommonType, the match here becomes redundant. @mgaido91, @gczsjdy, does this thinking make sense to you both?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes sense, but I'd love that in your implementation in findTightestCommonType, you would replicate this logic, ie. removing the sameType guard and using findWiderTypeForTwo, in order to allow casting an array of int to an array of long. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I think that's possible. In order to handle instances with and without string promotion, I think it may be necessary to add a boolean parameter, and then to handle the instances where the pointType/keyType and valueType may result in None, see https://github.com/apache/spark/pull/20010/files#diff-383a8cdd0a9c58cae68e0a79295520a3R105

To support the minor change in function signature for findTightestCommonType, I have to do some refactoring in the tests. Let me know if you think there's a cleaner implementation, but this seems to help localize like concerns into findTightestCommonType.

Copy link

@gczsjdy gczsjdy Dec 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense, but I doubt the solution we call findWiderType.. in findTightest... This seems not reasonable.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion: we define a new function, like findWiderTypeForArray. This new function can provide 'findWider' functionality compared to the 'findTightestArray' part(which is basically the first commit in your PR). We won't break the original findTightest semantic in this way and the code is clean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your idea @gczsjdy!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gczsjdy I've taken a shot at implementing your suggestion with findWiderTypeForTwoComplex, which takes as an argument a widerTypeFunc, describing which widening behavior to apply to point types (should they permit promotion to string or not).

Because ArrayType instances that would require widening the type could be nested in StructType and MapType, I think it's necessary to have more case matching than would be in findWiderTypeForArray, hence findWiderTypeForTwoComplex.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your implementation, except for the concerns I raised in another thread. Have you tested locally?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this PR should be ready for a Jenkins build.

.orElse(findWiderTypeForTwoComplex(t1, t2, findWiderTypeForTwo))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause infinite loop? Calling findWiderTypeForTwo in findWiderTypeForTwo?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not. findWiderTypeForTwoComplex will only be called as we operate over "complex" types (i.e arrays, maps, structs), and will only recurse (calling findWiderTypeForTwo) over the child point-types of a complex type, so we ensure the recursive computation gets narrower as it recurses, until eventually terminating at the leaf level of the schema.

}

private def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
Expand All @@ -182,12 +245,7 @@ object TypeCoercion {
t2: DataType): Option[DataType] = {
findTightestCommonType(t1, t2)
.orElse(findWiderTypeForDecimal(t1, t2))
.orElse((t1, t2) match {
case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) =>
findWiderTypeWithoutStringPromotionForTwo(et1, et2)
.map(ArrayType(_, containsNull1 || containsNull2))
case _ => None
})
Copy link
Author

@bdrillard bdrillard Dec 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same sanity check here, as above.

.orElse(findWiderTypeForTwoComplex(t1, t2, findWiderTypeWithoutStringPromotionForTwo))
}

def findWiderTypeWithoutStringPromotion(types: Seq[DataType]): Option[DataType] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,51 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)

widenTest(
ArrayType(StringType, containsNull = true),
ArrayType(StringType, containsNull = false),
Some(ArrayType(StringType, containsNull = true)))
widenTest(
MapType(StringType, StringType, valueContainsNull = true),
MapType(StringType, StringType, valueContainsNull = false),
Some(MapType(StringType, StringType, valueContainsNull = true)))
widenTest(
StructType(Seq(StructField("a", StringType, nullable = true))),
StructType(Seq(StructField("a", StringType, nullable = false))),
Some(StructType(Seq(StructField("a", StringType, nullable = true)))))

widenTest(
StructType(
Seq(StructField("a", ArrayType(StringType, containsNull = true), nullable = true))),
StructType(
Seq(StructField("a", ArrayType(StringType, containsNull = false), nullable = false))),
Some(StructType(
Seq(StructField("a", ArrayType(StringType, containsNull = true), nullable = true)))))
widenTest(
StructType(
Seq(StructField("a", MapType(StringType, StringType, valueContainsNull = true)))),
StructType(
Seq(StructField("a", MapType(StringType, StringType, valueContainsNull = false)))),
Some(StructType(
Seq(StructField("a", MapType(StringType, StringType, valueContainsNull = true))))))
widenTest(
ArrayType(
StructType(Seq(StructField("a", StringType, nullable = true))), containsNull = true),
ArrayType(
StructType(Seq(StructField("a", StringType, nullable = false))), containsNull = false),
Some(ArrayType(
StructType(Seq(StructField("a", StringType, nullable = true))), containsNull = true)))
widenTest(
MapType(
StringType,
StructType(Seq(StructField("a", StringType, nullable = true))), valueContainsNull = true),
MapType(
StringType,
StructType(Seq(StructField("a", StringType, nullable = false))), valueContainsNull = false),
Some(MapType(
StringType,
StructType(Seq(StructField("a", StringType, nullable = true))), valueContainsNull = true)))

widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("b", IntegerType))),
Expand Down Expand Up @@ -431,7 +476,7 @@ class TypeCoercionSuite extends AnalysisTest {
}
}

test("wider common type for decimal and array") {
test("wider common type for decimal and complex types") {
def widenTestWithStringPromotion(
t1: DataType,
t2: DataType,
Expand Down Expand Up @@ -462,27 +507,139 @@ class TypeCoercionSuite extends AnalysisTest {
ArrayType(DoubleType, containsNull = false),
Some(ArrayType(DoubleType, containsNull = true)))
widenTestWithStringPromotion(
ArrayType(TimestampType, containsNull = false),
ArrayType(StringType, containsNull = true),
ArrayType(ArrayType(IntegerType), containsNull = true),
ArrayType(ArrayType(LongType), containsNull = false),
Some(ArrayType(ArrayType(LongType), containsNull = true)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really wanna do this? Is there any other database have this behavior?

I think for complex type, we should ignore the nullable difference, but I'm not sure if we should do type coercion inside complex type.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgaido91, thoughts on this?

It's definitely possible for us to revert back to behavior where we don't do IntegerType-to-LongType, xType-to-StringType, etc. promotion inside complex types, which was how a previous form of this PR handled it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is more coherent with the behavior in other parts and this is the behavior I would expect. But I think that we should follow @gatorsmile's suggestion and check Hive's behavior first.

widenTestWithStringPromotion(
ArrayType(StringType), ArrayType(TimestampType), Some(ArrayType(StringType)))
widenTestWithStringPromotion(
StructType(
Seq(StructField("a", ArrayType(LongType, containsNull = true), nullable = true))),
StructType(
Seq(StructField("a", ArrayType(StringType, containsNull = false), nullable = false))),
Some(StructType(
Seq(StructField("a", ArrayType(StringType, containsNull = true), nullable = true)))))
widenTestWithStringPromotion(
ArrayType(StructType(Seq(StructField("a", LongType, nullable = true))), containsNull = true),
ArrayType(StructType(
Seq(StructField("a", StringType, nullable = false))), containsNull = false),
Some(ArrayType(StructType(
Seq(StructField("a", StringType, nullable = true))), containsNull = true)))
widenTestWithStringPromotion(
StructType(Seq(StructField("a", ArrayType(LongType)))),
StructType(Seq(StructField("b", ArrayType(StringType)))),
None)
widenTestWithStringPromotion(
ArrayType(StructType(Seq(StructField("a", LongType)))),
ArrayType(StructType(Seq(StructField("b", StringType)))),
None)

// MapType
widenTestWithStringPromotion(
MapType(StringType, ShortType, valueContainsNull = true),
MapType(StringType, DoubleType, valueContainsNull = false),
Some(MapType(StringType, DoubleType, valueContainsNull = true)))
widenTestWithStringPromotion(
MapType(StringType, MapType(StringType, IntegerType, valueContainsNull = true),
valueContainsNull = true),
MapType(StringType, MapType(StringType, LongType, valueContainsNull = false),
valueContainsNull = false),
Some(MapType(StringType, MapType(StringType, LongType, valueContainsNull = true),
valueContainsNull = true)))
widenTestWithStringPromotion(
StructType(Seq(StructField("a", MapType(
StringType, LongType, valueContainsNull = true), nullable = true))),
StructType(Seq(StructField("a", MapType(
StringType, StringType, valueContainsNull = false), nullable = false))),
Some(StructType(Seq(StructField("a", MapType(
StringType, StringType, valueContainsNull = true), nullable = true)))))
widenTestWithStringPromotion(
MapType(StringType,
StructType(Seq(StructField("a", LongType, nullable = true))),
valueContainsNull = true),
MapType(StringType,
StructType(Seq(StructField("a", StringType, nullable = false))),
valueContainsNull = false),
Some(MapType(StringType,
StructType(Seq(StructField("a", StringType, nullable = true))),
valueContainsNull = true)))
widenTestWithStringPromotion(
StructType(Seq(StructField("a", MapType(StringType, LongType)))),
StructType(Seq(StructField("b", MapType(StringType, StringType)))),
None)
widenTestWithStringPromotion(
MapType(StringType, StructType(Seq(StructField("a", LongType)))),
MapType(StringType, StructType(Seq(StructField("b", StringType)))),
None)

// String promotion
widenTestWithStringPromotion(IntegerType, StringType, Some(StringType))
widenTestWithStringPromotion(StringType, TimestampType, Some(StringType))
widenTestWithStringPromotion(
ArrayType(TimestampType, containsNull = true),
ArrayType(StringType, containsNull = false),
Some(ArrayType(StringType, containsNull = true)))
widenTestWithStringPromotion(
ArrayType(ArrayType(IntegerType), containsNull = false),
ArrayType(ArrayType(LongType), containsNull = false),
Some(ArrayType(ArrayType(LongType), containsNull = false)))
ArrayType(LongType, containsNull = true),
ArrayType(StringType, containsNull = false),
Some(ArrayType(StringType, containsNull = true)))
widenTestWithStringPromotion(
MapType(StringType, TimestampType, valueContainsNull = true),
MapType(StringType, StringType, valueContainsNull = false),
Some(MapType(StringType, StringType, valueContainsNull = true)))
widenTestWithStringPromotion(
MapType(StringType, LongType, valueContainsNull = true),
MapType(StringType, StringType, valueContainsNull = false),
Some(MapType(StringType, StringType, valueContainsNull = true)))

// Without string promotion
widenTestWithoutStringPromotion(IntegerType, StringType, None)
widenTestWithoutStringPromotion(StringType, TimestampType, None)
widenTestWithoutStringPromotion(ArrayType(LongType), ArrayType(StringType), None)
widenTestWithoutStringPromotion(ArrayType(StringType), ArrayType(TimestampType), None)
widenTestWithoutStringPromotion(
MapType(StringType, LongType),
MapType(StringType, TimestampType),
None)
widenTestWithoutStringPromotion(
MapType(StringType, StringType),
MapType(StringType, TimestampType),
None)
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", MapType(StringType, LongType)))),
StructType(Seq(StructField("a", MapType(StringType, StringType)))),
None)
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", ArrayType(LongType)))),
StructType(Seq(StructField("a", ArrayType(StringType)))),
None)
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", MapType(StringType, LongType)))),
StructType(Seq(StructField("a", MapType(StringType, StringType)))),
None)
widenTestWithoutStringPromotion(
ArrayType(StructType(Seq(StructField("a", LongType)))),
ArrayType(StructType(Seq(StructField("a", StringType)))),
None)

// String promotion
widenTestWithStringPromotion(IntegerType, StringType, Some(StringType))
widenTestWithStringPromotion(StringType, TimestampType, Some(StringType))
widenTestWithStringPromotion(
ArrayType(LongType), ArrayType(StringType), Some(ArrayType(StringType)))
// Although the data types promotion would not fail, tests should still return None due to field
// name mismatch
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", MapType(StringType, LongType)))),
StructType(Seq(StructField("b", MapType(StringType, LongType)))),
None)
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", ArrayType(LongType)))),
StructType(Seq(StructField("b", ArrayType(LongType)))),
None)
widenTestWithoutStringPromotion(
StructType(Seq(StructField("a", MapType(StringType, LongType)))),
StructType(Seq(StructField("b", MapType(StringType, LongType)))),
None)
widenTestWithStringPromotion(
ArrayType(StringType), ArrayType(TimestampType), Some(ArrayType(StringType)))
ArrayType(StructType(Seq(StructField("a", LongType)))),
ArrayType(StructType(Seq(StructField("b", LongType)))),
None)
}

private def ruleTest(rule: Rule[LogicalPlan], initial: Expression, transformed: Expression) {
Expand Down