[SPARK-3063][SQL] ExistingRdd should convert Map to catalyst Map. #1963
QA tests have started for PR 1963 at commit
LGTM.
QA tests have started for PR 1963 at commit
QA tests have finished for PR 1963 at commit
QA tests have started for PR 1963 at commit
QA tests have finished for PR 1963 at commit
QA tests have started for PR 1963 at commit
QA tests have finished for PR 1963 at commit
Jenkins, test this please.
QA tests have started for PR 1963 at commit
QA tests have finished for PR 1963 at commit
Thanks! I've merged this to master and 1.1.
Currently `ExistingRdd.convertToCatalyst` doesn't convert `Map` values.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1963 from ueshin/issues/SPARK-3063 and squashes the following commits:

3ba41f2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
4d7bae2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
9321379 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
d8a900a [Takuya UESHIN] Make ExistingRdd.convertToCatalyst be able to convert Map value.

(cherry picked from commit 6b5584e)
Signed-off-by: Michael Armbrust <michael@databricks.com>
…alue support to Parquet.

JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037

Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`:

```
message root {
  optional group a (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 key;
      optional int32 value;
    }
  }
}
```

for `ArrayType` when `containsNull` is `true`:

```
message root {
  optional group a (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}
```

We have to think about compatibility with older versions of Spark, Hive, or the other systems I mentioned in the JIRA issues.

Notice: This PR is based on #1963 and #1889. Please check them first.

/cc marmbrus, yhuai

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits:

4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037

(cherry picked from commit 727cb25)
Signed-off-by: Michael Armbrust <michael@databricks.com>
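The `MapType` schema quoted in that commit message can be reproduced with a small text generator. This is only an illustrative sketch in Python, not Spark's converter: the function name `map_schema` and its parameters are hypothetical, and the `required` repetition used when `value_contains_null` is false is an assumption, since the message only shows the `true` case.

```python
def map_schema(column: str, key_type: str, value_type: str,
               value_contains_null: bool) -> str:
    """Render a Parquet message type for a single map column.

    Keys are always `required`. The value repetition follows the
    nullability flag; the `required` branch is an assumption, because
    the commit message only shows the nullable case.
    """
    value_repetition = "optional" if value_contains_null else "required"
    lines = [
        "message root {",
        f"  optional group {column} (MAP) {{",
        "    repeated group map (MAP_KEY_VALUE) {",
        f"      required {key_type} key;",
        f"      {value_repetition} {value_type} value;",
        "    }",
        "  }",
        "}",
    ]
    return "\n".join(lines)


nullable_schema = map_schema("a", "int32", "int32", value_contains_null=True)
strict_schema = map_schema("a", "int32", "int32", value_contains_null=False)
print(nullable_schema)
```

With `value_contains_null=True` the output matches the schema in the commit message once whitespace is normalized; the only line that changes with the flag is the `value` field.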
### What changes were proposed in this pull request?

This PR fixes a bug that throws an `UnsupportedOperationException` when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)):

```
java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  ...
```

I've checked the AQE code and found that `EnsureRequirements` wrongly puts a `BroadcastExchange` on top of a `BroadcastQueryStage` in the `reOptimize` phase, as follows:

```
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183]
   +- BroadcastQueryStage 2
      +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963]
```

The root cause is that the `Cast` in the required child distribution does not have a `timeZoneId` set (`timeZoneId=None`), while the `Cast` in `child.outputPartitioning` does. This difference can make the distribution requirement check fail in `EnsureRequirements`:
https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50

The `Cast` without a `timeZoneId` is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked that q5 passed.

Closes #30818 from maropu/BugfixInAQE.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 51ef443)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
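The root cause described above comes down to structural equality: two casts that differ only in their time zone field do not compare equal, so the requirement check sees a mismatch and adds another exchange. The Python sketch below is a toy model of that effect, not Spark code. `Cast`, `cast_with_session_tz`, and `satisfies` are hypothetical stand-ins, and the real check in `EnsureRequirements` is more involved than a list comparison.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Cast:
    """Toy stand-in for a cast expression; equality compares every field."""
    child: str
    data_type: str
    time_zone_id: Optional[str] = None


def cast_with_session_tz(child: str, data_type: str, session_tz: str) -> Cast:
    """Hypothetical helper that always fills in the session time zone."""
    return Cast(child, data_type, session_tz)


def satisfies(required_keys: List[Cast], output_keys: List[Cast]) -> bool:
    """Simplified requirement check: the key expressions must be equal."""
    return required_keys == output_keys


SESSION_TZ = "UTC"  # assumed value, for illustration only

# Keys already produced by the broadcast side carry a time zone id.
output_keys = [Cast("input[0]", "bigint", SESSION_TZ)]

# A cast built without the time zone id does not match them...
required_without_tz = [Cast("input[0]", "bigint")]
# ...while one built through the helper does.
required_with_tz = [cast_with_session_tz("input[0]", "bigint", SESSION_TZ)]

print(satisfies(required_without_tz, output_keys))  # False
print(satisfies(required_with_tz, output_keys))     # True
```

In this model the fix corresponds to always constructing the required keys through the helper, so both sides carry the same time zone id.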
### What changes were proposed in this pull request?

This PR fixes a bug that throws an `UnsupportedOperationException` when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)):

```
java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  ...
```

I've checked the AQE code and found that `EnsureRequirements` wrongly puts a `BroadcastExchange` on top of a `BroadcastQueryStage` in the `reOptimize` phase, as follows:

```
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183]
   +- BroadcastQueryStage 2
      +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963]
```

The root cause is that the `Cast` in the required child distribution does not have a `timeZoneId` set (`timeZoneId=None`), while the `Cast` in `child.outputPartitioning` does. This difference can make the distribution requirement check fail in `EnsureRequirements`:
https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50

The `Cast` without a `timeZoneId` is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there.

This is a backport PR for #30818.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked that q5 passed with AQE enabled.

Closes #30830 from maropu/SPARK-33822-BRANCH3.0.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ict (apache#1963)

### What changes were proposed in this pull request?

If `spark.sql.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` is true, change the `KeyGroupedPartitioning.satisfies0(distribution)` check from requiring all clustering keys (here, join keys) to be in the partition keys, to requiring only that the two sets overlap.

### Why are the changes needed?

If `spark.sql.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled` is true, then SPJ no longer triggers if there are more join keys than partition keys. But SPJ is supported in this case if the flag is false.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests in KeyGroupedPartitioningSuite.

Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
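The difference between the two checks described in that commit message can be shown with plain set operations. This is a minimal Python sketch under simplifying assumptions: keys are modeled as strings, and the function names `satisfies_all_in_partition_keys` and `satisfies_overlap` are hypothetical, not the names used in `KeyGroupedPartitioning`.

```python
from typing import Iterable


def satisfies_all_in_partition_keys(join_keys: Iterable[str],
                                    partition_keys: Iterable[str]) -> bool:
    """Old check: every join (clustering) key must be a partition key."""
    return set(join_keys) <= set(partition_keys)


def satisfies_overlap(join_keys: Iterable[str],
                      partition_keys: Iterable[str]) -> bool:
    """New check: the join keys and partition keys only need to overlap."""
    return bool(set(join_keys) & set(partition_keys))


# More join keys than partition keys: the case the commit message describes.
partition_keys = ["id"]
join_keys = ["id", "name"]

print(satisfies_all_in_partition_keys(join_keys, partition_keys))  # False
print(satisfies_overlap(join_keys, partition_keys))                # True
print(satisfies_overlap(["name"], partition_keys))                 # False
```

The first call shows why the stricter check stops the join from qualifying when there are extra join keys, and the last call shows that the relaxed check still rejects disjoint key sets.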
Currently `ExistingRdd.convertToCatalyst` doesn't convert `Map` values.
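As a rough illustration of the conversion described above, the sketch below models a recursive converter that descends into maps as well as sequences and records. It is written in Python rather than Scala and is not the Spark implementation: `Row` and `convert_to_catalyst` are hypothetical stand-ins, with Python tuples playing the role of Scala products.

```python
from typing import Any


class Row(tuple):
    """Hypothetical stand-in for a Catalyst row: an ordered tuple of fields."""


def convert_to_catalyst(value: Any) -> Any:
    """Recursively convert nested records, sequences, and maps."""
    if isinstance(value, dict):
        # The case this PR is about: convert both keys and values of a map.
        return {convert_to_catalyst(k): convert_to_catalyst(v)
                for k, v in value.items()}
    if isinstance(value, list):
        return [convert_to_catalyst(v) for v in value]
    if isinstance(value, tuple):
        # Records (tuples here) become rows, field by field.
        return Row(convert_to_catalyst(v) for v in value)
    return value


record = ("alice", {"scores": [("math", 90)]})
converted = convert_to_catalyst(record)
print(type(converted[1]["scores"][0]).__name__)  # Row
```

Without the `dict` branch, the map would be returned untouched and the record nested inside it would stay a plain tuple, which mirrors the gap the PR title describes.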