[FLINK-2152] Added zipWithIndex #832

Closed
wants to merge 4 commits into from

andralungu
Contributor

This PR adds the zipWithIndex utility method to Flink's DataSetUtils as described in the mailing list discussion: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/The-correct-location-for-zipWithIndex-and-zipWithUniqueId-td6310.html.

The method could, in the future, be moved to DataSet.

@fhueske , @tillrohrmann , once we reach a conclusion for this one, I will also update #801 (I wouldn't like to fix unnecessary merge conflicts).

Once zipWithUniqueId is added, I could also explain the difference between the two methods in the docs.
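
For readers unfamiliar with the idea, here is a minimal plain-Java sketch of how a zipWithIndex over partitioned data can work. It is not the code from this PR and does not use Flink. It assumes a two-pass approach (count the elements per partition, then assign consecutive indices starting at each partition's offset), and the names `ZipWithIndexSketch`, `Indexed`, `countPerPartition` and `offsets` are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative simulation only: partitions are modelled as nested lists,
// not as a distributed Flink DataSet.
public class ZipWithIndexSketch {

    /** Simple immutable (index, value) pair; a stand-in for a tuple type. */
    public static final class Indexed<T> {
        public final long index;
        public final T value;

        Indexed(long index, T value) {
            this.index = index;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + index + "," + value + ")";
        }
    }

    /** Pass 1: count how many elements each partition holds. */
    static <T> long[] countPerPartition(List<List<T>> partitions) {
        long[] counts = new long[partitions.size()];
        for (int p = 0; p < partitions.size(); p++) {
            counts[p] = partitions.get(p).size();
        }
        return counts;
    }

    /** Turn the counts into the first index of each partition (exclusive prefix sum). */
    static long[] offsets(long[] counts) {
        long[] starts = new long[counts.length];
        long running = 0;
        for (int p = 0; p < counts.length; p++) {
            starts[p] = running;
            running += counts[p];
        }
        return starts;
    }

    /** Pass 2: walk each partition and assign consecutive indices from its offset. */
    public static <T> List<List<Indexed<T>>> zipWithIndex(List<List<T>> partitions) {
        long[] starts = offsets(countPerPartition(partitions));
        List<List<Indexed<T>>> result = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            long next = starts[p];
            List<Indexed<T>> out = new ArrayList<>();
            for (T value : partitions.get(p)) {
                out.add(new Indexed<>(next++, value));
            }
            result.add(out);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("A", "B"),
                Arrays.asList("C"),
                Arrays.asList("D", "E", "F"));
        System.out.println(zipWithIndex(partitions));
    }
}
```

In this sketch the indices are consecutive across partitions because every partition knows how many elements precede it, which is why the counting pass has to finish before any index is assigned.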

@rmetzger
Contributor

+1 to merge.

public class DataSetUtils<T> {

/**
* Method that goes over all the elements in each partition in order to retireve
Contributor

retrieve

@tillrohrmann
Contributor

Good work @andralungu.

The data set utils do not work with the Scala API yet. It would be nice to support a syntax like this for Scala:

val ds: DataSet[String] = ...
val zipped: DataSet[(Long, String)] = ds.zipWithIndex

env.setParallelism(1);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");

DataSetUtils<String> dataSetUtils = new DataSetUtils<String>();
Contributor

Why do we have to create a DataSetUtils object here? So far it does not store any state.
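
A minimal sketch of the stateless alternative this comment points toward: since the helper holds no fields, it can be a static generic method on a non-instantiable class. This is plain Java over a `List`, not the Flink `DataSetUtils` from this PR, and the class name `StatelessUtilsSketch` is hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical utility class: no state, so no instance is ever needed.
public final class StatelessUtilsSketch {

    // Private constructor prevents "new StatelessUtilsSketch()".
    private StatelessUtilsSketch() {
    }

    /** Pairs each element with its position; the type parameter lives on the method, not the class. */
    public static <T> List<Map.Entry<Long, T>> zipWithIndex(List<T> input) {
        List<Map.Entry<Long, T>> out = new ArrayList<>(input.size());
        long index = 0;
        for (T value : input) {
            out.add(new AbstractMap.SimpleImmutableEntry<Long, T>(index++, value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("A", "B", "C", "D", "E", "F");
        // Called directly on the class; no utility object is created.
        List<Map.Entry<Long, String>> zipped = StatelessUtilsSketch.zipWithIndex(in);
        System.out.println(zipped);
    }
}
```

Moving the type parameter from the class to the method lets callers rely on type inference at the call site instead of writing `new DataSetUtils<String>()` first.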

@tillrohrmann
Contributor

Sorry for my late reply. Has everything worked out by now?

On Tue, Jun 23, 2015 at 2:43 PM, Andra Lungu notifications@github.com
wrote:

Hey @tillrohrmann,

Sorry for the incredibly late reply. The last weeks have been very hectic.
Nevertheless, I'd like to properly finish and polish this issue very soon.

For that: I have addressed the Java comments, but I still have to provide
support for Scala. I love this task because it really takes me out of my
comfort zone: Gelly and Java. It's no secret that Scala is not my strongest
point. Therefore, I'd like to use this thread to ask some rather trivial
questions:

Before defining implicit methods and using pimp-my-lib, I need to wrap the
Java function, which should be easy, right? There is a wrap method, after all.
That being said, I created a DataSetUtils class in org.apache.flink.api.scala
and wanted to call wrap(ju.countElements...). Apparently it does not
let me. Can someone help me out with that?

Thanks!



@andralungu
Contributor Author

Actually, I get a weird compile error: it says missing Type parameter for the map in DataSet.scala...
I think this is because the map is overloaded... and I haven't found the workaround just yet...
(The error is reproducible by calling testZipWithIndex)

@thvasilo

Is it a type inference problem? Have you tried importing org.apache.flink.api.scala._ to see if that fixes it?

@andralungu
Contributor Author

Uhmmm... flink.api.scala is imported. That's not the issue.

@thvasilo

Seems like the problem was that the wrap was returning a DataSet[(java.lang.Long, T)]

If you change the map to .map { t => (t.f0.toLong, t.f1) } it should work.

@andralungu
Contributor Author

Hey Theo,

Thanks a lot for finding my bug there ^^
PR updated to address the Java issues and to contain a pimped Scala version of zipWithIndex :)


@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
Member

Unnecessary semicolon :) You can simplify this method as def getFolder = tempFolder.

@chiwanpark
Member

Hi, I added some minor comments about coding style in the Scala test case. The rest is okay.
I think we can merge this after the style issues are fixed.

andralungu and others added 4 commits June 28, 2015 11:08
[FLINK-2152] Added zipWithIndex utility method

[FLINK-2152] Fixed minor documentation bug
[FLINK-2152] Scala zipWithIndex; second attempt

[FLINK-2152] Scala zipWithIndex - finalised

[FLINK-2152] Fixed checkstyle violation
@andralungu
Contributor Author

Perfectly valid comments, thanks! PR Updated.

@chiwanpark
Member

Looks good :) merging

@chiwanpark
Member

Oops! I forgot to add "This closes #832" to the commit message. I made the mistake because this is my first commit pushed to the Apache repository. Sorry. How can I fix it?

@rmetzger
Contributor

No problem ;)
In this case I would kindly ask @andralungu to close the issue manually.
