
[FLINK-2590] fixing DataSetUtils.zipWithUniqueId() #1075

Closed · wants to merge 3 commits

Conversation

s1ck
Contributor

@s1ck commented Aug 29, 2015

  • modified algorithm as explained in the issue
  • updated method documentation

@rmetzger
Contributor

Thanks a lot for the contribution.
Can you add a test case for the method to make sure the issue is not reintroduced when somebody changes the code later?

@HuangWHWHW
Contributor

@rmetzger +1. I think adding a test would be helpful.
Otherwise, can you give us information that proves that id = (counter << shifter) + taskId will never generate the same id in different tasks?
And a minor thing in your issue description:
isn't log2(8) = 3, not 4?

@@ -121,6 +122,7 @@ public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) thr

return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {

long maxLength = log2(Long.MAX_VALUE);
Contributor


You can make this static final.

@StephanEwen
Contributor

+1 for a test, otherwise this looks good!

@s1ck
Contributor Author

s1ck commented Aug 31, 2015

There is already a test case for zipWithUniqueId() in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java#L66
However, that test assumes that only one task is running, which is why it did not fail in the first place.
If there are multiple tasks, the resulting unique id is not deterministic for a single dataset element. I would implement a test that creates a dataset, applies the zipWithUniqueId method, calls distinct(0) on the created ids, and checks the number of resulting elements (which must be equal to the size of the input dataset). Would this be sufficient?
Furthermore, the current test cases for DataSetUtils write the resulting dataset out as strings and compare those after each test run. My proposed test would not fit into that scheme. Should I create a new test class for this method?

@StephanEwen I wanted to do this, but static doesn't work with anonymous classes. However, I can declare the UDF as a private inner class (I didn't want to change too much code).
@HuangWHWHW the log2 method already existed, and in the issue I proposed to rename it. Maybe getBitSize(long value)? As for the "proof": if each task id is smaller than the total number of parallel tasks t, its bit representation is also no longer than the bit representation of t. Thus, when we shift the counter by the number of bits of t, there cannot be a collision between different task ids.
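The collision argument above can be sketched in plain Java. This is a minimal, self-contained simulation (the class and helper names are hypothetical, not the actual Flink code): shifting each task's local counter left by the bit width of the task count keeps the low taskId bits disjoint from the counter bits, so ids from different tasks never collide.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the id scheme discussed above: each of the `parallelism`
// tasks holds a 0-based taskId and a local counter. The counter is
// shifted past the bits occupied by any valid taskId.
class UniqueIdSketch {

    // number of bits needed to represent `value` (0 for value == 0)
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    static long uniqueId(long counter, int taskId, int parallelism) {
        int shifter = bitSize(parallelism); // bits of the task count t
        return (counter << shifter) + taskId;
    }

    public static void main(String[] args) {
        int parallelism = 7;
        Set<Long> seen = new HashSet<>();
        for (int task = 0; task < parallelism; task++) {
            for (long counter = 0; counter < 1000; counter++) {
                if (!seen.add(uniqueId(counter, task, parallelism))) {
                    throw new AssertionError("collision for task " + task);
                }
            }
        }
        System.out.println("all " + seen.size() + " ids distinct");
    }
}
```

Since every taskId is strictly smaller than parallelism, it fits entirely below the shift boundary, which is the core of the argument.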

@StephanEwen
Contributor

@s1ck Good idea. You can also call collect(), add the IDs to a set and make sure the set has the right cardinality. In general, avoiding temp files and Strings for comparison is a good idea.
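The suggested test shape can be sketched without a Flink cluster. This is a hedged simulation (hypothetical class name; a real test would call collect() on the zipped DataSet instead of building the id list by hand): gather the ids into a set and check that its cardinality equals the input size.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the proposed test: simulate the ids that zipWithUniqueId
// would attach across several "tasks", collect them into a set, and
// assert that no duplicates were produced.
class CardinalityTestSketch {

    public static void main(String[] args) {
        int parallelism = 4;
        int elementsPerTask = 100;

        int shifter = 64 - Long.numberOfLeadingZeros(parallelism);
        List<Long> ids = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            for (long counter = 0; counter < elementsPerTask; counter++) {
                ids.add((counter << shifter) + task);
            }
        }

        // the essence of the test: distinct ids == input size
        Set<Long> distinct = new HashSet<>(ids);
        if (distinct.size() != parallelism * elementsPerTask) {
            throw new AssertionError("duplicate ids generated");
        }
        System.out.println("cardinality check passed: " + distinct.size());
    }
}
```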

@tillrohrmann
Contributor

@s1ck, the testZipWithUniqueId test is bogus. You can remove this test case and replace it with the test you described. It would also be great if you could set the parallelism of testZipWithIndex to something greater than 1. Here it would also make sense to use collect instead of writing to disk.

+1 for renaming log2 into getBitSize(long value). When you rename the method, could you also change the line shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks()) into shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1). That way, we would also get the right unique ids in case of parallelism = 1.

@s1ck
Contributor Author

s1ck commented Aug 31, 2015

@tillrohrmann While writing the new tests for both methods, I found that zipWithIndex is broken, too. It sometimes throws a ConcurrentModificationException. This is because each task sorts a broadcast list in the open method. This could not fail before due to parallelism = 1.
I would fix this by creating a local copy of that list (which should be small in this specific case). Shall I fix this in the same issue, or do you want me to create a new issue for it?
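The local-copy fix proposed above can be sketched in plain Java (hypothetical names; the real code sorts a broadcast variable inside a Flink open() method): sorting a private copy leaves the shared list untouched, so parallel tasks can no longer race on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the fix: when several parallel tasks receive the same
// broadcast list, sorting it in place from each task races. Sorting
// a private copy avoids mutating the shared instance.
class BroadcastCopySketch {

    // what each task's open() would do with the broadcast list
    static List<Integer> openTask(List<Integer> broadcast) {
        List<Integer> local = new ArrayList<>(broadcast); // copy first
        Collections.sort(local);                          // then sort
        return local;
    }

    public static void main(String[] args) {
        // stand-in for the broadcast variable shared by all tasks
        List<Integer> shared = Collections.unmodifiableList(List.of(3, 1, 2));
        List<Integer> sorted = openTask(shared);
        if (!sorted.equals(List.of(1, 2, 3))) {
            throw new AssertionError("copy not sorted");
        }
        if (!shared.equals(List.of(3, 1, 2))) {
            throw new AssertionError("shared list was mutated");
        }
    }
}
```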

@StephanEwen
Contributor

There is an issue that tracks the ConcurrentModificationException problem. As per the discussion in that issue, can you use a BroadcastVariableInitializer? That saves redundant sorts.

@s1ck
Contributor Author

s1ck commented Aug 31, 2015

@StephanEwen thanks for the hint, works fine! Will clean up and commit now.

* added tests for parallel execution of both zip functions
* renamed log2 -> getBitSize
* updated documentation
@s1ck
Contributor Author

s1ck commented Aug 31, 2015

@tillrohrmann I did not include shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1), as your hint only applies to power-of-2 values. E.g., getBitSize(7) returns 3, and we need 3 bits to cover the range from 0 to 6.

@HuangWHWHW
Contributor

Ah, thank you for the proof.
And I didn't look at log2 in detail before, sorry.

@tillrohrmann
Contributor

@s1ck, it's important to note that 1 will be subtracted from getRuntimeContext().getNumberOfParallelSubtasks() and not getBitSize(). The reason is that we have 0 based indices for the subtasks. Thus, we only have to calculate the maximum number of bits to represent the highest index we can encounter. And this is getRuntimeContext().getNumberOfParallelSubtasks() - 1. Thus if getNumberOfParallelSubtasks == 7, then we would calculate getBitSize(6) == 3.
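The off-by-one point above can be made concrete with a small sketch (one plausible getBitSize implementation, shown for illustration only; the actual Flink code may differ):

```java
// getBitSize returns the number of bits needed to represent `value`.
// With 0-based subtask indices, the highest index for parallelism p
// is p - 1, so getBitSize(p - 1) bits suffice for the taskId part.
class BitSizeSketch {

    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        // 7 subtasks -> highest 0-based index is 6 -> 3 bits suffice
        if (getBitSize(6) != 3) throw new AssertionError();
        // getBitSize(7) is also 3 (7 = 0b111), so p = 7 happens to work
        if (getBitSize(7) != 3) throw new AssertionError();
        // but for p = 8 (a power of 2) the difference shows: the highest
        // index 7 needs 3 bits, while getBitSize(8) would waste a bit
        if (getBitSize(7) != 3 || getBitSize(8) != 4) {
            throw new AssertionError();
        }
    }
}
```

This is why subtracting 1 from the subtask count, not from the bit size, gives the tight shift width.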

* maximum bit size is changed to getNumberOfParallelSubTasks() - 1
@s1ck
Contributor Author

s1ck commented Sep 1, 2015

@tillrohrmann of course you are right, I got that wrong. It's committed.

@tillrohrmann
Contributor

@s1ck, looks really good. Thanks for your contribution. Will merge it now.

@s1ck
Contributor Author

s1ck commented Sep 1, 2015

Sorry, I did not see that there are also identical test cases in Scala which now fail due to the -1 change. As those Scala methods wrap the Java methods, is it necessary to run the same tests on them again?

@tillrohrmann
Contributor

No problem @s1ck. It might be a bit redundant, but it tests that the forwarding is done correctly. Therefore, I fixed the test case.

@s1ck
Contributor Author

s1ck commented Sep 1, 2015

Ok, thank you.

asfgit pushed a commit that referenced this pull request Sep 2, 2015
…ipWithIndex()

* modified algorithm as explained in the issue
* updated method documentation

[FLINK-2590] reducing required bit shift size

* maximum bit size is changed to getNumberOfParallelSubTasks() - 1

This closes #1075.
@asfgit closed this in ab14f90 Sep 2, 2015
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
…ipWithIndex()
