
[SPARK-32421][SQL] Add code-gen for shuffled hash join #29277

Closed

Conversation


@c21 c21 commented Jul 28, 2020

What changes were proposed in this pull request?

This PR adds codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen, so most of the change is refactoring the existing codegen in BroadcastHashJoinExec into HashJoin.

Example codegen for query in JoinBenchmark:

  def shuffleHashJoin(): Unit = {
    val N: Long = 4 << 20
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      codegenBenchmark("shuffle hash join", N) {
        val df1 = spark.range(N).selectExpr(s"id as k1")
        val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
        val df = df1.join(df2, col("k1") === col("k2"))
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
        df.noop()
      }
    }
  }

Shuffled hash join codegen:

== Subtree 3 / 3 (maxMethodCodeSize:113; maxConstantPoolSize:126(0.19% used); numInnerClasses:0) ==
*(3) ShuffledHashJoin [k1#2L], [k2#6L], Inner, BuildRight
:- *(1) Project [id#0L AS k1#2L]
:  +- *(1) Range (0, 4194304, step=1, splits=1)
+- *(2) Project [(id#4L * 3) AS k2#6L]
   +- *(2) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage3(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=3
/* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.HashedRelation shj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] shj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage3(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input_0 = inputs[0];
/* 021 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
/* 022 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 023 */
/* 024 */   }
/* 025 */
/* 026 */   private void shj_doConsume_0(InternalRow inputadapter_row_0, long shj_expr_0_0) throws java.io.IOException {
/* 027 */     // generate join key for stream side
/* 028 */
/* 029 */     // find matches from HashRelation
/* 030 */     scala.collection.Iterator shj_matches_0 = false ?
/* 031 */     null : (scala.collection.Iterator)shj_relation_0.get(shj_expr_0_0);
/* 032 */     if (shj_matches_0 != null) {
/* 033 */       while (shj_matches_0.hasNext()) {
/* 034 */         UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next();
/* 035 */         {
/* 036 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 037 */
/* 038 */           long shj_value_1 = shj_matched_0.getLong(0);
/* 039 */           shj_mutableStateArray_0[0].reset();
/* 040 */
/* 041 */           shj_mutableStateArray_0[0].write(0, shj_expr_0_0);
/* 042 */
/* 043 */           shj_mutableStateArray_0[0].write(1, shj_value_1);
/* 044 */           append((shj_mutableStateArray_0[0].getRow()).copy());
/* 045 */
/* 046 */         }
/* 047 */       }
/* 048 */     }
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   protected void processNext() throws java.io.IOException {
/* 053 */     while ( inputadapter_input_0.hasNext()) {
/* 054 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 055 */
/* 056 */       long inputadapter_value_0 = inputadapter_row_0.getLong(0);
/* 057 */
/* 058 */       shj_doConsume_0(inputadapter_row_0, inputadapter_value_0);
/* 059 */       if (shouldStop()) return;
/* 060 */     }
/* 061 */   }
/* 062 */
/* 063 */ }

Broadcast hash join codegen for the same query (for reference):

== Subtree 2 / 2 (maxMethodCodeSize:280; maxConstantPoolSize:218(0.33% used); numInnerClasses:0) ==
*(2) BroadcastHashJoin [k1#2L], [k2#6L], Inner, BuildRight, false
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4194304, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#22]
   +- *(1) Project [(id#4L * 3) AS k2#6L]
      +- *(1) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy();
/* 033 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 034 */
/* 035 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 036 */
/* 037 */   }
/* 038 */
/* 039 */   private void initRange(int idx) {
/* 040 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 041 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 042 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4194304L);
/* 043 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 044 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 045 */     long partitionEnd;
/* 046 */
/* 047 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 048 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 049 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 050 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 051 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 052 */     } else {
/* 053 */       range_nextIndex_0 = st.longValue();
/* 054 */     }
/* 055 */     range_batchEnd_0 = range_nextIndex_0;
/* 056 */
/* 057 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 058 */     .multiply(step).add(start);
/* 059 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 060 */       partitionEnd = Long.MAX_VALUE;
/* 061 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 062 */       partitionEnd = Long.MIN_VALUE;
/* 063 */     } else {
/* 064 */       partitionEnd = end.longValue();
/* 065 */     }
/* 066 */
/* 067 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 068 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 069 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 070 */     if (range_numElementsTodo_0 < 0) {
/* 071 */       range_numElementsTodo_0 = 0;
/* 072 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 073 */       range_numElementsTodo_0++;
/* 074 */     }
/* 075 */   }
/* 076 */
/* 077 */   private void bhj_doConsume_0(long bhj_expr_0_0) throws java.io.IOException {
/* 078 */     // generate join key for stream side
/* 079 */
/* 080 */     // find matches from HashedRelation
/* 081 */     UnsafeRow bhj_matched_0 = false ? null: (UnsafeRow)bhj_relation_0.getValue(bhj_expr_0_0);
/* 082 */     if (bhj_matched_0 != null) {
/* 083 */       {
/* 084 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 085 */
/* 086 */         long bhj_value_2 = bhj_matched_0.getLong(0);
/* 087 */         range_mutableStateArray_0[3].reset();
/* 088 */
/* 089 */         range_mutableStateArray_0[3].write(0, bhj_expr_0_0);
/* 090 */
/* 091 */         range_mutableStateArray_0[3].write(1, bhj_value_2);
/* 092 */         append((range_mutableStateArray_0[3].getRow()));
/* 093 */
/* 094 */       }
/* 095 */     }
/* 096 */
/* 097 */   }
/* 098 */
/* 099 */   protected void processNext() throws java.io.IOException {
/* 100 */     // initialize Range
/* 101 */     if (!range_initRange_0) {
/* 102 */       range_initRange_0 = true;
/* 103 */       initRange(partitionIndex);
/* 104 */     }
/* 105 */
/* 106 */     while (true) {
/* 107 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 108 */         long range_nextBatchTodo_0;
/* 109 */         if (range_numElementsTodo_0 > 1000L) {
/* 110 */           range_nextBatchTodo_0 = 1000L;
/* 111 */           range_numElementsTodo_0 -= 1000L;
/* 112 */         } else {
/* 113 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 114 */           range_numElementsTodo_0 = 0;
/* 115 */           if (range_nextBatchTodo_0 == 0) break;
/* 116 */         }
/* 117 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 118 */       }
/* 119 */
/* 120 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 121 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 122 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 123 */
/* 124 */         bhj_doConsume_0(range_value_0);
/* 125 */
/* 126 */         if (shouldStop()) {
/* 127 */           range_nextIndex_0 = range_value_0 + 1L;
/* 128 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 129 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 130 */           return;
/* 131 */         }
/* 132 */
/* 133 */       }
/* 134 */       range_nextIndex_0 = range_batchEnd_0;
/* 135 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 136 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 137 */       range_taskContext_0.killTaskIfInterrupted();
/* 138 */     }
/* 139 */   }
/* 140 */
/* 141 */ }

Why are the changes needed?

Codegen for shuffled hash join helps save CPU cost. We added shuffled hash join codegen internally in our fork and saw a clear improvement in benchmarks compared to the current non-codegen code path.

Testing the example query in JoinBenchmark shows a 30% wall-clock time improvement over the existing non-codegen code path:
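In each run below, the "wholestage off/on" rows come from the benchmark toggling whole-stage codegen. A minimal sketch of that toggle (assuming the standard SQLConf key; shown for orientation only):

  // The "wholestage off/on" rows correspond to this flag.
  spark.conf.set("spark.sql.codegen.wholeStage", "false") // interpreted row-at-a-time path
  spark.conf.set("spark.sql.codegen.wholeStage", "true")  // whole-stage generated code path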

With shuffled hash join codegen enabled:

Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1358 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2323 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    649            679          43          6.5         154.7       1.0X
shuffle hash join wholestage on                     436            465          45          9.6         103.9       1.5X

With shuffled hash join codegen disabled:

Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1345 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2967 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    646            673          37          6.5         154.1       1.0X
shuffle hash join wholestage on                     549            594          47          7.6         130.9       1.2X

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in WholeStageCodegenSuite.
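
A hedged sketch of what such a test can look like (assuming a SharedSparkSession suite with testImplicits in scope; the actual test in WholeStageCodegenSuite may differ in details):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.execution.WholeStageCodegenExec
  import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
  import org.apache.spark.sql.internal.SQLConf

  test("ShuffledHashJoin should be included in WholeStageCodegen") {
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      val df1 = spark.range(5).select($"id".as("k1"))
      val df2 = spark.range(15).select($"id".as("k2"))
      val joined = df1.join(df2, $"k1" === $"k2")
      // The join should sit at the root of a whole-stage codegen subtree,
      // matching the "*(3) ShuffledHashJoin" plan shape shown above.
      val codegenJoins = joined.queryExecution.executedPlan.collect {
        case w @ WholeStageCodegenExec(_: ShuffledHashJoinExec) => w
      }
      assert(codegenJoins.nonEmpty)
      checkAnswer(joined, (0 until 5).map(i => Row(i.toLong, i.toLong)))
    }
  }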


c21 commented Jul 28, 2020

cc @cloud-fan and @sameeragarwal if you guys can help take a look, thanks!

Comment on lines 135 to 136
relationTerm = ctx.addMutableState(
"org.apache.spark.sql.execution.joins.HashedRelation", "relation", forceInline = true)
viirya (Member):

As you already use mutable state for the hashed relation here, why not just follow BroadcastHashJoinExec and call buildHashedRelation in prepareRelation, setting it into the mutable state? Then BroadcastHashJoinExec and ShuffledHashJoinExec would look more consistent.

c21 (Contributor Author):

@viirya - BroadcastHashJoinExec needs to broadcast the build side in prepareBroadcast(). I feel it's hard to refactor there. Do you have any idea how to make it cleaner?

cloud-fan (Contributor):

I have the same feeling as @viirya.

Can we broadcast the build side in BroadcastHashJoinExec.prepareRelation?

cloud-fan (Contributor) commented Jul 29, 2020:

We can always call prepareRelation in doProduce as the first step

  override def doProduce(ctx: CodegenContext): String = {
    prepareRelation(ctx) // which sets 2 vars: relationTerm and isUniqueKey
    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
  }

cloud-fan (Contributor):

Sorry I may not understand the real problem. What's wrong with

  override def doProduce(ctx: CodegenContext): String = {
    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
  }

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    joinType match {
      case _: InnerLike => codegenInner(ctx, input)
      case LeftOuter | RightOuter => codegenOuter(ctx, input)
      case LeftSemi => codegenSemi(ctx, input)
      case LeftAnti => codegenAnti(ctx, input)
      case j: ExistenceJoin => codegenExistence(ctx, input)
      case x =>
        throw new IllegalArgumentException(
          s"BroadcastHashJoin should not take $x as the JoinType")
    }
  }

c21 (Contributor Author):

@cloud-fan and @viirya -

Updated the code with the following changes:

(1) ShuffledHashJoinExec.prepareRelation now calls buildHashedRelation to build the hash map.
(2) BroadcastHashJoinExec.prepareRelation now calls prepareBroadcast to broadcast the build side and build the hash map.

c21 (Contributor Author):

@cloud-fan

re #29277 (comment):

do you think keeping the two vars would look much better than the current approach of calling prepareRelation separately in each codegenInner/codegenOuter/...? If yes, I can make the change accordingly, thanks.

re #29277 (comment):

By design, doConsume() generates code for processing one input row. BroadcastHashJoinExec can do its codegen work in doConsume() with only the stream side as input, because it simply broadcasts the executed build side query plan and generates per-row processing code for the stream side in doConsume().

However, ShuffledHashJoinExec cannot do its codegen work in doConsume() with stream and build side inputs, because it needs to first read all build side input rows and build a hash map before processing each row from the stream side. We cannot generate code in doConsume() from just a pair of stream and build side input rows. This is similar to SortMergeJoinExec, which streams one row at a time on one side and buffers multiple rows on the other. So ShuffledHashJoinExec has to generate code in doProduce(), and its children have to do codegen separately in their own iterator classes.
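
To make the ordering constraint concrete, here is a self-contained sketch in plain Scala (purely illustrative, hypothetical helper; not the PR's generated code):

  // The build side must be fully consumed to build the hash map before the first
  // stream row is probed, so the probe cannot be expressed per-row in doConsume() alone.
  def shuffledHashJoin[K, B, S](
      buildIter: Iterator[(K, B)],
      streamIter: Iterator[(K, S)]): Iterator[(K, S, B)] = {
    // Step 1: exhaust the build side and build the hash map (analogous to init()).
    val hashMap: Map[K, Seq[B]] =
      buildIter.toSeq.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Step 2: only then stream and probe one row at a time (analogous to processNext()).
    streamIter.flatMap { case (k, s) => hashMap.getOrElse(k, Nil).map(b => (k, s, b)) }
  }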

|$isNull = $row.isNullAt($i);
|$value = $isNull ? $defaultValue : ($valueCode);
""".stripMargin
val leftVarsDecl =
viirya (Member):

Since you removed the left concept, we'd better clean up these leftXXX variables too.

c21 (Contributor Author):

@viirya - sorry for missing this, done.

* the variables should be declared separately from accessing the columns, we can't use the
* codegen of BoundReference here.
*/
protected def createVars(
viirya (Member):

The original createLeftVars was created to defer accessing row fields until after condition evaluation. But looking at the usage of this createVars in HashJoin, I don't see such deferral. If you don't defer there, you can simply use BoundReference codegen, which is much simpler.

c21 (Contributor Author):

@viirya - I was actually using BoundReference originally, but got a compilation error due to variable redefinition. E.g., see the code branch c21:codegen-fail (which switches to BoundReference compared to this PR) with the example query in JoinBenchmark.

The BoundReference version generated code like this:

while (shj_streamedInput_0.hasNext()) {
  shj_streamedRow_0 = (InternalRow) shj_streamedInput_0.next();
  
  // generate join key for stream side
  long shj_value_0 = shj_streamedRow_0.getLong(0); // 1st definition here
  
  // find matches from HashRelation
  scala.collection.Iterator shj_matches_0 = false ? null : (scala.collection.Iterator)shj_relation_0.get(shj_value_0);
  while (shj_matches_0.hasNext()) {
    UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next();
    long shj_value_0 = shj_streamedRow_0.getLong(0); // 2nd definition here and compilation error
    shj_mutableStateArray_0[0].write(0, shj_value_0);           
  }
}

So the variable shj_value_0 here (the stream side key) needs deferred access in HashJoin.consume(), while it is first accessed in HashJoin.genStreamSideJoinKey(). So it seems that BoundReference doesn't work for me out of the box.

Let me know if this makes sense, or if there is a better approach, thanks.
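
For illustration, a hedged sketch of the deferred-declaration pattern createVars produces (hypothetical helper, not the PR's exact implementation): the declaration is hoisted above the probe loop and the assignment is emitted once at first use, so the inner loop can reference the variable without redefining it.

  // Hypothetical: split a stream-side column access into a hoisted declaration
  // and a deferred assignment, avoiding the duplicate definition shown above.
  def deferredVar(name: String, row: String, ordinal: Int): (String, String) = {
    val declaration = s"long $name = -1L;"               // emitted once, above the while loop
    val assignment  = s"$name = $row.getLong($ordinal);" // emitted where the key is first needed
    (declaration, assignment)
  }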


SparkQA commented Jul 28, 2020

Test build #126737 has finished for PR 29277 at commit bb9994a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -316,6 +318,387 @@ trait HashJoin extends BaseJoinExec {
resultProj(r)
}
}

cloud-fan (Contributor):

can you use PR comments to highlight the real changes? Seems most of the diff is just moving code around.

c21 (Contributor Author):

@cloud-fan - sorry about that. Yes, most of it is moving code around without change. I've highlighted the real changes with comments, thanks.

val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")

protected override def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
c21 (Contributor Author):

codegenAnti is changed to keep the null-aware anti join logic separate here, while the rest of the logic moves to HashJoin.codegenAnti.

* Generates the code for left semi join.
*/
private def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = {
protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = {
c21 (Contributor Author):

a new method prepareRelation is added to call prepareBroadcast() and learn whether the key is known to be unique at codegen time.
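
A hedged sketch of the shape being described (the tuple return matches the comment above; the body is an assumption, not the PR's exact code):

  // Assumed: reuse prepareBroadcast() and report whether the build-side key is
  // unique, so the join codegen can pick the single-match fast path.
  protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = {
    val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
    (relationTerm, broadcastRelation.value.keyIsUnique)
  }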

@@ -23,6 +23,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
c21 (Contributor Author):

all changes in ShuffledHashJoinExec here are real changes, not refactoring.

* the variables should be declared separately from accessing the columns, we can't use the
* codegen of BoundReference here.
*/
protected def createVars(
c21 (Contributor Author):

createVars is copied from SortMergeJoinExec.createLeftVars() so it can be used by both SortMergeJoinExec and ShuffledHashJoinExec to generate code for the stream side input.

* Returns the code for generating join key for stream side, and expression of whether the key
* has any null in it or not.
*/
protected def genStreamSideJoinKey(
c21 (Contributor Author):

this method is copied from BroadcastHashJoinExec without change.

val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")

if (keyIsKnownUnique) {
c21 (Contributor Author):

added keyIsKnownUnique to support the unique-key code path in BroadcastHashJoinExec.


val resultVar = input ++ Seq(ExprCode.forNonNullValue(
JavaCode.variable(existsVar, BooleanType)))

if (keyIsKnownUnique) {
c21 (Contributor Author):

added keyIsKnownUnique to support the unique-key code path in BroadcastHashJoinExec.

@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child))))
case j: ShuffledHashJoinExec =>
c21 (Contributor Author):

codegen the children of ShuffledHashJoinExec separately, the same as for SortMergeJoinExec.



SparkQA commented Jul 29, 2020

Test build #126741 has finished for PR 29277 at commit 7fbd3a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val thisPlan = ctx.addReferenceObj("plan", this)
val (relationTerm, _) = prepareRelation(ctx)
val buildRelation = s"$relationTerm = $thisPlan.buildHashedRelation($buildInput);"
cloud-fan (Contributor):

why not include it when we create the relationTerm in prepareRelation?

c21 (Contributor Author):

@cloud-fan - sorry, if we include buildRelation inside prepareRelation, how do we use buildRelation in the final generated code? Do you mean creating a private var to keep buildRelation after prepareRelation is called?

cloud-fan (Contributor):

something like

  protected def prepareRelation(ctx: CodegenContext): String = {
    val thisPlan = ctx.addReferenceObj("plan", this)
    val clsName = classOf[HashedRelation].getName

    ctx.addMutableState(clsName, "relation", v =>
      s"""
         | $v = $thisPlan.buildHashedRelation(inputs[1]);
       """.stripMargin, forceInline = true)
  }

cloud-fan (Contributor):

Then we don't need the if (!$initRelation) stuff.

c21 (Contributor Author):

@cloud-fan - yes, I got your point. Updated to do the initialization in prepareRelation, thanks.


c21 commented Jul 30, 2020

@cloud-fan - updated the PR to address the comments, and it is ready for review. Also updated the PR description with the latest generated code for the example query. Thanks.

@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child))))
case j: ShuffledHashJoinExec =>
// The children of ShuffledHashJoin should do codegen separately.
viirya (Member):

This applies to SortMergeJoin too. I actually had to re-think about it to figure it out, because codegen-related code rarely changes these days. It would be nice to add more comments here to explain it.

c21 (Contributor Author):

@viirya - sure, what kind of wording are you expecting here? Does it look better like this:

// The children of ShuffledHashJoin should do codegen separately,
// because codegen for ShuffledHashJoin depends on more than one row
// from the build side input.
// The children of SortMergeJoin should do codegen separately,
// because codegen for SortMergeJoin depends on more than one row
// from the buffer side input.


SparkQA commented Jul 30, 2020

Test build #126796 has finished for PR 29277 at commit a15af30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

s"""
|while ($streamedInput.hasNext()) {
cloud-fan (Contributor):

This is exactly what will be generated by streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this). See InputAdapter.doProduce. I don't understand why we can't put the join part in doConsume.

cloud-fan (Contributor):

Note: https://github.com/apache/spark/pull/29277/files#diff-db4ffe4f0196a9d7cf1f04c350ee3381R124

We actually build the relation in the class constructor, so the codegen flow should be the same as broadcast hash join.

c21 (Contributor Author):

@cloud-fan - I get your point now; updated to do the join part in doConsume(). Moved BHJ's doProduce() and doConsume() into HashJoin, to be shared between BHJ and SHJ. Also updated the PR with the latest generated code for the example query. Thanks for the suggestion.

cloud-fan (Contributor):

retest this please

1 similar comment

c21 commented Jul 30, 2020

retest this please

@@ -903,6 +904,10 @@ case class CollapseCodegenStages(
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child))))
case j: ShuffledHashJoinExec =>
// The children of ShuffledHashJoin should do codegen separately.
j.withNewChildren(j.children.map(
viirya (Member) commented Jul 30, 2020:

We can remove this now. ShuffledHashJoin now does codegen like BroadcastHashJoin.

c21 (Contributor Author):

@viirya - I don't think we can remove this. We have to codegen shuffled hash join children separately, as we have a hardcoded dependency on the build side input inputs[1] when building the relation. This can go wrong if we have multiple shuffled hash joins in one query.

E.g.

  test("ShuffledHashJoin should be included in WholeStageCodegen") {
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      val df1 = spark.range(5).select($"id".as("k1"))
      val df2 = spark.range(15).select($"id".as("k2"))
      val df3 = spark.range(6).select($"id".as("k3"))
      val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
      twoJoinsDF.collect() // trigger execution so both joins are planned and run
    }
  }

If we don't codegen shuffled hash join children separately, we will get something like:

/* 018 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 019 */     partitionIndex = index;
/* 020 */     this.inputs = inputs;
/* 021 */     inputadapter_input_0 = inputs[0];
/* 022 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
/* 023 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 024 */     shj_relation_1 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[2] /* plan */).buildHashedRelation(inputs[1]);
/* 025 */     shj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 026 */
/* 027 */   }

shj_relation_0 and shj_relation_1 will both try to build a hash relation on the same input (but shouldn't), because inputs[1] is hardcoded there. On the other hand, I couldn't think of an alternative that avoids hardcoding inputs[1] in codegen. Let me know if you have any better options, thanks. I also updated WholeStageCodegenSuite.scala to add a unit test for this kind of multi-join query.
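
For context, a hedged sketch of the input wiring behind that hardcoding (assuming the usual CodegenSupport convention; illustrative, not necessarily the PR's exact code):

  // ShuffledHashJoinExec exposes two input RDDs to WholeStageCodegenExec, so the
  // generated init() sees inputs[0] = streamed side and inputs[1] = build side.
  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    streamedPlan.execute() :: buildPlan.execute() :: Nil
  }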

cloud-fan (Contributor):

Seems we only need to do it for the build side?

viirya (Member):

Yea, that's true.

c21 (Contributor Author):

@cloud-fan, @viirya - if we only codegen the build side separately, we would still have the same problem as above for multiple SHJs, right? Essentially we would fuse codegen for multiple stream sides into one method, so we would have multiple build sides initialized in init(), and the same naming collision as above. Let me know if it doesn't make sense, or I can create a counterexample here.

c21 (Contributor Author):

@cloud-fan - sounds good; it's non-trivial for me right now as well. Will try to resolve it in the future. Thanks.

cloud-fan (Contributor):

And there are more problems if many shuffled hash joins are fused together. We need to accumulate the CodegenSupport.inputRDDs, but WholeStageCodegenExec only supports up to 2 input RDDs for now.

c21 (Contributor Author) commented Jul 31, 2020:

We need to accumulate the CodegenSupport.inputRDDs, but WholeStageCodegenExec only supports up to 2 input RDDs for now.

Yes, agreed. SortMergeJoinExec made the decision to codegen its children separately; it's just simpler and avoids these limitations.

viirya (Member) commented Jul 31, 2020:

Once we codegen only the build side separately, we should not build the hash relation using inputs[1] anymore. It would work similarly to BroadcastHashJoin: we just build the hash relation from buildPlan. Then ShuffledHashJoinExec has only one input RDD, like BroadcastHashJoin's inputRDDs.

viirya (Member):

Hm, it still has some problems. Will think about it when I have more time.

Comment on lines 84 to 87
/**
* Returns a tuple of variable name for HashedRelation,
* and boolean false to indicate key not to be known unique in code-gen time.
*/
viirya (Member):

Move the comment to HashJoin?

c21 (Contributor Author):

@viirya - sure, updated.


viirya commented Jul 30, 2020

@c21 Jenkins seems not working now. You can sync with master to fix the SparkR issue in GitHub Actions. We can also rely on GitHub Actions to test.


c21 commented Jul 30, 2020

Jenkins seems not working now. You can sync with master to fix the SparkR issue in GitHub Actions. We can also rely on GitHub Actions to test.

@viirya - thanks for the tips, rebased onto master now.

cloud-fan (Contributor):

Regarding the comment from @viirya, I have one last comment: #29277 (comment)

Otherwise, I think it's good to go.


SparkQA commented Jul 31, 2020

Test build #126813 has finished for PR 29277 at commit ac64864.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in ae82768 Jul 31, 2020

c21 commented Jul 31, 2020

Thank you @cloud-fan and @viirya for review and discussion!

cloud-fan (Contributor):

@c21 thanks for your great work! If you want to work on the followup, please let me know. I can create a JIRA ticket and we can discuss there.


c21 commented Jul 31, 2020

@cloud-fan - thanks for the discussion. I created https://issues.apache.org/jira/browse/SPARK-32505 as the follow-up task, and I will take a shot at it. Thanks.

@c21 c21 deleted the codegen branch July 31, 2020 16:11