Skip to content

Commit

Permalink
[FLINK-2576] [javaAPI] [scalaAPI] Restored binary compatibility for D…
Browse files Browse the repository at this point in the history
…ataSet (inner) join.
  • Loading branch information
fhueske committed Oct 9, 2015
1 parent b00c1d7 commit 1272cd5
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 228 deletions.
80 changes: 40 additions & 40 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Expand Up @@ -63,7 +63,7 @@
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.join.InnerJoinOperatorSets;
import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
Expand All @@ -75,7 +75,7 @@
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.operators.join.JoinOperatorSets;
import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
import org.apache.flink.api.java.operators.join.JoinType;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -729,17 +729,17 @@ public UnsortedGrouping<T> groupBy(String... fields) {
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
* joining elements into one DataSet.<br/>
*
* This method returns a {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
* @return A InnerJoinOperatorSets to continue the definition of the Join transformation.
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see InnerJoinOperatorSets
* @see JoinOperatorSets
* @see DataSet
*/
public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other) {
return new InnerJoinOperatorSets<T, R>(this, other);
public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
return new JoinOperatorSets<T, R>(this, other);
}

/**
Expand All @@ -748,19 +748,19 @@ public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other) {
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
* joining elements into one DataSet.<br/>
*
* This method returns a {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
* @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
* optimizer will pick the join strategy.
* @return A InnerJoinOperatorSets to continue the definition of the Join transformation.
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see InnerJoinOperatorSets
* @see JoinOperatorSets
* @see DataSet
*/
public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
return new InnerJoinOperatorSets<T, R>(this, other, strategy);
public <R> JoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSets<T, R>(this, other, strategy);
}

/**
Expand All @@ -770,18 +770,18 @@ public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy)
* joining elements into one DataSet.<br/>
* This method also gives the hint to the optimizer that the second DataSet to join is much
* smaller than the first one.<br/>
* This method returns a {@link InnerJoinOperatorSets} on which
* {@link InnerJoinOperatorSets#where(String...)} needs to be called to define the join key of the first
* This method returns a {@link JoinOperatorSets} on which
* {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first
* joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
* @return A InnerJoinOperatorSets to continue the definition of the Join transformation.
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see InnerJoinOperatorSets
* @see JoinOperatorSets
* @see DataSet
*/
public <R> InnerJoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
}

/**
Expand All @@ -791,17 +791,17 @@ public <R> InnerJoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
* joining elements into one DataSet.<br/>
* This method also gives the hint to the optimizer that the second DataSet to join is much
* larger than the first one.<br/>
* This method returns a {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see InnerJoinOperatorSets
* @see JoinOperatorSets
* @see DataSet
*/
public <R> InnerJoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
}

/**
Expand All @@ -816,11 +816,11 @@ public <R> InnerJoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
* @param other The other DataSet with which this DataSet is joined.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other) {
return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER);
public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other) {
return new JoinOperatorSetsBase<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER);
}

/**
Expand All @@ -837,11 +837,11 @@ public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other) {
* optimizer will pick the join strategy.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSets<>(this, other, strategy, JoinType.LEFT_OUTER);
public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
}

/**
Expand All @@ -856,11 +856,11 @@ public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other, JoinHint strat
* @param other The other DataSet with which this DataSet is joined.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other) {
return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER);
public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other) {
return new JoinOperatorSetsBase<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER);
}

/**
Expand All @@ -877,11 +877,11 @@ public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other) {
* optimizer will pick the join strategy.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSets<>(this, other, strategy, JoinType.RIGHT_OUTER);
public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
}

/**
Expand All @@ -896,11 +896,11 @@ public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other, JoinHint stra
* @param other The other DataSet with which this DataSet is joined.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other) {
return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER);
public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other) {
return new JoinOperatorSetsBase<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER);
}

/**
Expand All @@ -917,11 +917,11 @@ public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other) {
* optimizer will pick the join strategy.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
* @see org.apache.flink.api.java.operators.join.JoinOperatorSetsBase
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSets<>(this, other, strategy, JoinType.FULL_OUTER);
public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
}


Expand Down
Expand Up @@ -42,10 +42,12 @@
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
import org.apache.flink.api.java.operators.join.JoinType;
import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
Expand Down Expand Up @@ -842,6 +844,132 @@ protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?>
// throw new UnsupportedOperationException("RightSemiJoin operator currently not supported.");
// }
// }

/**
 * First intermediate step of an (inner) Join transformation.<br/>
 * The join key of the first input {@link DataSet} is chosen by calling one of
 * {@link JoinOperatorSets#where(int...)}, {@link JoinOperatorSets#where(String...)} or
 * {@link JoinOperatorSets#where(org.apache.flink.api.java.functions.KeySelector)}.
 * Each of these returns a {@link JoinOperatorSetsPredicate} on which the key of the
 * second input is selected.
 *
 * @param <I1> The type of the first input DataSet of the Join transformation.
 * @param <I2> The type of the second input DataSet of the Join transformation.
 */
