Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Feb 9, 2016

This PR support codegen for broadcast outer join.

In order to reduce the duplicated codes, this PR merge HashJoin and HashOuterJoin together (also BroadcastHashJoin and BroadcastHashOuterJoin).

@SparkQA
Copy link

SparkQA commented Feb 9, 2016

Test build #50961 has finished for PR 11130 at commit 52efe91.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 9, 2016

Test build #2527 has finished for PR 11130 at commit 52efe91.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Feb 9, 2016

For query:

val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
sqlContext.range(N).join(dim, (col("id") % 60000) === col("k"), "left").count()

will generate:

/* 001 */
/* 002 */ public Object generate(Object[] references) {
/* 003 */   return new GeneratedIterator(references);
/* 004 */ }
/* 005 */
/* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */
/* 008 */   private Object[] references;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private long agg_bufValue;
/* 012 */   private org.apache.spark.broadcast.TorrentBroadcast join_broadcast;
/* 013 */   private org.apache.spark.sql.execution.joins.LongArrayRelation join_relation;
/* 014 */   private boolean range_initRange;
/* 015 */   private long range_partitionEnd;
/* 016 */   private long range_number;
/* 017 */   private boolean range_overflow;
/* 018 */   private UnsafeRow agg_result;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 021 */
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */     agg_initAgg = false;
/* 025 */
/* 026 */
/* 027 */     this.join_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
/* 028 */
/* 029 */     join_relation = (org.apache.spark.sql.execution.joins.LongArrayRelation) join_broadcast.value();
/* 030 */     incPeakExecutionMemory(join_relation.getMemorySize());
/* 031 */
/* 032 */     range_initRange = false;
/* 033 */     range_partitionEnd = 0L;
/* 034 */     range_number = 0L;
/* 035 */     range_overflow = false;
/* 036 */     agg_result = new UnsafeRow(1);
/* 037 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 038 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 039 */   }
/* 042 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 043 */     // initialize aggregation buffer
/* 044 */
/* 045 */     agg_bufIsNull = false;
/* 046 */     agg_bufValue = 0L;
/* 051 */     // initialize Range
/* 052 */     if (!range_initRange) {
/* 053 */       range_initRange = true;
/* 054 */       if (input.hasNext()) {
/* 055 */         initRange(((InternalRow) input.next()).getInt(0));
/* 056 */       } else {
/* 057 */         return;
/* 058 */       }
/* 059 */     }
/* 060 */
/* 061 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 062 */       long range_value = range_number;
/* 063 */       range_number += 1L;
/* 064 */       if (range_number < range_value ^ 1L < 0) {
/* 065 */         range_overflow = true;
/* 066 */       }
/* 067 */
/* 068 */       // generate join key
/* 069 */       /* (input[0, bigint] % 60000) */
/* 070 */       boolean join_isNull = false;
/* 071 */       long join_value = -1L;
/* 072 */       if (false || 60000L == 0) {
/* 073 */         join_isNull = true;
/* 074 */       } else {
/* 076 */         if (false) {
/* 077 */           join_isNull = true;
/* 078 */         } else {
/* 079 */           join_value = (long)(range_value % 60000L);
/* 080 */         }
/* 081 */       }
/* 082 */       // find matches from HashedRelation
/* 083 */       UnsafeRow join_matched = join_isNull ? null: (UnsafeRow)join_relation.getValue(join_value);
/* 084 */
/* 085 */       boolean join_isNull4 = true;
/* 086 */       long join_value4 = -1L;
/* 087 */       if (join_matched != null) {
/* 088 */         /* input[0, bigint] */
/* 089 */         long join_value3 = join_matched.getLong(0);
/* 090 */         join_isNull4 = false;
/* 091 */         join_value4 = join_value3;
/* 092 */       }
/* 093 */
/* 094 */       final boolean join_invalid = true;
/* 095 */       if (!join_invalid) {
/* 096 */         // reset to null
/* 097 */         join_isNull4 = true;
/* 098 */       }
/* 103 */       // do aggregate
/* 104 */       /* (input[0, bigint] + 1) */
/* 105 */       long agg_value1 = -1L;
/* 106 */       agg_value1 = agg_bufValue + 1L;
/* 107 */       // update aggregation buffer
/* 108 */       agg_bufIsNull = false;
/* 109 */       agg_bufValue = agg_value1;
/* 114 */       if (shouldStop()) return;
/* 115 */     }
/* 118 */   }
/* 121 */   private void initRange(int idx) {
/* 146 */   }
/* 149 */   protected void processNext() throws java.io.IOException {
/* 150 */     if (!agg_initAgg) {
/* 151 */       agg_initAgg = true;
/* 152 */       agg_doAggregateWithoutKey();
/* 153 */
/* 154 */       // output the result
/* 157 */       agg_rowWriter.zeroOutNullBytes();
/* 160 */       if (agg_bufIsNull) {
/* 161 */         agg_rowWriter.setNullAt(0);
/* 162 */       } else {
/* 163 */         agg_rowWriter.write(0, agg_bufValue);
/* 164 */       }
/* 165 */       currentRows.add(agg_result.copy());
/* 166 */     }
/* 167 */   }
/* 168 */ }
/* 169 */

