
[FLINK-2576] Add Outer Join operator to Optimizer and APIs #1138

Closed
wants to merge 12 commits

Conversation

jkovacs
Contributor

@jkovacs jkovacs commented Sep 16, 2015

This PR implements FLINK-2576 (adding the outer join operator to the optimizer and Java/Scala APIs, previously part of FLINK-2106).
For reference, the previous pull requests for the outer join implementation were #907 and #1052.

First of all thanks for the help we received in person and on the mailing list.
I designed the API as per the consensus on the mailing list and tried reusing as much code from the join operator api as possible.

This PR contributes the following:

  • An OuterJoinNode for the optimizer, and three sort-merge OuterJoinDescriptors, one for each type of outer join
  • One outer join base operator
  • left/right/fullOuterJoin() methods for the Java and Scala APIs
    • Including some updates to the join javadocs in Java/Scala APIs
  • Refactorings where necessary (mostly concerned with being able to reuse inner join operator code)
  • Specifically refactoring of the JoinOperator in the Java API:
    • Added JoinType property, identifying inner/left-/right-/full outer join
    • Removed PlanXUnwrappingJoinOperator classes, instead promoting the TupleXUnwrappingJoiners to be able to reuse the existing unwrapping logic
    • Added inner class JoinOperatorBaseBuilder to be able to transparently construct a base operator for all types of joins, as well as tuple unwrapping of left and right inputs
    • Ensured the user can't compile a default join plan for outer joins, and made projection joins work with outer joins (see below)
  • End to end integration tests for the outer join operator using the Java and Scala APIs in flink-tests

Usage & Implementation:
In both APIs we prohibit using the default join functionality for outer joins. The user is required
to specify a custom join function that combines the (potentially null) left and right side tuples.
In the Java API we support the projection join functionality for outer joins. (Projection joins are not yet implemented in the Scala API for inner joins, so no changes there.)
Note that when the user performs a projection join, the type information is lost. This is also the case for the inner projection join. Additionally, we explicitly "downgrade" the result type information of an outer projection join to a Tuple of GenericTypeInfo<>(Object.class) in order to be able to serialize null values.
A nicer way to do this would be to use an Optional<T> type to represent nullable tuple values, but since we can't rely on Java 8 types and didn't want to hardcode a dependency on a third-party Optional type (e.g. from Guava) into the API, we went this route for now.
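To make the requirement concrete, the semantics being described — every outer-side element is emitted, and a user-supplied join function must handle a null on the unmatched side — can be sketched outside of Flink in plain Java. This is a minimal illustration of left outer join semantics only; all class and method names here are hypothetical and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A minimal, Flink-independent sketch of left outer join semantics:
 * every left element is emitted, and unmatched left elements are paired
 * with null — which is why the user-supplied join function must handle
 * a null right side. All names are illustrative, not Flink classes.
 */
public class LeftOuterJoinSketch {

    interface KeySelector<T, K> {
        K getKey(T value);
    }

    interface JoinFunction<L, R, O> {
        // right may be null for unmatched left elements
        O join(L left, R right);
    }

    static <L, R, K, O> List<O> leftOuterJoin(
            List<L> left, List<R> right,
            KeySelector<L, K> leftKey, KeySelector<R, K> rightKey,
            JoinFunction<L, R, O> udf) {
        // Build a multimap from key to right-side elements.
        Map<K, List<R>> rightByKey = new HashMap<>();
        for (R r : right) {
            K key = rightKey.getKey(r);
            List<R> bucket = rightByKey.get(key);
            if (bucket == null) {
                bucket = new ArrayList<>();
                rightByKey.put(key, bucket);
            }
            bucket.add(r);
        }
        List<O> out = new ArrayList<>();
        for (L l : left) {
            List<R> matches = rightByKey.get(leftKey.getKey(l));
            if (matches == null) {
                out.add(udf.join(l, null)); // unmatched: null right side
            } else {
                for (R r : matches) {
                    out.add(udf.join(l, r));
                }
            }
        }
        return out;
    }
}
```

A right outer join is the mirror image, and a full outer join additionally emits right-side elements whose key never matched any left element.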

@jkovacs jkovacs changed the title Feature/flink 2576 [FLINK-2576] Add Outer Join operator to Optimizer and APIs Sep 16, 2015
@fhueske
Contributor

fhueske commented Sep 17, 2015

@jkovacs and @r-pogalz, thank you very much for this PR and the detailed description!
It's quite a bit of code so it will take some time to be reviewed. I hope to give feedback soon.

Nonetheless, we can start a discussion about the handling of projection for outer joins. By changing the type information to GenericTypeInfo<TupleX> to support tuples with null values, a DataSet<TupleX> cannot be used (in a join, groupBy, reduce, ...) as before because the runtime will use completely different serializers and comparators. Therefore, I am more in favor of not supporting projection for outer joins.

@jkovacs
Contributor Author

jkovacs commented Sep 18, 2015

Thanks @fhueske, that's a good point I hadn't considered.

Another idea that occurred to me was to convert the result tuple types to GenericTypeInfo<T> (instead of GenericTypeInfo<Object>), where T is the original type of the tuple field (e.g. String or Integer). This would be null safe and would allow the user to group by those fields, assuming of course they are sure that the fields are non-null (e.g. on a left or right outer join).
Although I'm not sure of all the consequences of using, say, GenericTypeInfo<String> instead of BasicTypeInfo<String> for serialization and comparison.

I pushed this change as jkovacs@f682baa to a different branch for testing.

Also rebased the branch onto the current master and resolved conflicts (the failing test is some YARN integration test).

@jkovacs
Contributor Author

jkovacs commented Sep 18, 2015

To partly answer my own question: one big drawback of downgrading the tuple field types to GenericTypeInfo is that the generic Kryo serializers will be used for (de)serialization and comparison, which are significantly slower than the native Flink serializers and comparators for basic types such as Integer (according to this blog post).

One obvious way to work around this is to only downgrade the fields that are actually nullable, and keep the original types of the definitely non-null fields (i.e. the types from the outer side of a left or right outer join). This way the user can still group/join/sort efficiently on the non-null fields, while preserving null safety for the other fields.

I pushed another commit for this to my temporary branch for review, if this makes sense: jkovacs/flink@feature/FLINK-2576...jkovacs:feature/FLINK-2576-projection-types

As you can see, I was really hoping to make the projection joins work properly :-) but if you feel that the effort isn't worth it, or I'm missing something else entirely, we can simply scrap that and throw an InvalidProgramException when the user tries a projection outer join instead of defining their own join UDF. Opinions are welcome.

@fhueske
Contributor

fhueske commented Sep 23, 2015

Hi @jkovacs, thanks for all your efforts to make the projection work. Going for a GenericTypeInfo would work in many cases but unfortunately not in all. For example, union in Flink operates on the serialization level and requires that all data sets being unioned use the same serializer. By transparently using a GenericTypeInfo, users might be surprised why DataSet<Tuple2<String,Long>>.union(DataSet<Tuple2<String,Long>>) does not work. If we only support outer joins with an explicit JoinFunction, the user has full control over how to deal with null values and can even use a custom Tuple type or Tuple serializer (via Operator.returns()) that supports null values. In my opinion, the best approach is to only support outer joins with JoinFunctions.

@StephanEwen
Contributor

Agreed with Fabian. For now, let's require join functions.

Future work would be to use Tuples with Options in Scala. In Java, we should probably add an option type as well (and teach the TypeExtractor to use it). Unfortunately, core Java only adds an Optional type in Java 8.

We could add one for Java 7 and deprecate it later.
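A Java 7-compatible option type along these lines could look like the following minimal sketch. This is purely illustrative (class name `Opt` and all method names are hypothetical), not the type that was eventually added to Flink:

```java
import java.io.Serializable;
import java.util.NoSuchElementException;

/**
 * A minimal Java 7-compatible option type, as a sketch of the idea
 * discussed above. Illustrative only; not an actual Flink class.
 */
public final class Opt<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Opt<?> NONE = new Opt<Object>(null);

    private final T value;

    private Opt(T value) {
        this.value = value;
    }

    /** Wraps a non-null value. */
    public static <T> Opt<T> some(T value) {
        if (value == null) {
            throw new NullPointerException("some() requires a non-null value");
        }
        return new Opt<T>(value);
    }

    /** Returns the shared empty instance, representing a null field. */
    @SuppressWarnings("unchecked")
    public static <T> Opt<T> none() {
        return (Opt<T>) NONE;
    }

    public boolean isDefined() {
        return value != null;
    }

    public T get() {
        if (value == null) {
            throw new NoSuchElementException("None has no value");
        }
        return value;
    }

    public T getOrElse(T fallback) {
        return value != null ? value : fallback;
    }
}
```

With such a type, the result of an outer projection join could carry `Opt<T>` fields instead of raw nulls, sidestepping the null-serialization problem.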

```java
        //no more elements
        return false;
    }
} else if (currLeftIterator.hasNext() && !currRightIterator.hasNext()) {
```
Contributor


might yield NPE, if currLeftIterator == null && currRightIterator != null && currRightIterator.hasNext().

Contributor Author


Technically true, but I believe from the control flow that scenario is impossible, since either both iterators get reassigned something non-null at the same time, or both remain null and the method returns false (no more elements). @r-pogalz can you confirm that or can we make this more explicit?


@jkovacs is right, it is not possible that currLeftIterator or currRightIterator are null at this point, as they are just wrappers around the subsets and always get assigned. In case that a subset is null and hasNext() is called, the wrapper will return false.
There are also tests in OuterJoinOperatorBaseTest which cover the cases where one side of the outer join is empty.

@fhueske
Contributor

fhueske commented Sep 25, 2015

We have a couple of unit tests to check the correctness of the API, i.e., they check that valid use works and that invalid use throws an exception early. See for example org.apache.flink.api.java.operator.JoinOperatorTest. It would be good to have such unit tests for outer joins as well.

```java
}

@Test(expected = InvalidProgramException.class)
public void testDefaultJoin() throws Exception {
```
Contributor


This check should be done as a unit test (as mentioned in my other comment).

@fhueske
Contributor

fhueske commented Sep 25, 2015

As I said in my previous comments, I would prefer to skip support for projection joins initially.
Instead, it would be good if we could ensure by API design that an outer join is always completed with a with(JoinFunction) call. One way to do this would be to return a special OuterJoinOperatorSetsPredicate object when .where() is called. This OuterJoinOperatorSetsPredicate would not return a JoinOperator when equalTo() is called, but an unfinished outer join that only allows calling with(). That way, the regular join API would remain stable. Or do you have a better idea for modeling the API?

If we don't allow DefaultJoin and ProjectJoin for outer joins, we can also revert the corresponding changes.
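The API shape proposed above can be sketched, independently of Flink, as a small builder chain in which equalTo() yields an unfinished join whose only completion path is with(JoinFunction), so the type system rules out default and projection outer joins. All class names below are illustrative stand-ins, not Flink's actual classes, and the "execution" is a placeholder:

```java
/**
 * A tiny, Flink-independent sketch of the proposed API shape:
 * where() returns a predicate type whose equalTo() yields an
 * *unfinished* join exposing only with(JoinFunction). Names
 * are hypothetical; not Flink's real classes.
 */
public class OuterJoinApiSketch {

    interface JoinFunction<L, R, O> {
        O join(L left, R right);
    }

    static class OuterJoinOperatorSets<L, R> {
        OuterJoinOperatorSetsPredicate<L, R> where(int... leftKeyFields) {
            return new OuterJoinOperatorSetsPredicate<L, R>();
        }
    }

    static class OuterJoinOperatorSetsPredicate<L, R> {
        // Unlike the inner-join API, equalTo() does NOT return a
        // finished JoinOperator: the result can only be completed
        // via with(), so a default or projection outer join cannot
        // even be expressed.
        UnfinishedOuterJoin<L, R> equalTo(int... rightKeyFields) {
            return new UnfinishedOuterJoin<L, R>();
        }
    }

    static class UnfinishedOuterJoin<L, R> {
        <O> CompletedOuterJoin<L, R, O> with(JoinFunction<L, R, O> udf) {
            return new CompletedOuterJoin<L, R, O>(udf);
        }
    }

    static class CompletedOuterJoin<L, R, O> {
        final JoinFunction<L, R, O> udf;

        CompletedOuterJoin(JoinFunction<L, R, O> udf) {
            this.udf = udf;
        }

        // Placeholder "execution" so the sketch is self-contained.
        O apply(L left, R right) {
            return udf.join(left, right);
        }
    }
}
```

The key design point is that the unfinished type has no other public methods, so user code that omits with() simply does not compile.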

@fhueske
Contributor

fhueske commented Sep 25, 2015

Hi @jkovacs and @r-pogalz,

really good work! I left a few comments inline, but overall the PR is in pretty good shape. Please let me know if you have questions or would like to discuss any of my comments.

Have a good weekend, Fabian

@jkovacs
Contributor Author

jkovacs commented Oct 2, 2015

Thanks @fhueske and @StephanEwen for the comprehensive review and the additional details on Flink internals! I agree that we should wait and implement the projection join correctly at a later point.
I'll append a few commits addressing the review comments and squash them into the appropriate commits later, when you feel it's ready to merge.

@fhueske
Contributor

fhueske commented Oct 9, 2015

Thanks for the update @jkovacs and @r-pogalz. Very good work!
I will go ahead, try this PR, and merge it :-)

fhueske pushed a commit to fhueske/flink that referenced this pull request Oct 9, 2015
@fhueske
Contributor

fhueske commented Oct 9, 2015

Looks good. I added one commit to restore binary compatibility. The code is not super nice, but it allows previously compiled programs to run without recompilation. We can still clean up the code later if we decide to. Final tests are running; will merge after they pass.

@asfgit asfgit closed this in b00c1d7 Oct 9, 2015
@fhueske
Contributor

fhueske commented Oct 9, 2015

Oh, just realized we did not update the documentation.
I will open a JIRA for that and add it later today.

@fhueske
Contributor

fhueske commented Oct 9, 2015

Thank you very much @jkovacs and @r-pogalz for adding outer joins to Flink!

cfmcgrady pushed a commit to cfmcgrady/flink that referenced this pull request Oct 23, 2015
lofifnc pushed a commit to lofifnc/flink that referenced this pull request Oct 23, 2015