[FLINK-4460] Side Outputs in Flink #3484

Closed
wants to merge 11 commits into from

Contributor

@aljoscha aljoscha commented Mar 7, 2017

This is a refinement of #2982 by @chenqin.

I changed the API a bit, added support for side outputs to ProcessFunction, enabled side outputs to work with chaining, added proper Scala API and a Scala API test and added documentation.

R: @uce @kl0u and @chenqin for review, please

@aljoscha aljoscha force-pushed the finish-pr-2982-side-outputs-cp branch 2 times, most recently from 1b70ee5 to 8c5ac2e Compare March 7, 2017 15:37
Contributor

@chenqin chenqin left a comment

Some minor comments, LGTM.
+@uce @kl0u for second opinion

* <pre>{@code
* static final OutputTag<X> sideOutputTag = new OutputTag<X>("side-output") {};
*
* public void flatMap(X value, Collector<String> out) throws Exception {
Contributor

The comment seems out of date. I think we already decided to get rid of CollectorWrapper.

Contributor Author

Fixing

@@ -85,6 +86,7 @@
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
Contributor

We might consider using addVirtualSideOutputNode and virtualSideOutputNodes, unless we want to refactor away from the current assumption <IN> operator <OUT> to <<tag1,IN1>...<tagX,INX>> operator <<taga,OUTa>...<tagx,OUTX>>.

Contributor Author

The method is already called addVirtualSideOutputNode(). I'm adjusting the name of the field. Thanks!

Contributor

sounds good

int virtualId = upStreamVertexID;
upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
if (outputTag == null) {
// selections that happen downstream override earlier selections
Contributor

We may want to call out this behavior in the getSideOutput comments.

Contributor Author

I think the comment is a leftover from copying this code from split/select. For side outputs it can't happen that you have multiple "selects" after one another. Will remove the comment. What do you think?

Contributor

sounds good to me!

@@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.collect.Iterables;
Contributor

Do you think introducing this dependency is a good idea? Up to you :)

Contributor Author

You're right, I'm changing this to simply have two loops.

I think you introduced this in the first place, though. 😉

Contributor

That sounds right, good catch!
Thanks for fixing!

Contributor

@chenqin chenqin left a comment

LGTM

Contributor

@chenqin chenqin left a comment

If no objection from @uce @kl0u, please land this diff

Contributor

uce commented Mar 9, 2017

Unfortunately, I won't have time to look over this PR this week. Thanks for pinging me @aljoscha @chenqin.

// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
Contributor

@chenqin and @aljoscha I am starting to review the PR and was wondering when this new isLate() check is needed. At least for the out-of-the-box window assigners, it seems to be redundant.

Contributor

Thanks @kl0u Good catch!

I put isLate there with the intention of filtering out events that were dropped for other reasons I may not be aware of. lateArrivingEvents should contain only events that are both late and dropped.

@aljoscha If that is a redundant check, we might just remove isLate.
What do you think?

Contributor Author

I thought about this again. I think it doesn't hurt to have it because it catches the case when a WindowAssigner doesn't assign any windows. In that case an element is also "skipped" but it is not necessarily considered late. What do you think?

Contributor Author

I just added a test for the behaviour with a "weird" WindowAssigner.
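
The decision discussed in this thread can be sketched in plain Java. This is a minimal model, not the WindowOperator code: the class and method names are hypothetical, and the lateness formula (timestamp plus allowed lateness at or behind the watermark) is an assumption about what isLate() computes. It shows why the extra check matters when an assigner returns no windows.

```java
/** Hypothetical model of the late-data decision; not the Flink WindowOperator. */
final class LateDataSketch {
    /** Assumed lateness rule: timestamp plus allowed lateness is at or behind the watermark. */
    static boolean isLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    /** Emit to the late side output only if no window took the element, a tag is set, and it is late. */
    static boolean goesToLateOutput(boolean isSkippedElement, boolean hasLateTag,
                                    long timestamp, long allowedLateness, long watermark) {
        return isSkippedElement && hasLateTag && isLate(timestamp, allowedLateness, watermark);
    }

    public static void main(String[] args) {
        // Skipped because its windows are already past the lateness bound: late.
        System.out.println(goesToLateOutput(true, true, 100L, 10L, 200L)); // true
        // Skipped because the assigner returned no windows, but the element is on time: not late.
        System.out.println(goesToLateOutput(true, true, 500L, 10L, 200L)); // false
    }
}
```

Without the isLate() term, the second element would be reported as late data even though it was only ignored by the assigner.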

Contributor

@kl0u kl0u left a comment

I had some comments, mostly on code style and duplicate code. One raises a correctness concern: it is the second point on the equals method in the OutputTag class.

This review was just the initial one. I may have some additional comments.

this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
}


Contributor

Remove one of the 2 empty lines.

* @param id The id of the created {@code OutputTag}.
*/
public OutputTag(String id) {
Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
Contributor

We do not need both lines with the checks. We can just have:

this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");

Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
this.id = requireNonNull(id);

try {
Contributor

No need for line breaking:
TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};

*/
public OutputTag(String id, TypeInformation<T> typeInfo) {
this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
this.typeInfo =
Contributor

No need for line breaking.


@Override
public boolean equals(Object obj) {
return obj instanceof OutputTag
Contributor

Two points:

  1. we cannot have this.id == null or ((OutputTag) obj).id == null because we check in the constructor, so this method can be simplified.
  2. we never check for uniqueness of the outputTag.id. We should do it at translation time. This is also a correctness issue, as it may result in undesired side output "collisions".

Contributor Author

I would have liked to include the TypeInformation into the check but we can't do that because it's transient. I'll try and figure something out for checking that side outputs are unique, not as easy as it seems.

Contributor

I see. The problem is that if this does not work, then we can have important side effects.

Contributor

Still the first comment applies: the equals can be simplified given that id != null.
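
For illustration, the simplification requested here could look roughly like the sketch below. TagSketch is a hypothetical stand-in, not the OutputTag class from this PR; it compares the id only, since the thread notes that the TypeInformation is transient. Objects.requireNonNull plays the role of Preconditions.checkNotNull.

```java
import java.util.Objects;

/** Hypothetical stand-in for OutputTag; not the Flink class. */
final class TagSketch {
    private final String id;

    TagSketch(String id) {
        // Single null check: requireNonNull returns its argument, so one line is enough.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    @Override
    public boolean equals(Object obj) {
        // id is never null after construction, so no null guards are needed on either side.
        return obj instanceof TagSketch && this.id.equals(((TagSketch) obj).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(new TagSketch("late").equals(new TagSketch("late")));  // true
        System.out.println(new TagSketch("late").equals(new TagSketch("other"))); // false
        System.out.println(new TagSketch("late").equals(null));                   // false
    }
}
```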

@@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
lateOutputTag);
Contributor

wrong alignment

@@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
Contributor

wrong alignment

@@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
Contributor

wrong alignment

@@ -53,5 +54,11 @@ public void collect(StreamRecord<T> record) {
}

@Override
Contributor

The signature can go on the same line

@@ -40,6 +41,12 @@ public void collect(StreamRecord<T> record) {
}

@Override
public <X> void collect(
Contributor

The signature fits in one line

@aljoscha
Contributor Author

Thanks @kl0u for the (already) quite thorough review! I'll push a commit with fixes.

@aljoscha aljoscha force-pushed the finish-pr-2982-side-outputs-cp branch from 3c521b2 to efefb83 Compare March 11, 2017 07:53
@aljoscha
Contributor Author

@kl0u @chenqin I cleaned up the commits, distributed the fixes from the comments to the right commits. I also added more tests/ITCases for: detecting name clashes in side output IDs, side outputs with multiple consumers.
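
A name-clash check of the kind mentioned above might be modeled as follows. This is only a sketch under assumptions: the registry class is hypothetical, and treating "same id with a different element type" as the clash condition is a guess at what the translation step verifies, not a description of the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry modeling a translation-time clash check; not Flink's StreamGraph. */
final class SideOutputRegistrySketch {
    // Maps a side output id to the element type first registered for it.
    private final Map<String, Class<?>> typesById = new HashMap<>();

    /** Registers a side output; rejects reuse of an id with a different element type. */
    void register(String id, Class<?> elementType) {
        Class<?> previous = typesById.putIfAbsent(id, elementType);
        if (previous != null && !previous.equals(elementType)) {
            throw new IllegalArgumentException(
                "Side output id '" + id + "' is already used with type " + previous.getName());
        }
    }

    public static void main(String[] args) {
        SideOutputRegistrySketch registry = new SideOutputRegistrySketch();
        registry.register("late", String.class);
        registry.register("late", String.class); // several consumers of one side output are fine
        try {
            registry.register("late", Integer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("clash detected: " + e.getMessage());
        }
    }
}
```

Registering the same id twice with the same type is allowed here so that one side output can have multiple consumers.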

Contributor

kl0u commented Mar 11, 2017

Thanks @aljoscha I will have a look on Monday.

Contributor

@kl0u kl0u left a comment

Hi @aljoscha. I did a first pass which did not include the translation part of the StreamGraph. I will continue with this part now.

So far I had some minor comments and one more important one on CopyingChainingOutput.pushToOperator().

* into the side output with the given {@link OutputTag}.
*
* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
*/
Contributor

Missing space between the ) and the {

* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
*/
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag){
sideOutputTag = clean(sideOutputTag);
Contributor

I think it is better to not reuse the argument variable but create a new one.

}

requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

Contributor

The requireNonNull should be at the beginning of the method.

* connected to downstream operations.
*
* @param <T> The type of the elements that result from this {@code SideOutputTransformation}
*/
Contributor

Here we do not check whether the input is null (we do it only in the caller method) but we try to get its parallelism. We could pass the parallelism as a separate argument and then, after the super() call, check whether the input is null.
This makes the class self-contained, as you do not have to check other classes to see whether the input can be null. What do you think?
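
The suggestion could be sketched as below. All class names are hypothetical stand-ins rather than the transformation classes in this PR; the point is only the constructor shape, where the parallelism arrives as its own argument so that nothing dereferences the input before the null check.

```java
import java.util.Objects;

/** Hypothetical stand-ins; these are not Flink's transformation classes. */
final class TransformationSketch {
    /** Base class that needs the parallelism in its constructor. */
    static class Base {
        final int parallelism;
        Base(int parallelism) { this.parallelism = parallelism; }
    }

    /** Upstream transformation that a side output reads from. */
    static final class Input extends Base {
        Input(int parallelism) { super(parallelism); }
    }

    /** Side output transformation that validates its own input. */
    static final class SideOutput extends Base {
        final Input input;

        SideOutput(Input input, int parallelism) {
            super(parallelism); // input is not dereferenced before the check below
            this.input = Objects.requireNonNull(input, "input must not be null");
        }
    }

    public static void main(String[] args) {
        Input upstream = new Input(4);
        SideOutput side = new SideOutput(upstream, upstream.parallelism);
        System.out.println(side.parallelism); // 4
    }
}
```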

* @param element The element to check
* @return The element for which should be considered when sideoutputs
*/
protected boolean isLate(StreamRecord<IN> element){
Contributor

This is not used any more, right? So it can be deleted.

Contributor Author

I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right?

Contributor

That is what I remember as well.

outSerializer = upStreamConfig.getTypeSerializerSideOut(
edge.getOutputTag(), taskEnvironment.getUserClassLoader());
} else {
// main output
Contributor

this can become one line.


pushToOperator(record);
}

Contributor

This can become private as the copying alternative has its own implementation, right?

pushToOperator(record);
}

@Override
Contributor

This can become private, as before.

operator.processElement(copy);
}
catch (Exception e) {
operator.processElement(castRecord);
Contributor

This should be copy, not castRecord.
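
One plausible reading of why this matters, sketched in plain Java: a copying output exists so that the downstream operator cannot mutate the record the caller still holds. The classes below are hypothetical and do not reproduce CopyingChainingOutput or StreamRecord; they only model the defensive copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a copying chained output; not Flink's CopyingChainingOutput. */
final class CopyingOutputSketch {
    /** Mutable record, standing in for a stream record with a mutable value. */
    static final class Record {
        final List<String> values;
        Record(List<String> values) { this.values = values; }
        Record copy() { return new Record(new ArrayList<>(values)); }
    }

    /** Downstream operator that mutates whatever record it is handed. */
    static void processElement(Record record) {
        record.values.add("mutated-downstream");
    }

    /** Hands a defensive copy downstream so the caller's record stays untouched. */
    static void pushToOperator(Record record) {
        Record copy = record.copy();
        processElement(copy); // passing `record` here instead would leak the mutation upstream
    }

    public static void main(String[] args) {
        Record original = new Record(new ArrayList<>(Arrays.asList("a")));
        pushToOperator(original);
        System.out.println(original.values); // [a]
    }
}
```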

Contributor

@kl0u kl0u left a comment

@aljoscha I finished my review. I had some comments.

* We need to create an {@link OutputTag} so that we can reference it when emitting
* data to a side output and also to retrieve the side output stream from an operation.
*/
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
Contributor

Here we add a side output but we do nothing to show that it works. Probably we can add a prefix "rejected-" to the record and print it, so that the user can see what the side output does.
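
As a rough picture of what the reviewer asks for, the following plain-Java model routes words to a main output or a "rejected" side output and then prefixes the rejected ones before printing. It uses no Flink types; the class name and the length threshold are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a main output plus a "rejected" side output; no Flink types involved. */
final class RejectedWordsSketch {
    static final int MAX_LENGTH = 5; // hypothetical threshold, chosen only for illustration

    final List<String> mainOutput = new ArrayList<>();
    final List<String> rejectedOutput = new ArrayList<>();

    /** Routes a word to the main output, or to the side output if it is too long. */
    void process(String word) {
        if (word.length() > MAX_LENGTH) {
            rejectedOutput.add(word);
        } else {
            mainOutput.add(word);
        }
    }

    /** Downstream consumer of the side output: prefixes each record so it stands out when printed. */
    List<String> labelledRejects() {
        List<String> labelled = new ArrayList<>();
        for (String word : rejectedOutput) {
            labelled.add("rejected-" + word);
        }
        return labelled;
    }

    public static void main(String[] args) {
        RejectedWordsSketch sketch = new RejectedWordsSketch();
        for (String word : "to be or not to be questionable".split(" ")) {
            sketch.process(word);
        }
        System.out.println(sketch.mainOutput);        // [to, be, or, not, to, be]
        System.out.println(sketch.labelledRejects()); // [rejected-questionable]
    }
}
```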

this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;

Contributor

Does it make sense to add the outputTag also in the edgeId?

Contributor Author

Not sure what the edge id exactly does and who uses it so I prefer to not touch it, for now.

@@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>());
new ArrayList<String>(), null);
Contributor

The null should go to the next line for uniformity.

@@ -63,6 +66,7 @@
private TypeSerializer<?> typeSerializerIn1;
private TypeSerializer<?> typeSerializerIn2;
private TypeSerializer<?> typeSerializerOut;
private Map<OutputTag<?>, TypeSerializer<?>> typeSerializerMap;
Contributor

This is not used anywhere in the code. Can it be removed, along with the getTypeSerializerOut() and setTypeSerializerOut()?

@aljoscha aljoscha force-pushed the finish-pr-2982-side-outputs-cp branch from efefb83 to 62cc5ee Compare March 17, 2017 16:19
@aljoscha
Contributor Author

Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for travis to give the green light and then I'll merge.

@chenqin A lot of thanks also to you for working on this and pushing it with me! 😃

@aljoscha aljoscha force-pushed the finish-pr-2982-side-outputs-cp branch from 62cc5ee to 20d8d67 Compare March 17, 2017 20:32
chenqin and others added 3 commits March 17, 2017 21:36
This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.
We use side outputs to emit dropped late data.
@aljoscha aljoscha force-pushed the finish-pr-2982-side-outputs-cp branch from 20d8d67 to d0eef93 Compare March 17, 2017 20:36
@aljoscha aljoscha closed this Mar 18, 2017
@aljoscha aljoscha deleted the finish-pr-2982-side-outputs-cp branch March 18, 2017 07:14