@SparkQA
Copy link

SparkQA commented Feb 9, 2016

Test build #50982 has finished for PR 11130 at commit 9a1f532.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// find the matches from HashedRelation
val matched = ctx.freshName("matched")
val valid = ctx.freshName("invalid")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps renamed to passesFilters or something like that.

@nongli
Copy link
Contributor

nongli commented Feb 10, 2016

I still see initRange() being generated as an empty function. Do we still need this?

@davies
Copy link
Contributor Author

davies commented Feb 10, 2016

@nongli I removed that manually

@SparkQA
Copy link

SparkQA commented Feb 10, 2016

Test build #2528 has finished for PR 11130 at commit edbc284.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 10, 2016

Test build #2531 has finished for PR 11130 at commit da45df1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Feb 11, 2016

@nongli Could you take another look?

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@SparkQA
Copy link

SparkQA commented Feb 12, 2016

Test build #51194 has finished for PR 11130 at commit 9b05c7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeansModel(JavaModel, MLWritable, MLReadable):
    • class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed,
    • class BisectingKMeansModel(JavaModel):
    • class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed):
    • class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed,
    • class ALSModel(JavaModel, MLWritable, MLReadable):
    • case class Grouping(child: Expression) extends Expression with Unevaluable
    • case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable
    • class ContinuousQueryManager(sqlContext: SQLContext)
    • class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
    • class FileStreamSource(
    • trait HadoopFsRelationProvider extends StreamSourceProvider
    • abstract class ContinuousQueryListener

@davies
Copy link
Contributor Author

davies commented Feb 12, 2016

cc @rxin

hashJoin(streamedIter, hashTable, numOutputRows)

case LeftOuter =>
streamedIter.flatMap(currentRow => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick

streamedIter.flatMap { currentRow =>
  // ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are copied and pasted here, usually don't modify them to reduce the budget of review.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Thanks for being considerate. Can you just fix them while you are at it now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do it when update this PR.

| $outputCode
| }
""".stripMargin
|// generate join key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate join key -> generate join key for stream side

@SparkQA
Copy link

SparkQA commented Feb 16, 2016

Test build #51329 has finished for PR 11130 at commit 5724180.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 16, 2016

Test build #51333 has finished for PR 11130 at commit 5744941.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Feb 16, 2016

ping @rxin

@davies
Copy link
Contributor Author

davies commented Feb 18, 2016

@rxin @nongli I'm merging this to unblock other codegen/exchange/join work, please take a final look on this, any comments will be addressed by follow-up RPs.

@asfgit asfgit closed this in 95e1ab2 Feb 18, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants