-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming Scala API #275
Streaming Scala API #275
Conversation
d54d226
to
ca2af20
Compare
Looks great, @gyfora. The windowed join on named fields of case classes is simply beautiful. I like the project structure, adding the streaming scala code under flink-scala. Another somewhat viable option could be to open a flink-streaming-scala project (and rename flink-streaming-core to f-s-java, or even have separate f-s-core and f-s-java projects). Would like to merge it soon, because it not only includes the streaming scala API but some much needed refactor of our java API. Also would like to try out the scala API by implementing the examples with a couple of volunteers. The streaming examples are fairly short and straightforward compared to the batch ones, so a handful of people is more than enough in that department. |
+1 On Sun, Dec 21, 2014 at 5:45 PM, Márton Balassi notifications@github.com
|
I suggest to merge this asap after the package structure and code formatting is sorted out. And we can add features as we go. As Marton said, I also refactored some parts of the java api and we would like to build on top of that. @StephanEwen or @aljoscha can you guys please help me with this? |
Why does Streaming use it's own classes for field selectors? I.e. FieldsKeySelector, CaseClassKeySelector. Streaming also has it's own selector classes in the Java API. |
The Java api FieldsKeySelector works on array, java tuple and pojo types as well, but it doesnt work on scala tuples. The Scala FieldsKeySelector works on Java and Scala tuples, and the CaseClassKeySelector works on case classes. |
It should be possible to put the two scala selectors together in a ScalaFieldsKeySelector but I think we need to have them in some form |
But in the batch API we don't have special KeySelectors, everything is handled uniformly in Keys.java. The field keys and expression keys support Java Tuples, Scala Tuples, Pojos and Case Classes. |
Also, as @mbalassi mentioned, we should really think about the directory structure now. Having Java Streaming in addons, Scala Streaming in flink-scala and the examples also scattered all over the place does not make things very manageable. |
I like the work, overall. It's just some nitpicking here and there. 😄 |
What I don't get is how to use the Keys to get the actual key (like I do in the selectors), maybe I am missing something trivial here. If I knew I would have used them trust me :) |
Yes, @rmetzger added some magic when he unified Pojo and Tuple keys. TypeComparator has a method extractKeys() that does exactly that. It extracts the key fields of a tuple or object and stores them in an array. I think all the infrastructure is there. You can just use Keys.java and create TypeComparators, those you can use for everything else. Maybe @rmetzger can have a look and make some suggestions for how this could best be implemented in the Streaming API. |
You are right I also use the comparators for pojo types. I didnt use comparators for everythibg else because it was trivial to extract keys from tuples and arrays. But i guess its better to have the Keys approach I will implement it. |
Plus, you get all the support for nesting tuples, pojos and case classes that is already there. 😸 |
+1 for having a clear code structure. IMO, we should either move the whole flink-streaming module to the root folder or keep everything under flink-addons. |
I would suggest to use the Keys class also for Tuples. If you just use the "trivial" extraction (= directly using the user input), you might cause unexpected behavior. |
Okay I will fix this when I have some time. Today or tomorrow :) |
@aljoscha and @fhueske on the package structure: We hope that the point in time when streaming can "graduate" from the flink-addons project is near, but to be honest some work is still needed there. If prioritized that that can be achived for the 0.9 release, if not it is safe to say that it can be done for the 1.0. |
Okay as @aljoscha and @rmetzger suggested I reworked our grouping and removed the extra classes and substituted them with the use of Keys from the batch API, works well and looks good :) The only thing I left in is the ArrayKeySelector which we use in the Streaming api to allow grouping and aggregating on array types as well. Let's merge this soon because I am also pushing out a rework of the streaming sources for better parallelism handling and also on the connectors to other systems which builds on this PR. |
Concerning the project structure: For the batch part, there are discussions May be good to keep in mind when you decide on a project structure. We could, in the next version, go for something like
|
…de simple operators
…nality + DataStream sinks + docs
Conflicts: flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
c8eb777
to
d629cc3
Compare
…cy types [scala] [streaming] updated example with scala delta helper usage
…dowing,batching) for the scala api
…rsion for ConnectedDataStreams [scala] [streaming] Changed return types to implicits
… by GenericSourceFunction interface
73c7975
to
ed1feeb
Compare
This PR contains the commits for the Scala api for Flink Streaming. Most functionality is already implemented and I think it is ready to merge after maybe some slight refactoring.
I would appreciate if someone with more scala experience could look it through, that code works but it might not be very pretty everywhere.
Also If anyone can please run a scala formatter on the code since it seems impossible to get line wrapping settings in eclipse so I had to break the lines manually to make travis pass.
What's missing:
Feel free to play around and test the features, I am sure that there will be bugs to fix :)