Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Apr 19, 2018
1 parent ad6762a commit 07f4c82
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1009,16 +1009,35 @@ case class CatalystToExternalMap private(
override def children: Seq[Expression] =
keyLambdaFunction :: valueLambdaFunction :: inputData :: Nil

private lazy val toScalaValue: Any => Any = {
assert(inputData.dataType.isInstanceOf[MapType])
val mapType = inputData.dataType.asInstanceOf[MapType]
CatalystTypeConverters.createToScalaConverter(mapType)
private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]

private lazy val keyConverter =
CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
private lazy val valueConverter =
CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)

private def newMapBuilder(): Builder[AnyRef, AnyRef] = {
val clazz = Utils.classForName(collClass.getCanonicalName + "$")
val module = clazz.getField("MODULE$").get(null)
val method = clazz.getMethod("newBuilder")
method.invoke(module).asInstanceOf[Builder[AnyRef, AnyRef]]
}

override def eval(input: InternalRow): Any = {
val result = inputData.eval(input).asInstanceOf[MapData]
if (result != null) {
toScalaValue(result)
val builder = newMapBuilder()
builder.sizeHint(result.numElements())
val keyArray = result.keyArray()
val valueArray = result.valueArray()
var i = 0
while (i < result.numElements()) {
val key = keyConverter(keyArray.get(i, inputMapType.keyType))
val value = valueConverter(valueArray.get(i, inputMapType.valueType))
builder += Tuple2(key, value)
i += 1
}
builder.result()
} else {
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

<<<<<<< 0c94e48bc50717e1627c0d2acd5382d9adc73c97
test("LambdaVariable should support interpreted execution") {
def genSchema(dt: DataType): Seq[StructType] = {
Seq(StructType(StructField("col_1", dt, nullable = false) :: Nil),
Expand Down

0 comments on commit 07f4c82

Please sign in to comment.