From 075ef762edc62f7ca81625321160ec2ab7be34a2 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 7 Jul 2015 16:39:00 -0700
Subject: [PATCH] fix codegen with BroadcastHashJoin

---
 .../spark/sql/execution/joins/BroadcastHashOuterJoin.scala    | 3 +--
 .../org/apache/spark/sql/execution/joins/HashOuterJoin.scala  | 2 +-
 .../org/apache/spark/sql/execution/joins/HashedRelation.scala | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 5da04c78744d9..6a7b83fe52194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -78,8 +78,7 @@ case class BroadcastHashOuterJoin(
       // Note that we use .execute().collect() because we don't want to convert data to Scala types
       val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
       // buildHashTable uses code-generated rows as keys, which are not serializable
-      val hashed =
-        buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+      val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))
       sparkContext.broadcast(hashed)
     }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 886b5fa0c5103..234f52a7b9c73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -170,7 +170,7 @@ override def outputPartitioning: Partitioning = joinType match {
       var existingMatchList = hashTable.get(rowKey)
       if (existingMatchList == null) {
         existingMatchList = new CompactBuffer[InternalRow]()
-        hashTable.put(rowKey, existingMatchList)
+        hashTable.put(rowKey.copy(), existingMatchList)
       }

       existingMatchList += currentRow.copy()

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index e18c817975134..93abf94d9c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -124,7 +124,7 @@ private[joins] object HashedRelation {
       val existingMatchList = hashTable.get(rowKey)
       val matchList = if (existingMatchList == null) {
         val newMatchList = new CompactBuffer[InternalRow]()
-        hashTable.put(rowKey, newMatchList)
+        hashTable.put(rowKey.copy(), newMatchList)
         newMatchList
       } else {
         keyIsUnique = false
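
Why the rowKey.copy() calls matter: a code-generated Projection returns a
single mutable row that is reused and overwritten in place for every input
row. If that row is stored directly as a hash-table key, every stored key
aliases the same instance, which the next input row silently overwrites, so
earlier entries can no longer be looked up. Below is a minimal,
self-contained sketch of the hazard; MutableKey, MutableKeyDemo, and insert
are hypothetical stand-ins for illustration (not Spark classes), and
java.util.HashMap stands in for the join hash table.

    import java.util.{HashMap => JHashMap}
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for a code-generated row: one instance is
    // reused and overwritten in place for every input row, like the
    // mutable rows produced by a generated Projection.
    final class MutableKey(var value: Int) {
      override def hashCode: Int = value
      override def equals(other: Any): Boolean = other match {
        case k: MutableKey => k.value == value
        case _             => false
      }
      def copy(): MutableKey = new MutableKey(value)
    }

    object MutableKeyDemo {
      // Same get / null-check / put pattern as the patched build loops.
      def insert(table: JHashMap[MutableKey, ArrayBuffer[Int]],
                 key: MutableKey, v: Int): Unit = {
        var list = table.get(key)
        if (list == null) {
          list = new ArrayBuffer[Int]()
          table.put(key, list)
        }
        list += v
      }

      def main(args: Array[String]): Unit = {
        val reused = new MutableKey(0)
        val broken = new JHashMap[MutableKey, ArrayBuffer[Int]]()
        val fixed  = new JHashMap[MutableKey, ArrayBuffer[Int]]()

        for (v <- Seq(1, 2, 3)) {
          reused.value = v                // projection overwrites the shared row
          insert(broken, reused, v)       // bug: stored key mutates afterwards
          insert(fixed, reused.copy(), v) // fix: defensive copy, as in the patch
        }

        println(broken.get(new MutableKey(1))) // null - stored key now holds 3
        println(fixed.get(new MutableKey(1)))  // ArrayBuffer(1)
      }
    }

Running the sketch prints null for the broken table and ArrayBuffer(1) for
the fixed one. The rowKey.copy() calls added to HashOuterJoin and
HashedRelation apply the same defensive copy before put(), which is what
makes the switch from InterpretedProjection to the code-generated
newProjection safe in BroadcastHashOuterJoin.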