public static final class JoinOperatorSets<I1, I2> extends JoinOperatorSetsBase<I1, I2> {

	/**
	 * Starts a Join definition on the two given inputs without specifying a join hint.
	 *
	 * @param input1 The first DataSet of the Join transformation.
	 * @param input2 The second DataSet of the Join transformation.
	 */
	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
		super(input1, input2);
	}

	/**
	 * Starts a Join definition on the two given inputs and forwards the given join hint
	 * to the base class.
	 *
	 * @param input1 The first DataSet of the Join transformation.
	 * @param input2 The second DataSet of the Join transformation.
	 * @param hint The join hint handed to the base class.
	 */
	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
		super(input1, input2, hint);
	}

	/**
	 * {@inheritDoc}
	 *
	 * @return An incomplete Join transformation.
	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)},
	 *           {@link JoinOperatorSetsPredicate#equalTo(String...)} or
	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)} to continue the Join.
	 */
	@Override
	public JoinOperatorSetsPredicate where(int... fields) {
		// Key definition for the first input, built from field positions.
		Keys<I1> firstInputKeys = new Keys.ExpressionKeys<>(fields, input1.getType());
		return new JoinOperatorSetsPredicate(firstInputKeys);
	}

	/**
	 * {@inheritDoc}
	 *
	 * @return An incomplete Join transformation.
	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)},
	 *           {@link JoinOperatorSetsPredicate#equalTo(String...)} or
	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)} to continue the Join.
	 */
	@Override
	public JoinOperatorSetsPredicate where(String... fields) {
		// Key definition for the first input, built from field expressions.
		Keys<I1> firstInputKeys = new Keys.ExpressionKeys<>(fields, input1.getType());
		return new JoinOperatorSetsPredicate(firstInputKeys);
	}

	/**
	 * {@inheritDoc}
	 *
	 * @return An incomplete Join transformation.
	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)},
	 *           {@link JoinOperatorSetsPredicate#equalTo(String...)} or
	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)} to continue the Join.
	 */
	@Override
	public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
		// The key type is derived from the selector and the type of the first input.
		TypeInformation<K> selectedKeyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		Keys<I1> firstInputKeys = new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), selectedKeyType);
		return new JoinOperatorSetsPredicate(firstInputKeys);
	}


	/**
	 * Second intermediate step of a Join transformation.<br/>
	 * The join key of the second input {@link DataSet} is chosen by calling one of
	 * {@link JoinOperatorSetsPredicate#equalTo(int...)},
	 * {@link JoinOperatorSetsPredicate#equalTo(String...)} or
	 * {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}.
	 */
	public class JoinOperatorSetsPredicate extends JoinOperatorSetsPredicateBase {

		// Instances are only created by the where(...) methods of the enclosing class.
		private JoinOperatorSetsPredicate(Keys<I1> keys1) {
			super(keys1);
		}

		/**
		 * Completes the key definition of a Join transformation by selecting the {@link Tuple}
		 * fields of the second join {@link DataSet} that are used as join keys.<br/>
		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
		 * <p>
		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a
		 * {@link Tuple2}: the element of the first input is the first field and the element
		 * of the second input is the second field.
		 *
		 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
		 * @return A DefaultJoin that represents the joined DataSet.
		 */
		@Override
		public DefaultJoin<I1, I2> equalTo(int... fields) {
			Keys<I2> secondInputKeys = new Keys.ExpressionKeys<>(fields, input2.getType());
			return createDefaultJoin(secondInputKeys);
		}

		/**
		 * Completes the key definition of a Join transformation by selecting the fields of the
		 * second join {@link DataSet} that are used as join keys.<br/>
		 * <p>
		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a
		 * {@link Tuple2}: the element of the first input is the first field and the element
		 * of the second input is the second field.
		 *
		 * @param fields The fields of the second join DataSet that should be used as keys.
		 * @return A DefaultJoin that represents the joined DataSet.
		 */
		@Override
		public DefaultJoin<I1, I2> equalTo(String... fields) {
			Keys<I2> secondInputKeys = new Keys.ExpressionKeys<>(fields, input2.getType());
			return createDefaultJoin(secondInputKeys);
		}

		/**
		 * Completes the key definition of a Join transformation with a {@link KeySelector}
		 * function for the second join {@link DataSet}.<br/>
		 * The KeySelector function is called for each element of the second DataSet and extracts
		 * a single key value on which the DataSet is joined.<br/>
		 * <p>
		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a
		 * {@link Tuple2}: the element of the first input is the first field and the element
		 * of the second input is the second field.
		 *
		 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
		 * @return A DefaultJoin that represents the joined DataSet.
		 */
		@Override
		public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
			// The key type is derived from the selector and the type of the second input.
			TypeInformation<K> selectedKeyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			Keys<I2> secondInputKeys = new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), selectedKeyType);
			return createDefaultJoin(secondInputKeys);
		}
	}
}


// --------------------------------------------------------------------------------------------
// default join functions
Expand Down

0 comments on commit 1272cd5

Please sign in to comment.