
[SPARK-12798] [SQL] generated BroadcastHashJoin #10989

Closed
wants to merge 27 commits

Conversation

davies (Contributor) commented Jan 29, 2016

A row from the stream side could match multiple rows on the build side, and the loop over these matched rows should not be interrupted when emitting a row. We therefore buffer the output rows in a linked list and check the termination condition in the producer loop (for example, Range or Aggregate).
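
Roughly, the buffering works like this (a minimal sketch, not the actual BufferedRowIterator code; only currentRows, append, and processNext mirror this PR):

    import java.util.LinkedList;

    // Minimal sketch of the buffered-row pattern introduced by this PR.
    abstract class BufferedRowIteratorSketch<T> {
      // One call to processNext() may buffer several rows, because a single
      // stream-side row can match many build-side rows.
      protected LinkedList<T> currentRows = new LinkedList<>();

      protected abstract void processNext();

      protected void append(T row) {
        currentRows.add(row);
      }

      public boolean hasNext() {
        if (currentRows.isEmpty()) {
          processNext(); // may buffer zero or more rows
        }
        return !currentRows.isEmpty();
      }

      public T next() {
        return currentRows.remove(); // removes and returns the head
      }
    }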

Davies Liu added 18 commits January 20, 2016 16:29
Conflicts:
	sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
SparkQA commented Jan 29, 2016

Test build #50409 has finished for PR 10989 at commit f234c21.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
SparkQA commented Jan 30, 2016

Test build #50433 has finished for PR 10989 at commit 89614a5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies changed the title from "[SPARK-12798] [SQL] generated BroadcastHashJoin" to "[WIP] [SPARK-12798] [SQL] generated BroadcastHashJoin" on Jan 30, 2016
SparkQA commented Jan 30, 2016

Test build #50435 has finished for PR 10989 at commit dcf4fdc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 30, 2016

Test build #50437 has finished for PR 10989 at commit 1ecce29.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 30, 2016

Test build #50438 has finished for PR 10989 at commit 0139fde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -31,22 +33,20 @@
* TODO: replaced it by batched columnar format.
*/
public class BufferedRowIterator {
protected InternalRow currentRow;
protected LinkedList<InternalRow> currentRows = new LinkedList<>();
Contributor

Orthogonal to this PR: my first reaction is that maybe we should spend a week or two converting all operators to a push-based model. Otherwise performance is going to suck big time for some operators.

Contributor Author

It's a huge topic; let's talk about this offline.

Contributor

@cloud-fan @mgaido91 @viirya
When spark.sql.codegen.wholeStage = true, some joins caused an OOM. Analyzing the heap dump, I found that BufferedRowIterator#currentRows holds all of the matching rows.
If codegen is turned off, it runs just fine; only one matching row is generated at a time.
Increasing the executor memory may let it succeed, but there is always some probability of failure, because the number of rows matching the current key is unknown.

example:

    val value = "x" * 1000 * 1000
    case class TestData(key: Int, value: String)
    val testData = spark.sparkContext.parallelize((1 to 1)
      .map(i => TestData(i, value))).toDF()
    var bigData = testData
    for (_ <- Range(0, 10)) {
      bigData = bigData.union(bigData)
    }
    val testDataX = testData.as("x").selectExpr("key as xkey", "value as xvalue")
    val bigDataY = bigData.as("y").selectExpr("key as ykey", "value as yvalue")
    testDataX.join(bigDataY).where("xkey = ykey").write.saveAsTable("test")

hprof:
(heap dump screenshot: BufferedRowIterator#currentRows retaining the matched rows)

The currently generated code snippet:

protected void processNext() throws java.io.IOException {
    while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
      scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
      while (smj_iterator_0.hasNext()) {
        InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
        append(xxRow.copy());
      }
      if (shouldStop()) return;
    }
}

Is it possible to change the generated code to something like this, or is there a better way?

private scala.collection.Iterator<UnsafeRow> smj_iterator_0;

protected void processNext() throws java.io.IOException {
    // Resume the remaining matches for the previous stream-side row.
    if (smj_iterator_0 != null && smj_iterator_0.hasNext()) {
      InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
      append(xxRow.copy());
      if (!smj_iterator_0.hasNext()) {
        smj_iterator_0 = null;
      }
      return;
    }
    while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
      smj_iterator_0 = smj_matches_0.generateIterator();
      if (smj_iterator_0.hasNext()) {
        InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
        append(xxRow.copy());
        if (!smj_iterator_0.hasNext()) {
          smj_iterator_0 = null;
        }
        return;
      }
      if (shouldStop()) return;
    }
}

@@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
}

/**
* Returns whether it should stop processing next row or not.
Contributor

What's "it"? And what's "next row"?

@@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
}

/**
* Returns whether `processNext()` should stop processing next row from `input` or not.
*/
protected boolean shouldStop() {
Contributor

@rxin this seems like it could be used to support limit.
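
For example (a hypothetical sketch, not code in this PR), a limit could count appended rows and flip shouldStop():

    // Hypothetical sketch: stop the produce loop once `limit` rows were emitted.
    class LimitedProduceSketch {
      private final int limit;
      private int emitted = 0;

      LimitedProduceSketch(int limit) {
        this.limit = limit;
      }

      // The generated produce loop checks this after each appended row
      // and returns as soon as it is true.
      protected boolean shouldStop() {
        return emitted >= limit;
      }

      protected void append(Object row) {
        emitted += 1;
        // buffer the row for the consumer ...
      }
    }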

nongli (Contributor) commented Feb 1, 2016

Can you include the generated code?

SparkQA commented Feb 1, 2016

Test build #2486 has finished for PR 10989 at commit c1c0588.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
davies (Contributor Author) commented Feb 2, 2016

val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()

/* 001 */
/* 002 */ public Object generate(Object[] references) {
/* 003 */   return new GeneratedIterator(references);
/* 004 */ }
/* 005 */
/* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */
/* 008 */   private Object[] references;
/* 009 */   private boolean agg_initAgg0;
/* 010 */   private boolean agg_bufIsNull1;
/* 011 */   private long agg_bufValue2;
/* 012 */   private org.apache.spark.broadcast.TorrentBroadcast broadcasthashjoin_broadcast6;
/* 013 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation broadcasthashjoin_relation7;
/* 014 */   private boolean range_initRange8;
/* 015 */   private long range_partitionEnd9;
/* 016 */   private long range_number10;
/* 017 */   private boolean range_overflow11;
/* 018 */   private UnsafeRow broadcasthashjoin_result19;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder broadcasthashjoin_holder20;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter broadcasthashjoin_rowWriter21;
/* 021 */   private UnsafeRow agg_result37;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder38;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter39;
/* 024 */
/* 025 */   private void initRange(int idx) {
/* 050 */   }
/* 051 */
/* 052 */
/* 053 */   private void agg_doAggregateWithoutKey5() throws java.io.IOException {
/* 054 */     // initialize aggregation buffer

/* 057 */     agg_bufIsNull1 = false;
/* 058 */     agg_bufValue2 = 0L;

/* 063 */     // initialize Range
/* 064 */     if (!range_initRange8) {
/* 065 */       range_initRange8 = true;
/* 066 */       if (input.hasNext()) {
/* 067 */         initRange(((InternalRow) input.next()).getInt(0));
/* 068 */       } else {
/* 069 */         return;
/* 070 */       }
/* 071 */     }
/* 072 */
/* 073 */     while (!range_overflow11 && range_number10 < range_partitionEnd9) {
/* 074 */       long range_value12 = range_number10;
/* 075 */       range_number10 += 1L;
/* 076 */       if (range_number10 < range_value12 ^ 1L < 0) {
/* 077 */         range_overflow11 = true;
/* 078 */       }
/* 079 */
/* 080 */       // generate join key
/* 084 */       broadcasthashjoin_rowWriter21.zeroOutNullBytes();
/* 089 */       boolean broadcasthashjoin_isNull13 = false;
/* 090 */       long broadcasthashjoin_value14 = -1L;
/* 091 */       if (false || 60000L == 0) {
/* 092 */         broadcasthashjoin_isNull13 = true;
/* 093 */       } else {
/* 094 */         /* input[0, bigint] */
/* 095 */
/* 096 */         if (false) {
/* 097 */           broadcasthashjoin_isNull13 = true;
/* 098 */         } else {
/* 099 */           broadcasthashjoin_value14 = (long)(range_value12 % 60000L);
/* 100 */         }
/* 101 */       }
/* 102 */       if (broadcasthashjoin_isNull13) {
/* 103 */         broadcasthashjoin_rowWriter21.setNullAt(0);
/* 104 */       } else {
/* 105 */         broadcasthashjoin_rowWriter21.write(0, broadcasthashjoin_value14);
/* 106 */       }
/* 107 */
/* 108 */
/* 109 */       // find matches from HashRelation
/* 110 */       org.apache.spark.util.collection.CompactBuffer broadcasthashjoin_matches23 = broadcasthashjoin_result19.anyNull() ? null : (org.apache.spark.util.collection.CompactBuffer) broadcasthashjoin_relation7.get(broadcasthashjoin_result19);
/* 111 */       if (broadcasthashjoin_matches23 != null) {
/* 112 */         int broadcasthashjoin_size25 = broadcasthashjoin_matches23.size();
/* 113 */         for (int broadcasthashjoin_i24 = 0; broadcasthashjoin_i24 < broadcasthashjoin_size25; broadcasthashjoin_i24++) {
/* 114 */           UnsafeRow broadcasthashjoin_row26 = (UnsafeRow) broadcasthashjoin_matches23.apply(broadcasthashjoin_i24);
/* 115 */           /* input[0, bigint] */
/* 116 */           long broadcasthashjoin_value28 = broadcasthashjoin_row26.getLong(0);
/* 121 */           // do aggregate
/* 127 */           long agg_value30 = -1L;
/* 128 */           agg_value30 = agg_bufValue2 + 1L;
/* 129 */           // update aggregation buffer
/* 130 */
/* 131 */           agg_bufIsNull1 = false;
/* 132 */           agg_bufValue2 = agg_value30;
/* 136 */         }
/* 137 */       }
/* 138 */
/* 139 */
/* 140 */       if (shouldStop()) return;
/* 141 */     }
/* 144 */   }
/* 145 */
/* 146 */
/* 147 */   public GeneratedIterator(Object[] references) {
/* 148 */     this.references = references;
/* 149 */     agg_initAgg0 = false;
/* 150 */
/* 151 */
/* 152 */     this.broadcasthashjoin_broadcast6 = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
/* 153 */
/* 154 */     broadcasthashjoin_relation7 = (org.apache.spark.sql.execution.joins.UnsafeHashedRelation) broadcasthashjoin_broadcast6.value();
/* 155 */     incPeakExecutionMemory(broadcasthashjoin_relation7.getUnsafeSize());
/* 156 */
/* 157 */     range_initRange8 = false;
/* 158 */     range_partitionEnd9 = 0L;
/* 159 */     range_number10 = 0L;
/* 160 */     range_overflow11 = false;
/* 161 */     broadcasthashjoin_result19 = new UnsafeRow(1);
/* 162 */     this.broadcasthashjoin_holder20 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(broadcasthashjoin_result19, 0);
/* 163 */     this.broadcasthashjoin_rowWriter21 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(broadcasthashjoin_holder20, 1);
/* 164 */     agg_result37 = new UnsafeRow(1);
/* 165 */     this.agg_holder38 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result37, 0);
/* 166 */     this.agg_rowWriter39 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder38, 1);
/* 167 */   }
/* 168 */
/* 169 */   protected void processNext() throws java.io.IOException {
/* 170 */
/* 171 */     if (!agg_initAgg0) {
/* 172 */       agg_initAgg0 = true;
/* 173 */       agg_doAggregateWithoutKey5();
/* 174 */
/* 175 */       // output the result
/* 179 */       agg_rowWriter39.zeroOutNullBytes();
/* 183 */       if (agg_bufIsNull1) {
/* 184 */         agg_rowWriter39.setNullAt(0);
/* 185 */       } else {
/* 186 */         agg_rowWriter39.write(0, agg_bufValue2);
/* 187 */       }
/* 188 */       currentRows.add(agg_result37.copy());
/* 190 */     }
/* 192 */   }
/* 193 */ }
/* 194 */

}

$thisPlan.updatePeakMemory($hashMapTerm);
incPeakExecutionMemory($hashMapTerm.getPeakMemoryUsedBytes());
Contributor

Can we bake the peak memory usage into hashMapTerm.free()? This seems like something we'll forget to do.
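
Something like this sketch (hypothetical; it reuses incPeakExecutionMemory and getPeakMemoryUsedBytes, which already appear in the generated code above):

    // Hypothetical sketch: bake the accounting into free() so callers
    // cannot forget it.
    public void free() {
      incPeakExecutionMemory(getPeakMemoryUsedBytes()); // record peak usage
      // ... then release the hash map's memory ...
    }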

SparkQA commented Feb 2, 2016

Test build #50587 has finished for PR 10989 at commit 4d75022.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -389,14 +379,16 @@ case class TungstenAggregate(
UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
$outputCode

if (shouldStop()) return;
Contributor

Can you document the required behavior of shouldStop()? How does it need to behave so that the cleanup below (hashMapTerm.free()) is called?

Contributor Author

Once shouldStop() returns true, the caller should exit the loop (via return).

map.free() is called only when the loop has consumed all of its items (i.e., it finished without an early return).

Will add this to the doc string of shouldStop().
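
In other words (a minimal sketch with illustrative names; only shouldStop(), append(), and the early return mirror this PR):

    import java.util.Iterator;
    import java.util.LinkedList;

    // Sketch: free() runs only when the produce loop finishes without an
    // early return triggered by shouldStop().
    class ProduceLoopSketch {
      private final Iterator<Long> input;
      private final LinkedList<Long> currentRows = new LinkedList<>();

      ProduceLoopSketch(Iterator<Long> input) {
        this.input = input;
      }

      private void append(long row) {
        currentRows.add(row);
      }

      // Stop as soon as one row is buffered for the consumer.
      private boolean shouldStop() {
        return !currentRows.isEmpty();
      }

      private void free() {
        // release the hash map's memory here
      }

      void processNext() {
        while (input.hasNext()) {
          append(input.next());
          if (shouldStop()) return; // early exit: free() must not run yet
        }
        free(); // reached only after the loop consumed all items
      }
    }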

SparkQA commented Feb 3, 2016

Test build #2496 has finished for PR 10989 at commit 4d75022.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Davies Liu added 2 commits February 2, 2016 22:22
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
davies (Contributor Author) commented Feb 3, 2016

After rebasing to master:

/* 001 */
/* 002 */ public Object generate(Object[] references) {
/* 003 */   return new GeneratedIterator(references);
/* 004 */ }
/* 005 */
/* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */
/* 008 */   private Object[] references;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private long agg_bufValue;
/* 012 */   private org.apache.spark.broadcast.TorrentBroadcast broadcasthashjoin_broadcast;
/* 013 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation broadcasthashjoin_relation;
/* 014 */   private boolean range_initRange;
/* 015 */   private long range_partitionEnd;
/* 016 */   private long range_number;
/* 017 */   private boolean range_overflow;
/* 018 */   private UnsafeRow broadcasthashjoin_result;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder broadcasthashjoin_holder;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter broadcasthashjoin_rowWriter;
/* 021 */   private UnsafeRow agg_result;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 024 */
/* 025 */   public GeneratedIterator(Object[] references) {
/* 026 */     this.references = references;
/* 027 */     agg_initAgg = false;
/* 028 */
/* 029 */
/* 030 */     this.broadcasthashjoin_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
/* 031 */
/* 032 */     broadcasthashjoin_relation = (org.apache.spark.sql.execution.joins.UnsafeHashedRelation) broadcasthashjoin_broadcast.value();
/* 033 */     incPeakExecutionMemory(broadcasthashjoin_relation.getUnsafeSize());
/* 034 */
/* 035 */     range_initRange = false;
/* 036 */     range_partitionEnd = 0L;
/* 037 */     range_number = 0L;
/* 038 */     range_overflow = false;
/* 039 */     broadcasthashjoin_result = new UnsafeRow(1);
/* 040 */     this.broadcasthashjoin_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(broadcasthashjoin_result, 0);
/* 041 */     this.broadcasthashjoin_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(broadcasthashjoin_holder, 1);
/* 042 */     agg_result = new UnsafeRow(1);
/* 043 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 044 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 045 */   }
/* 046 */
/* 047 */
/* 048 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 049 */     // initialize aggregation buffer
/* 050 */
/* 051 */     agg_bufIsNull = false;
/* 052 */     agg_bufValue = 0L;
/* 053 */
/* 054 */
/* 055 */
/* 056 */
/* 057 */     // initialize Range
/* 058 */     if (!range_initRange) {
/* 059 */       range_initRange = true;
/* 060 */       if (input.hasNext()) {
/* 061 */         initRange(((InternalRow) input.next()).getInt(0));
/* 062 */       } else {
/* 063 */         return;
/* 064 */       }
/* 065 */     }
/* 066 */
/* 067 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 068 */       long range_value = range_number;
/* 069 */       range_number += 1L;
/* 070 */       if (range_number < range_value ^ 1L < 0) {
/* 071 */         range_overflow = true;
/* 072 */       }
/* 073 */
/* 074 */       // generate join key
/* 075 */
/* 076 */
/* 077 */
/* 078 */       broadcasthashjoin_rowWriter.zeroOutNullBytes();
/* 079 */
/* 080 */       /* (input[0, bigint] % 60000) */
/* 081 */       boolean broadcasthashjoin_isNull = false;
/* 082 */       long broadcasthashjoin_value = -1L;
/* 083 */       if (false || 60000L == 0) {
/* 084 */         broadcasthashjoin_isNull = true;
/* 085 */       } else {
/* 086 */
/* 087 */         if (false) {
/* 088 */           broadcasthashjoin_isNull = true;
/* 089 */         } else {
/* 090 */           broadcasthashjoin_value = (long)(range_value % 60000L);
/* 091 */         }
/* 092 */       }
/* 093 */       if (broadcasthashjoin_isNull) {
/* 094 */         broadcasthashjoin_rowWriter.setNullAt(0);
/* 095 */       } else {
/* 096 */         broadcasthashjoin_rowWriter.write(0, broadcasthashjoin_value);
/* 097 */       }
/* 098 */
/* 099 */
/* 100 */       // find matches from HashRelation
/* 101 */       org.apache.spark.util.collection.CompactBuffer broadcasthashjoin_matches = broadcasthashjoin_result.anyNull() ? null : (org.apache.spark.util.collection.CompactBuffer) broadcasthashjoin_relation.get(broadcasthashjoin_result);
/* 102 */       if (broadcasthashjoin_matches != null) {
/* 103 */         int broadcasthashjoin_size = broadcasthashjoin_matches.size();
/* 104 */         for (int broadcasthashjoin_i = 0; broadcasthashjoin_i < broadcasthashjoin_size; broadcasthashjoin_i++) {
/* 105 */           UnsafeRow broadcasthashjoin_row = (UnsafeRow) broadcasthashjoin_matches.apply(broadcasthashjoin_i);
/* 106 */           /* input[0, bigint] */
/* 107 */           long broadcasthashjoin_value3 = broadcasthashjoin_row.getLong(0);
/* 108 */
/* 109 */
/* 110 */
/* 111 */
/* 112 */           // do aggregate
/* 113 */           /* (input[0, bigint] + 1) */
/* 114 */           long agg_value1 = -1L;
/* 115 */           agg_value1 = agg_bufValue + 1L;
/* 116 */           // update aggregation buffer
/* 117 */           agg_bufIsNull = false;
/* 118 */           agg_bufValue = agg_value1;
/* 119 */
/* 120 */
/* 121 */         }
/* 122 */       }
/* 123 */
/* 124 */
/* 125 */       if (shouldStop()) return;
/* 126 */     }
/* 127 */
/* 128 */
/* 129 */   }
/* 130 */
/* 131 */
/* 132 */   private void initRange(int idx) {
/* 133 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 134 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 135 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10485760L);
/* 136 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 137 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 138 */
/* 139 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 140 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 141 */       range_number = Long.MAX_VALUE;
/* 142 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 143 */       range_number = Long.MIN_VALUE;
/* 144 */     } else {
/* 145 */       range_number = st.longValue();
/* 146 */     }
/* 147 */
/* 148 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 149 */     .multiply(step).add(start);
/* 150 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 151 */       range_partitionEnd = Long.MAX_VALUE;
/* 152 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 153 */       range_partitionEnd = Long.MIN_VALUE;
/* 154 */     } else {
/* 155 */       range_partitionEnd = end.longValue();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */
/* 160 */   protected void processNext() throws java.io.IOException {
/* 161 */     if (!agg_initAgg) {
/* 162 */       agg_initAgg = true;
/* 163 */       agg_doAggregateWithoutKey();
/* 164 */
/* 165 */       // output the result
/* 166 */
/* 167 */
/* 168 */       agg_rowWriter.zeroOutNullBytes();
/* 169 */
/* 170 */
/* 171 */       if (agg_bufIsNull) {
/* 172 */         agg_rowWriter.setNullAt(0);
/* 173 */       } else {
/* 174 */         agg_rowWriter.write(0, agg_bufValue);
/* 175 */       }
/* 176 */       currentRows.add(agg_result.copy());
/* 177 */     }
/* 178 */   }
/* 179 */ }
/* 180 */

SparkQA commented Feb 3, 2016

Test build #50650 has finished for PR 10989 at commit e0c8c65.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 3, 2016

Test build #2509 has finished for PR 10989 at commit e0c8c65.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

nongli (Contributor) commented Feb 3, 2016

LGTM

davies (Contributor Author) commented Feb 3, 2016

Merging this into master.

@asfgit closed this in c4feec2 on Feb 3, 2016
| $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
| if ($matches != null) {
| int $size = $matches.size();
| for (int $i = 0; $i < $size; $i++) {
cloud-fan (Contributor) commented Oct 2, 2018

I don't see a strong reason why we can't interrupt this loop. We could make i a global variable, for example.

I don't mean to change anything; I'm just verifying my understanding. Also cc @hvanhovell @viirya @mgaido91 @rednaxelafx

Contributor

Mmmh... this code seems rather outdated; I couldn't find it in the current codebase. Anyway, I don't understand why you would want to interrupt it. AFAIU, this is generating the result from all the matches of a row, so if we interrupted it somehow we would end up returning a wrong result (we would omit some rows from it...).

Member

Hmm, yeah, this code has changed a lot since this PR; it looks like at that time BroadcastHashJoin only supported inner joins. I also don't really get the idea of interrupting this loop early, as it looks like we need to go through all the matched rows here?

Contributor

Yeah, the code has changed a lot, but we still generate loops for broadcast join.

This PR changed BufferedRowIterator.currentRow to BufferedRowIterator.currentRows, to store multiple result rows instead of a single row. If we could interrupt the loop and resume it in the next call of processNext, we could keep a single result row.

Contributor

Mmmh, maybe I see your point now. I think it may be feasible, but a bit complex. We might keep a global variable for $matches and read from it in the produce method. That is what you are saying, right? Just changing it here wouldn't work, IMHO, because in the next iteration the keys would have changed...

Member

I see. Then we would have to keep matches as global state instead of local state, so we could go over the remaining matched rows in later iterations. And we shouldn't fetch the next row from the streaming side, but reuse the previous row from that side. That would produce a single result row at a time without buffering all the matched rows into currentRows, though it would add some complexity to the generated code.
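
A sketch of that idea (all names illustrative; advanceStreamSide(), lookupMatches(), and joinRows() are assumed helpers, not Spark APIs):

    // Sketch: keep the stream-side row and its match iterator as global state,
    // emit exactly one joined row per processNext() call, and resume the
    // remaining matches on the next call.
    private InternalRow streamedRow;                      // kept across calls
    private java.util.Iterator<UnsafeRow> matchIter;      // remaining matches

    protected void processNext() throws java.io.IOException {
      while (true) {
        if (matchIter != null && matchIter.hasNext()) {
          UnsafeRow buildRow = matchIter.next();
          append(joinRows(streamedRow, buildRow).copy()); // single result row
          return;                                         // resume here next time
        }
        if (!advanceStreamSide()) return;                 // sets streamedRow
        matchIter = lookupMatches(streamedRow);           // may be empty
      }
    }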
