Scala DataStream&DataStreamUtils accessors fix #1574

Closed
mbalassi wants to merge 2 commits

mbalassi
Contributor

@mbalassi mbalassi commented Feb 2, 2016

The PR contains two orthogonal approaches to fixing access to DataStreamUtils.collect from a Scala environment. Either one or both can be merged.

The first simply re-exposes the underlying Java DataStream, while the second effectively adds a Scala API for DataStreamUtils.

I am in favor of adding both. Usage of the latter looks as follows:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.contrib.streaming.scala.utils._

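object ScalaStreamCollect {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // collect() is made available by the contrib utils import above
    // and returns an iterator over the elements of the stream
    val it = env.generateSequence(0L, 10L)
      .collect()

    // Print each collected element
    while (it.hasNext) {
      print(it.next())
    }
  }
}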

At least the first commit should be merged as soon as possible, since the fix was requested on the user mailing list. [1]

[1] https://mail-archives.apache.org/mod_mbox/flink-user/201602.mbox/%3CCAFqo6nQ24dtExjPOX%3DrSuWSww8skCH23Q8i7CJ3Ef5LYhDj2pA%40mail.gmail.com%3E
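
For readers unfamiliar with how an import can add a collect() method to a stream, here is a minimal sketch of one common Scala technique, an implicit class. It is not the code from this PR: SimpleStream, StreamUtils and RichStream are hypothetical stand-ins that avoid any Flink dependency, and the actual mechanism used in the PR is not shown in this conversation.

```scala
import scala.collection.JavaConverters._

object CollectEnrichmentSketch {

  // Hypothetical stand-in for a stream type; not a Flink class.
  final class SimpleStream[T](val elements: Seq[T])

  // Hypothetical stand-in for a Java-style utility that returns a java.util.Iterator.
  object StreamUtils {
    def collect[T](stream: SimpleStream[T]): java.util.Iterator[T] = {
      val list = new java.util.ArrayList[T]()
      stream.elements.foreach(e => list.add(e))
      list.iterator()
    }
  }

  // Importing CollectEnrichmentSketch._ brings this implicit class into scope,
  // which makes collect() callable directly on a SimpleStream.
  implicit class RichStream[T](self: SimpleStream[T]) {
    def collect(): Iterator[T] = StreamUtils.collect(self).asScala
  }

  def main(args: Array[String]): Unit = {
    val it = new SimpleStream(Seq(1, 2, 3)).collect()
    while (it.hasNext) {
      println(it.next())
    }
  }
}
```

With this pattern the caller never touches the utility object directly; the import alone decides whether collect() is available.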

@cmonty

cmonty commented Feb 2, 2016

The Scala interface looks good to me and gets rid of the need to call getJavaStream directly.

Thanks!

@brianchhun

Looks good. It looks like collect is returning a Java iterator instead of a Scala one. Is there a reason for this given that the code is for Scala users?

@mbalassi
Contributor Author

mbalassi commented Feb 2, 2016

Thanks for the comments. @gumchum: Internally it is stored as a Java Iterator, which is why I left it that way, but it definitely makes sense to convert it to a Scala Iterator before passing it to users, so their code does not get "polluted" with Java stuff. I am modifying it.
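
As a rough illustration of the conversion discussed above, the following sketch wraps a java.util.Iterator as a Scala Iterator with the standard library's JavaConverters. The object name and the javaResults helper are hypothetical stand-ins for whatever iterator is held internally; this is not the change made in the PR. On Scala 2.13 and later, scala.jdk.CollectionConverters is the preferred import and JavaConverters is deprecated.

```scala
import scala.collection.JavaConverters._

object IteratorConversionSketch {

  // Hypothetical stand-in for an internally held java.util.Iterator.
  def javaResults(): java.util.Iterator[java.lang.Long] = {
    val list = new java.util.ArrayList[java.lang.Long]()
    (0L to 2L).foreach(n => list.add(java.lang.Long.valueOf(n)))
    list.iterator()
  }

  // asScala wraps the Java iterator without copying its elements;
  // map unboxes java.lang.Long into Scala's Long.
  def scalaResults(): Iterator[Long] =
    javaResults().asScala.map(_.longValue)

  def main(args: Array[String]): Unit =
    scalaResults().foreach(println)
}
```

Callers can then use foreach, map and the other Scala Iterator methods instead of a hasNext/next loop.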

@StephanEwen
Contributor

Looks good to me, but @gumchum's comment is valid.

One more thing: I would name the accessor on the Scala DataStream def javaStream: JavaStream[T] = stream. I think that is more in line with Scala style: no parentheses, because it is a simple accessor, and no "get" prefix, because that seems to be the encouraged convention for methods that read like variables (no parentheses).
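
The naming suggestion above can be sketched as follows. Both classes here are simplified, hypothetical stand-ins rather than the Flink types; only the accessor's shape (no parentheses, no "get" prefix) is taken from the comment.

```scala
object AccessorStyleSketch {

  // Hypothetical stand-in for the wrapped Java stream type.
  final class JavaStream[T](val name: String)

  // Hypothetical stand-in for the Scala wrapper.
  final class DataStream[T](stream: JavaStream[T]) {
    // Parameterless, side-effect-free accessor, so it reads like a field at the call site.
    def javaStream: JavaStream[T] = stream
  }

  def main(args: Array[String]): Unit = {
    val underlying = new JavaStream[Long]("sequence")
    val wrapped = new DataStream(underlying)
    // Called without parentheses, unlike a Java-style getJavaStream().
    println(wrapped.javaStream.name)
  }
}
```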

@mbalassi
Contributor Author

mbalassi commented Feb 3, 2016

Thanks for the comments, updated the PR accordingly.

@StephanEwen
Contributor

Thanks, merging this...

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Feb 4, 2016
@asfgit asfgit closed this in 4852302 Feb 4, 2016