Skip to content

Commit

Permalink
Move most of the existing SMJ code into Java.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 6, 2015
1 parent dfdb93f commit b420a71
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import java.util.Arrays;

import scala.Function1;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.math.Ordering;

import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.AbstractScalaRowIterator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverter;
Expand Down Expand Up @@ -97,7 +97,7 @@ public Iterator<Row> sort(Iterator<Row> inputIterator) throws IOException {
);
}
final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
return new AbstractIterator<Row>() {
return new AbstractScalaRowIterator() {

private final int numFields = schema.length();
private final UnsafeRow row = new UnsafeRow();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.joins;

import java.util.NoSuchElementException;
import javax.annotation.Nullable;

import scala.Function1;
import scala.collection.Iterator;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

import org.apache.spark.sql.AbstractScalaRowIterator;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.JoinedRow5;
import org.apache.spark.sql.catalyst.expressions.RowOrdering;
import org.apache.spark.util.collection.CompactBuffer;

/**
* Implements the merge step of sort-merge join.
*/
class SortMergeJoinIterator extends AbstractScalaRowIterator {

private static final ClassTag<Row> ROW_CLASS_TAG = ClassTag$.MODULE$.apply(Row.class);
private final Iterator<Row> leftIter;
private final Iterator<Row> rightIter;
private final Function1<Row, Row> leftKeyGenerator;
private final Function1<Row, Row> rightKeyGenerator;
private final RowOrdering keyOrdering;
private final JoinedRow5 joinRow = new JoinedRow5();

@Nullable private Row leftElement;
@Nullable private Row rightElement;
private Row leftKey;
private Row rightKey;
@Nullable private CompactBuffer<Row> rightMatches;
private int rightPosition = -1;
private boolean stop = false;
private Row matchKey;

public SortMergeJoinIterator(
Iterator<Row> leftIter,
Iterator<Row> rightIter,
Function1<Row, Row> leftKeyGenerator,
Function1<Row, Row> rightKeyGenerator,
RowOrdering keyOrdering) {
this.leftIter = leftIter;
this.rightIter = rightIter;
this.leftKeyGenerator = leftKeyGenerator;
this.rightKeyGenerator = rightKeyGenerator;
this.keyOrdering = keyOrdering;
fetchLeft();
fetchRight();
}

private void fetchLeft() {
if (leftIter.hasNext()) {
leftElement = leftIter.next();
leftKey = leftKeyGenerator.apply(leftElement);
} else {
leftElement = null;
}
}

private void fetchRight() {
if (rightIter.hasNext()) {
rightElement = rightIter.next();
rightKey = rightKeyGenerator.apply(rightElement);
} else {
rightElement = null;
}
}

/**
* Searches the right iterator for the next rows that have matches in left side, and store
* them in a buffer.
*
* @return true if the search is successful, and false if the right iterator runs out of
* tuples.
*/
private boolean nextMatchingPair() {
if (!stop && rightElement != null) {
// run both side to get the first match pair
while (!stop && leftElement != null && rightElement != null) {
final int comparing = keyOrdering.compare(leftKey, rightKey);
// for inner join, we need to filter those null keys
stop = comparing == 0 && !leftKey.anyNull();
if (comparing > 0 || rightKey.anyNull()) {
fetchRight();
} else if (comparing < 0 || leftKey.anyNull()) {
fetchLeft();
}
}
rightMatches = new CompactBuffer<Row>(ROW_CLASS_TAG);
if (stop) {
stop = false;
// Iterate the right side to buffer all rows that match.
// As the records should be ordered, exit when we meet the first record that not match.
while (!stop && rightElement != null) {
rightMatches.$plus$eq(rightElement);
fetchRight();
stop = keyOrdering.compare(leftKey, rightKey) != 0;
}
if (rightMatches.size() > 0) {
rightPosition = 0;
matchKey = leftKey;
}
}
}
return rightMatches != null && rightMatches.size() > 0;
}

@Override
public boolean hasNext() {
return nextMatchingPair();
}

@Override
public Row next() {
if (hasNext()) {
// We are using the buffered right rows and run down left iterator
final Row joinedRow = joinRow.apply(leftElement, rightMatches.apply(rightPosition));
rightPosition += 1;
if (rightPosition >= rightMatches.size()) {
rightPosition = 0;
fetchLeft();
if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
stop = false;
rightMatches = null;
}
}
return joinedRow;
} else {
// No more results
throw new NoSuchElementException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

/**
* Shim to allow us to implement [[scala.Iterator]] in Java. Scala 2.11+ has an AbstractIterator
* class for this, but that class is `private[scala]` in 2.10. We need to explicitly fix this to
* `Row` in order to work around a spurious IntelliJ compiler error.
*/
private[spark] abstract class AbstractScalaRowIterator extends Iterator[Row]
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.spark.sql.execution.joins

import java.util.NoSuchElementException

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
import org.apache.spark.util.collection.CompactBuffer

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -63,105 +60,12 @@ case class SortMergeJoin(
val rightResults = right.execute().map(_.copy())

leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
new Iterator[InternalRow] {
// Mutable per row objects.
private[this] val joinRow = new JoinedRow5
private[this] var leftElement: InternalRow = _
private[this] var rightElement: InternalRow = _
private[this] var leftKey: InternalRow = _
private[this] var rightKey: InternalRow = _
private[this] var rightMatches: CompactBuffer[InternalRow] = _
private[this] var rightPosition: Int = -1
private[this] var stop: Boolean = false
private[this] var matchKey: InternalRow = _

// initialize iterator
initialize()

override final def hasNext: Boolean = nextMatchingPair()

override final def next(): InternalRow = {
if (hasNext) {
// we are using the buffered right rows and run down left iterator
val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
rightPosition += 1
if (rightPosition >= rightMatches.size) {
rightPosition = 0
fetchLeft()
if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
stop = false
rightMatches = null
}
}
joinedRow
} else {
// no more result
throw new NoSuchElementException
}
}

private def fetchLeft() = {
if (leftIter.hasNext) {
leftElement = leftIter.next()
leftKey = leftKeyGenerator(leftElement)
} else {
leftElement = null
}
}

private def fetchRight() = {
if (rightIter.hasNext) {
rightElement = rightIter.next()
rightKey = rightKeyGenerator(rightElement)
} else {
rightElement = null
}
}

private def initialize() = {
fetchLeft()
fetchRight()
}

/**
* Searches the right iterator for the next rows that have matches in left side, and store
* them in a buffer.
*
* @return true if the search is successful, and false if the right iterator runs out of
* tuples.
*/
private def nextMatchingPair(): Boolean = {
if (!stop && rightElement != null) {
// run both side to get the first match pair
while (!stop && leftElement != null && rightElement != null) {
val comparing = keyOrdering.compare(leftKey, rightKey)
// for inner join, we need to filter those null keys
stop = comparing == 0 && !leftKey.anyNull
if (comparing > 0 || rightKey.anyNull) {
fetchRight()
} else if (comparing < 0 || leftKey.anyNull) {
fetchLeft()
}
}
rightMatches = new CompactBuffer[InternalRow]()
if (stop) {
stop = false
// iterate the right side to buffer all rows that matches
// as the records should be ordered, exit when we meet the first that not match
while (!stop && rightElement != null) {
rightMatches += rightElement
fetchRight()
stop = keyOrdering.compare(leftKey, rightKey) != 0
}
if (rightMatches.size > 0) {
rightPosition = 0
matchKey = leftKey
}
}
}
rightMatches != null && rightMatches.size > 0
}
}
new SortMergeJoinIterator(
leftIter,
rightIter,
leftKeyGenerator,
rightKeyGenerator,
keyOrdering);
}
}
}
Loading

0 comments on commit b420a71

Please sign in to comment.