Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2861] Fields grouping on split streams fails #1387

Closed

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Nov 19, 2015

No description provided.

@mjsax
Copy link
Member Author

mjsax commented Nov 19, 2015

In FlinkTopologyBuilder I had to introduce a token for operator names which is used in WrapperSetupHelper to get the original name of an operator (Flink changes operator names when chaining happens). This is quite hacky. If anybody has a better idea how to solve this, please let me know.

@StephanEwen
Copy link
Contributor

I ran into that issue a while back. The problem was that you emit elements that have the name/tag of the split stream (to use the split selector) and over that, you run the regular partitioner.

Splitting this into two steps should work: Splitting, having a mapper that removes the "split tag" and then doing the field partitioning.

Would be nice to make this nice in Flink, but it is tricky. Not all functions go though a collector, and not all collectors can split.

@mjsax
Copy link
Member Author

mjsax commented Nov 20, 2015

I don't understand you comment? What you describe is exactly how it works (even before this PR). The problem was actually just a missing TypeInformation I added here... However, I introduced this token into the name of an operator here, because the .select(...).map(...) get chained which alters the operator name and I need the original name later on in WrapperSetupHelper -- this part is "hacky". But I am not sure if there is a better solution to it. Look for the new variable FlinkTopologyBuilder.token.

@mjsax
Copy link
Member Author

mjsax commented Nov 20, 2015

Btw: Travis fails due to a instable test -- I already create a JIRA for it.

this.numberOfAttributes, flinkCollector));
}
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I fixed that in my code base as well...

@mxm
Copy link
Contributor

mxm commented Nov 22, 2015

Maybe a stupid question but why don't you send the operator name along with the BoltWrapper? Then there is no need to extract it from the Flink task name.

@mjsax mjsax force-pushed the flink-2861-fieldsGrouping-SplitStream branch from f31c895 to 7d6b4c7 Compare November 23, 2015 14:31
@mjsax
Copy link
Member Author

mjsax commented Nov 23, 2015

That's actually not a stupid question. I did not do it in the first place to avoid "redundant" code (I was not aware that Flink changes names). I just changed it to your suggestion. If Travis passed, I would merge this. Or are there any objections?

@StephanEwen
Copy link
Contributor

@mjsax Okay, I probably misread the original code. I thought the map() that drops the tag was not there...

@mjsax
Copy link
Member Author

mjsax commented Nov 24, 2015

@StephanEwen In the original code that was actually the case. The BoltWrapper receiving the data was able to extract the tuples by itself. This got changed as I introduces proper generic types in FlinkTopologyBuilder.

Merging this now...

@StephanEwen
Copy link
Contributor

Ah, okay, so I skipped a version by accident ;-) I think that original change could not quite have worked, as it would have propagated the wrong TypeInfos to the ArrayKeySelector and OutputSerializer. The one with map() looks good...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants