
Conversation

ChengXiangLi

This PR includes:

  1. Four random sampler implementations for different sampling strategies.
  2. A sample operator for the DataSet Java API.
  3. Unit tests for the random samplers.
  4. An integration test for the Java API sample operator.

@ChengXiangLi
Author

Previously, I planned to leave the sample Scala API to a separate PR since I'm not very familiar with Scala, but the failed test shows that Flink has a test to make sure the Scala and Java APIs match. I'll add the Scala API and its integration test later.

@ChengXiangLi
Author

The Scala API and its integration test have been merged into the latest commit. @tillrohrmann, do you have time to review this PR?

@tillrohrmann
Contributor

Thanks for your contribution @ChengXiangLi. I'll try to review your PR tomorrow morning.

Contributor


I think, if I'm not mistaken, that hasNext has to be idempotent. Thus it should return true if current != null.
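
For illustration, a minimal sketch of the pattern being suggested here, assuming a sampler iterator that pre-fetches the next accepted element into a `current` field (the class and method names are illustrative, not taken from the PR):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative sketch only; the field and method names are hypothetical.
public abstract class SampledIterator<T> implements Iterator<T> {

    private T current;

    @Override
    public boolean hasNext() {
        // Idempotent: repeated calls without next() must not advance the underlying input.
        if (current == null) {
            current = computeNext();   // returns null once the input is exhausted
        }
        return current != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null;   // forces a fresh fetch on the following hasNext()
        return result;
    }

    /** Pulls the next sampled element from the underlying input, or returns null if none is left. */
    protected abstract T computeNext();
}
```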

Contributor


Could we put the last return statement in an else branch?

@tillrohrmann
Contributor

Thanks for your contribution @ChengXiangLi. The code is really well tested and well structured. Great work :-)

I had only some minor comments. There is however one thing I'm not so sure about. With the current implementation, all parallel tasks of the sampling operator will get the same random generator/seed value. Thus, every node will generate the same sequence of random numbers. I think this can have a negative influence on the sampling. What we could do is to use RichMapPartitionFunction instead of the MapPartitionFunction. With the rich function, we either have access to the subtask index, given by getRuntimeContext().getIndexOfThisSubtask(), which we could use to modify the initial seed or we generate the random number generator in the open method (this method is executed on the TaskManager). Assuming that the clocks are not completely synchronized and that the individual tasks will be instantiated not at the same time, this could give us less correlated random number sequences. What do you think?

@ChengXiangLi
Author

Thanks for the review, Till. Regarding your last suggestion: I was aware of this issue before, but I didn't know about the existence of RichMapPartitionFunction. Thanks for pointing it out; I'll update the code soon.

@ChengXiangLi force-pushed the FLINK-1901 branch 2 times, most recently from 1afae81 to 23b93d7 on August 5, 2015 09:47
@ChengXiangLi
Author

Hi @tillrohrmann, the current implementation of sampling with a fixed size generates a fixed-size sample for each partition rather than for the whole dataset, while users most of the time probably expect the latter. I'm researching how to randomly sample a fixed number of elements from a distributed data stream; I think we can pause this PR review until I push that fix.

@tillrohrmann
Contributor

The current state with the RichMapPartitionFunctions looks good to me 👍

You're right that users usually want to fix the sample size for the whole data set. An easy solution could be to assign each item an index, see DataSetUtils.zipWithIndex. Then we can compute the maximum index (which is effectively counting the data set elements). This gives us the range from which we have to sample. By generating a parallel sequence of the size of our sample with env.generateSequence(maxIndex), we could then sample from [0, maxIndex]. At last we would have to join this data set with the original data set which has the indices assigned. There are probably more efficient algorithms out there than this one.
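
For illustration, a rough and not particularly efficient sketch of this scheme, assuming the DataSet Java API of that time with DataSetUtils.zipWithIndex and DataSet.count(); the class name is made up, and duplicate random indices are not handled:

```java
import java.util.Random;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class FixedSizeSampleSketch {

    public static <T> DataSet<T> sampleWithSize(DataSet<T> input, int numSamples) throws Exception {
        ExecutionEnvironment env = input.getExecutionEnvironment();

        // 1. Assign a unique, consecutive index to every element.
        DataSet<Tuple2<Long, T>> indexed = DataSetUtils.zipWithIndex(input);

        // 2. Determine the index range; count() triggers an extra pass over the data.
        final long maxIndex = input.count() - 1;

        // 3. Draw numSamples random indices from [0, maxIndex].
        //    Duplicate indices would need extra handling for exact sampling without replacement.
        DataSet<Tuple2<Long, Long>> sampleIndices = env.generateSequence(1, numSamples)
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long sampleNumber) {
                    long index = (long) (new Random().nextDouble() * (maxIndex + 1));
                    return new Tuple2<>(index, sampleNumber);
                }
            });

        // 4. Join the sampled indices with the indexed data set to obtain the sample.
        return indexed.join(sampleIndices)
            .where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Long, T>, Tuple2<Long, Long>, T>() {
                @Override
                public T join(Tuple2<Long, T> element, Tuple2<Long, Long> index) {
                    return element.f1;   // keep only the original element
                }
            })
            .returns(input.getType());
    }
}
```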

Just ping me when you've found a solution for the problem. Looking forward to reviewing it :-)

@sachingoel0101
Contributor

I have worked on this problem before. The idea is to divide the data into blocks and find the probability of selection of an element from a block.
Thus, suppose there are blocks B_1, B_2, ..., B_N with probabilities P_1, P_2, ..., P_N. You then sample k points by first sampling from the distribution {P_1, P_2, ..., P_N} to find the number of elements you require from each block. After that you select the required number of points from each block and take the union.
It is pretty easy to implement in a shared-memory system, but in a distributed system it is hard. I tried the following approach some time ago, although I didn't quite finish working on it:
blockedData = Data -> (block_id, data)

blockNumbers = (block_id, data) -> (block_id, count)

(1...k) -> (list of block ids we'll be sampling from)
After this, I tried broadcasting the list and selecting the required number of elements from each block, which can be done quite easily. But what if k is very large?
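
For illustration, a small sketch of the allocation step described above, i.e. deciding how many of the k samples each block should contribute (a multinomial draw over the block probabilities; the names are illustrative):

```java
import java.util.Random;

// Sketch of the per-block allocation: given per-block element counts, draw how many
// of the k samples should come from each block, with probability proportional to the counts.
public class BlockAllocation {

    public static int[] allocateSamples(long[] blockCounts, int k, long seed) {
        long total = 0;
        for (long count : blockCounts) {
            total += count;
        }

        Random random = new Random(seed);
        int[] perBlock = new int[blockCounts.length];

        // Sampling with replacement at the block level; for sampling without replacement
        // the per-block counts would additionally cap the allocation.
        for (int sample = 0; sample < k; sample++) {
            long r = (long) (random.nextDouble() * total);
            long cumulative = 0;
            for (int block = 0; block < blockCounts.length; block++) {
                cumulative += blockCounts[block];
                if (r < cumulative) {
                    perBlock[block]++;
                    break;
                }
            }
        }
        return perBlock;
    }
}
```

The driver-side loop over k makes the "what if k is very large?" concern explicit: for a huge k, this allocation step itself becomes the bottleneck.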

@ChengXiangLi
Author

Thanks for the input, @tillrohrmann and @sachingoel0101. I would like to implement the fixed-size sampling with only one pass through the source dataset: when a user samples a dataset, that dataset is usually quite large, so passing through it multiple times would add a lot of overhead. In my solution, the basic idea of fixed-size sampling over a distributed stream is this: generate a random number for each input element as its weight and select the top K elements with the largest weights; since the weights are generated randomly, the selected top K elements form a random sample. You can find more detailed information in the code and Javadoc.
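
A minimal single-partition sketch of this idea in plain Java (not the PR's actual classes): each element gets a random weight, and a bounded min-heap keeps the k elements with the largest weights.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Every element receives a random weight; a min-heap of size k (k > 0 assumed)
// retains the k elements with the largest weights, i.e. a uniform random sample.
public class WeightedReservoirSketch {

    public static <T> Iterator<T> sample(Iterator<T> input, int k, long seed) {
        Random random = new Random(seed);
        // Min-heap ordered by weight: the currently smallest weight is the eviction candidate.
        PriorityQueue<WeightedElement<T>> heap =
            new PriorityQueue<WeightedElement<T>>(k, (a, b) -> Double.compare(a.weight, b.weight));

        while (input.hasNext()) {
            T element = input.next();
            double weight = random.nextDouble();
            if (heap.size() < k) {
                heap.add(new WeightedElement<T>(weight, element));
            } else if (weight > heap.peek().weight) {
                // The new element out-weighs the current minimum and replaces it.
                heap.poll();
                heap.add(new WeightedElement<T>(weight, element));
            }
        }

        List<T> sampled = new ArrayList<T>(heap.size());
        for (WeightedElement<T> e : heap) {
            sampled.add(e.element);
        }
        return sampled.iterator();
    }

    private static class WeightedElement<T> {
        final double weight;
        final T element;

        WeightedElement(double weight, T element) {
            this.weight = weight;
            this.element = element;
        }
    }
}
```

In a distributed setting, each partition would produce such a weighted top-k, and the partial results would then be merged while keeping the overall k heaviest elements, which is why the weights have to travel along with the sampled elements.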

@thvasilo

Hello @ChengXiangLi, perhaps looking at this paper may help with deciding which sampling algorithm to use for the exact-sample-size case.

It provides an implementation specifically designed for a MapReduce environment.

@ChengXiangLi
Author

Thanks, @thvasilo, that paper introduces a random sampling algorithm that extends the one I described before. It uses two thresholds to filter elements before sorting: if an element's weight is larger than the upper threshold, it will be included in the final top K elements with very high probability, and if its weight is smaller than the lower threshold, it will not be included in the final top K elements with very high probability. With an accepted error probability, we can therefore directly accept elements above the upper threshold, directly discard elements below the lower threshold, and only sort the elements whose weights fall between the two thresholds.

This is a very good algorithm and I've noted it down for a future improvement, but I don't want to implement it right away. This PR is already large enough, so I would like to leave the algorithm optimizations for the future and just keep the basic implementations of the sampling algorithms here: simple, easy to understand, and correct, so that they can serve as the performance baseline for further improvements.

@thvasilo

Agreed, we can take a look at the optimized algorithm later.

@ChengXiangLi
Author

Hi, @tillrohrmann , it's ready for review now.

@thvasilo

Having comments for some of the verify[..] functions explaining what each of them verifies would help with code understanding.

Author


Thanks, @thvasilo, I've added more comments in the latest commit.


The Javadoc here is copied from the fraction function.

@tillrohrmann
Contributor

Sorry for the late review @ChengXiangLi. I finished it now and the PR looks really good.

There is only one thing I would like to change before merging it. This is to move the sample and the sampleWithSize methods from the DataSet to the DataSetUtils class. This will effectively make the sample methods not part of the core API. The reason for this is that the sampleWithSize method breaks with one of Flink's guarantees, which is the robustness of the core API functions against OutOfMemoryExceptions. Let me elaborate on it for better understanding.

All of Flink's internal operations work on serialized data which is stored in Flink's managed memory. The managed memory allows Flink to detect when to spill elements from memory to disk to avoid out-of-memory exceptions and, thus, to make the system robust. The managed memory is a pre-allocated area of memory which is administered by the MemoryManager. The MemoryManager allows you to allocate and deallocate MemorySegments in a C-style fashion. However, once a data item enters a UDF, the item has to be deserialized, putting it on the remaining heap. This is not bad if your UDF does not accumulate these elements. However, the sampleWithSize method materializes up to numSamples elements on the heap. Depending on the number of samples and the data item size, this might be enough to eat up all remaining heap memory space and to crash the JVM.

I think that your current implementation will work for most use cases but in order to make it part of the core API, we also have to deal with the case where our sample cannot be materialized on the remaining heap of a running Flink program. In order to achieve this, I think it would be necessary to implement a native topK operator. With native I mean an operator which works on Flink's managed memory and, thus, can also deal with spilling records to disk. Having such a topK operator, we could reimplement the reservoir sampling algorithm the following way: For sampling without replacement we first assign weights in a map operation to each element. Then we call topK with respect to the weights and obtain the sample. For the sampling with replacement we could simply use a flat map operation to assign numSamples times a weight to each element. Then we again call topK with respect to the weight.
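
For concreteness, a hedged sketch of only the weight-assignment step for the with-replacement case; the topK operator itself does not exist yet, so it is deliberately not shown, and the class name here is made up:

```java
import java.util.Random;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: emits numSamples independently weighted copies of each element.
// A (not yet existing) topK operator over field f0 would then produce the sample with replacement.
public class AssignWeightsWithReplacement<T> extends RichFlatMapFunction<T, Tuple2<Double, T>> {

    private final int numSamples;
    private final long seed;
    private transient Random random;

    public AssignWeightsWithReplacement(int numSamples, long seed) {
        this.numSamples = numSamples;
        this.seed = seed;
    }

    @Override
    public void open(Configuration parameters) {
        // Decorrelate the parallel instances, as discussed earlier in this thread.
        random = new Random(seed + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void flatMap(T value, Collector<Tuple2<Double, T>> out) {
        for (int i = 0; i < numSamples; i++) {
            out.collect(new Tuple2<>(random.nextDouble(), value));
        }
    }
}
```

The without-replacement case would be a plain map assigning a single weight per element before the topK.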

For the topK implementation, we would need something like a PriorityQueue which operates on managed memory (similar to the CompactingHashTable which is a hash table working on managed memory). Thus, we would have a priority queue which stores the priority values of each record and a pointer to the record which is kept in managed memory. Whenever an element is removed from the priority queue, we can also free the occupied managed memory. In case that we run out of managed memory, we have to spill some of the records to disk which are still in the race for the top k. As a first step, we can skip the spilling and just throw a proper exception (other than OutOfMemoryException) when we run out of memory. Afterwards, we can incrementally add the spilling functionality.

I know that you've already put a lot of effort into writing the sampling operator and that this result might be a little bit demotivating. However, if we want to make it right and robust, then I think this is the way to go. Additionally, we would add a proper topK operator to Flink's API, which is missing big time :-) If you want to, then you could also take the lead here. The further discussion should then happen in a separate issue. I'm more than willing to assist you in implementing this operator. What do you think?

@ChengXiangLi
Author

Thanks for the detailed explanation, @tillrohrmann. As the owner of this issue, making it work correctly and efficiently is my top priority, so I would never say no to a reasonable and proper suggestion (although I did expect it to come earlier :-)). OutOfMemoryError was one of my concerns during the implementation as well; unlimited usage of the Java heap is unacceptable, so I limited the reservoir size (the PriorityQueue size in the implementation) to the numSamples parameter, which gives the user full control over the Java heap memory used by the sample operator. It may lead to an OutOfMemoryError if numSamples is too large, but other operators may lead to OutOfMemoryErrors as well if the user caches too many objects inside a UDF. Anyway, that was just my first thought; fully managed memory usage is a better solution, and I would like to implement it.
As mentioned in your previous comment, are these the next steps you prefer?

  1. Move the sample and sampleWithSize methods from DataSet to the DataSetUtils class, and merge this PR.
  2. Implement the topK operator in a separate PR.
  3. Update the sampleWithSize implementation and move it back to DataSet.

@tillrohrmann
Contributor

@ChengXiangLi, you're right, I should have noticed it earlier and raised a flag. That's my bad, sorry. But your work is not in vain. I think it's an excellent piece of work, and the sample method could also become part of the core API right away.

For the sake of completeness, though, let's do that once the sampleWithSize method also works robustly. I think your proposal for the next steps is a good way to continue. Once you've moved the sample and sampleWithSize methods to the DataSetUtils class, we'll merge and close this PR. In the meantime, I'll create a JIRA for the topK operator, where we can discuss the matter further.

@ChengXiangLi
Author

OK, @tillrohrmann, great to hear that. By the way, I've created FLINK-2549 for the topK operator.

@tillrohrmann
Contributor

Oh cool, then you were faster than me :-)

@ChengXiangLi
Author

Moved the sample/sampleWithSize operators to DataSetUtils and updated the unit tests.

@tillrohrmann
Contributor

Looks great. Thanks a lot for your contribution @ChengXiangLi. Will merge it now.

@asfgit closed this in c9cfb17 on Aug 21, 2015
@sachingoel0101
Contributor

Hey @ChengXiangLi, I just observed a failure in a test case: https://travis-ci.org/sachingoel0101/flink/jobs/76649177
Here is the relevant statement:
RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 expected fraction: 0.010000, result fraction: 0.009000

@thvasilo

It's great to have this in; I'll try to update the cross-validation and SGD to use it.

@ChengXiangLi
Author

Hi @sachingoel0101, when sampling with a fraction it's not easy to verify whether the DataSet was really sampled with the requested fraction. In the test I take 5 samples, use the average sample size to compute the resulting fraction, and then verify that the resulting fraction differs from the input fraction by no more than 10 percent. The following can happen as well: the sampler samples the DataSet with the input fraction, but the sampled result is so small or so large that it falls outside our verification bounds. That happens only with a very small probability, say less than 0.001 in this test. It should be OK if this failure occurs very occasionally; please let me know if you find that isn't the case.
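
For reference, a sketch of that kind of check (assuming JUnit's assertTrue; runSampler is a hypothetical helper that performs one sampling pass and returns the sample size, so this is not the test's actual code):

```java
private void verifySamplerFraction(double fraction, boolean withReplacement) {
    final int sourceSize = 10000;   // illustrative source size
    final int runs = 5;             // average over several runs to reduce the variance

    long totalSampledSize = 0;
    for (int i = 0; i < runs; i++) {
        // runSampler is a hypothetical helper: one sampling pass, returning the sample size.
        totalSampledSize += runSampler(fraction, withReplacement, sourceSize);
    }

    double resultFraction = (double) totalSampledSize / (runs * sourceSize);
    // Randomized check: it fails with small but non-zero probability even for a correct sampler.
    assertTrue(
        String.format("expected fraction: %f, result fraction: %f", fraction, resultFraction),
        Math.abs(resultFraction - fraction) <= fraction * 0.1);
}
```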

@chiwanpark
Member

@ChengXiangLi I know that it is hard to verify a random sampler implementation. But we need to fix this failing test because it makes verifying other pull requests difficult: some builds of other pull requests fail because of the K-S test and the fraction sampling test. There is a JIRA issue covering this.

I'm testing with an increased number of samples and a larger source size. If I get a notable result, I'll post it.

@ChengXiangLi
Author

Thanks, @chiwanpark, I'm waiting for your results.
Besides, we can reduce the false-positive rate to an acceptable probability by widening the verification bounds, though at the cost of making the verification weaker.

@sachingoel0101
Contributor

Hey @ChengXiangLi, I have another concern, regarding the seed for sampling. It doesn't seem to serve its purpose: I tried sampling with a fraction three times with the same seed, but every time I got different results.
I've been stuck on this problem myself for quite some time. The fix would be to use just seed instead of seed + index, but then the sample is not truly random. As far as I could figure out, the splits of the data don't arrive at exactly the same subtask index every time.

@tillrohrmann
Contributor

@thvasilo, the PR did not yet include the proper sampling behaviour within iterations. See FLINK-2396.

@tillrohrmann
Contributor

@sachingoel0101, you're right. The problem is that Flink does not give you any guarantee about the order in which the elements arrive. But this problem won't be fixed by setting the seed for all sampling operators to the same value; there might always be an operator, e.g. rebalance, that completely randomizes your element order.

@sachingoel0101
Contributor

Yes. I was only wondering whether we should at least ensure this when the sampling happens right at the source, though.

@tillrohrmann
Contributor

IMHO, this makes understanding the semantics of the sampling operator only more complicated because it behaves differently depending on the job graph structure. I would rather document this limitation more prominently.

nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
[FLINK-1901] [core] enable sample with fixed size on the whole dataset.

[FLINK-1901] [core] add more comments for RandomSamplerTest.

[FLINK-1901] [core] refactor PoissonSampler output Iterator.

[FLINK-1901] [core] move sample/sampleWithSize operator to DataSetUtils.

Adds notes for commons-math3 to LICENSE and NOTICE file

This closes apache#949.