
[FLINK-3128] [flink-ml] Add Isotonic Regression To ML Library #1565

Closed
wants to merge 22 commits

Conversation

f-sander

Adds isotonic regression to the ml library.
It's a port of the implementation in Apache Spark.

@f-sander
Author

f-sander commented Feb 2, 2016

Are the build failures related to us? I don't really understand how...

The first failure happens in oraclejdk8 with hadoop 2.7.1:

Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 351.368 sec <<< FAILURE! - in org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase
testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase)  Time elapsed: 318.792 sec  <<< FAILURE!
java.lang.AssertionError: The program did not finish in time
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.junit.Assert.assertFalse(Assert.java:64)
    at org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest.testTaskManagerProcessFailure(AbstractTaskManagerProcessFailureRecoveryTest.java:212)

The second one, on openjdk 7 with hadoop 1, appears to experience a deadlock (?):

==============================================================================
Maven produced no output for 300 seconds.
==============================================================================
==============================================================================
The following Java processes are running (JPS)
==============================================================================
2286 Launcher
77113 Jps
76276 surefirebooter4006285424712115006.jar
==============================================================================
Printing stack trace of Java process 2286
==============================================================================

After that, only lots and lots of process stack traces.


def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
Contributor


Formatting off
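For context, the hunk above shows only the method signature. A standard two-point linear interpolation matching that signature would look like the following; the body here is my assumption, not necessarily the PR's code:

```scala
// Hypothetical body for illustration; the PR's actual implementation may differ.
// Interpolates y at position x on the line through (x1, y1) and (x2, y2).
def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double =
  if (x1 == x2) y1 // degenerate segment: fall back to the left point
  else y1 + (y2 - y1) * (x - x1) / (x2 - x1)
```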

@tillrohrmann
Contributor

Really good work @f-sander. Good test coverage and good code documentation.

It would be good to add some online documentation for this algorithm (see flink/docs/libraries/ml).

I had a comment concerning scalability. I fear that with the current implementation, the algorithm is effectively bound by the capacities of a single machine. Especially sorting the data on the heap is destined to quickly crash the system. I'm not an expert on isotonic regression but it would be nice to get rid of the operator which collects all the input data in a single task to sort them.

I also haven't gone through the math details yet. Will do, once the scalability issue is fixed.

@f-sander
Author

f-sander commented Feb 3, 2016

Thanks for your feedback!

Yes, scalability is the main issue for us too. We are not aware of any other parallel implementation. We also talked to the original author of Spark's IR implementation (which is equivalent to ours) about this, with the same result. However, we think we have a theoretical approach to solving this, but it depends on a self join without duplicates. Remember our discussion on the user mailing list with the subject "join with no element appearing in multiple join-pairs"? I need that for this.

I will link a sketch of our algorithm design here in a few days, if we haven't found a way to solve this. I guess IR won't make it into Flink without a fully parallelized version?

@tillrohrmann
Contributor

Yeah, I thought that your question on the mailing list was related to this PR. It would be great to have a fully parallelized version of the algorithm, because if it only runs on a single machine then you could directly use sklearn or another ML library to solve the problem.

You can also share the sketch of your algorithm right now, if you want to. That way, others could directly chime in and maybe someone knows how to do the alternating pair join operation.

Cheers,
Till


@f-sander
Author

f-sander commented Feb 3, 2016

There is one advantage of this over using a single-node ML library: this implementation contains the compression procedure used in Spark that combines data points with equal labels. The hope of this parallelization strategy is that in each partition enough points are compressed so that the combined dataset in the last step fits on one node.

I will try to outline our algorithm tonight, but I'm very busy right now and can't promise. But I'll try.

@f-sander
Author

f-sander commented Feb 6, 2016

Sorry for the long delay. I still don't really have time for this, but I want to describe it anyway. That's why the writing and formatting are pretty sloppy. Sorry for that, I hope you bear with me:

We only consider isotonic regression on weighted, two-dimensional data. Thus, data points are tuples of three doubles: (y, x, w).

PAV assumes the data to be sorted by x. It starts on the left and goes to the right. Whenever two points (or more) are found that are descending in order of x, it "pools" them, which means all y values (multiplied by their weights) in that pool are averaged over the sum of all weights. Any point in the pool then looks like this: (y_weighted_pool_avg, x, w). Because the y values were changed, we have to look back in x order and check whether the new pool average is lower than the value before the pool. If that's the case, we have to pool again until no higher y value is present before the pool.
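The sequential pooling described above can be sketched in plain Scala (no Flink); the names and the (pooledAverage, totalWeight) pool representation are mine, not the PR's:

```scala
// A data point: label y, position x, weight w (input assumed sorted by x).
case class Point(y: Double, x: Double, w: Double)

// Sequential PAV sketch: scan left to right; on a violation, pool backwards
// into a weighted average. Returns one (pooledAverage, totalWeight) per pool.
def pav(points: Seq[Point]): List[(Double, Double)] = {
  var pools = List.empty[(Double, Double)] // head = rightmost pool so far
  for (p <- points) {
    var y = p.y
    var w = p.w
    // Look back: keep pooling while the previous average exceeds the new one.
    while (pools.nonEmpty && pools.head._1 > y) {
      val (py, pw) = pools.head
      pools = pools.tail
      y = (py * pw + y * w) / (pw + w) // weighted average of the merged pool
      w = pw + w
    }
    pools = (y, w) :: pools
  }
  pools.reverse // left-to-right order
}
```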

Any sequence of data points from i to j sharing the same y value is compressed in the following way: (y, x_i, sum_of_weights), (y, x_j, 0). The hope of Spark's implementation is that enough data gets compressed this way that all remaining data fits on one node in the last step. However, there are of course cases where this simply doesn't work.
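The compression rule can be illustrated like this (a plain-Scala sketch of the rule as described, not the PR's code):

```scala
// Compress runs of equal-y points (y, x, w): a run from i to j becomes
// (y, x_i, sumOfWeights) followed by a zero-weight marker (y, x_j, 0).
def compress(points: List[(Double, Double, Double)]): List[(Double, Double, Double)] = {
  // Group consecutive points that share the same y value.
  val runs = points.foldRight(List.empty[List[(Double, Double, Double)]]) { (p, acc) =>
    acc match {
      case (run @ (h :: _)) :: rest if h._1 == p._1 => (p :: run) :: rest
      case _ => List(p) :: acc
    }
  }
  runs.flatMap { run =>
    val (y, xi, _) = run.head
    val totalW = run.map(_._3).sum
    if (run.size == 1) run // a single point: nothing to compress
    else List((y, xi, totalW), (y, run.last._2, 0.0))
  }
}
```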

Our approach (not implemented in this PR) works like this:

Compare two consecutive data points i and j:
- if y_i < y_j, leave them untouched
- if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j). Also remember x_j
- if y_i = y_j, replace both with (y_i, x_i, w_i + w_j). Also remember x_j

Repeat until no pairs are combined.

After the iteration terminates: for each point that has a "remembered" x_j, add another (y, x_j, 0) directly behind it.
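The three merge rules above can be sketched as a single pair function; the names and the Option-based "remembered x_j" encoding are my assumptions, not the authors' code:

```scala
// A point with an optional "remembered" right boundary x_j.
case class Pt(y: Double, x: Double, w: Double, remembered: Option[Double] = None)

// One application of the merge rules to a consecutive pair (i, j);
// None means the pair stays untouched (already isotonic).
def mergePair(i: Pt, j: Pt): Option[Pt] =
  if (i.y < j.y) None
  else if (i.y > j.y) // violation: pool into the weighted average, remember x_j
    Some(Pt((i.y * i.w + j.y * j.w) / (i.w + j.w), i.x, i.w + j.w, Some(j.x)))
  else // equal labels: compress into one point, remember x_j
    Some(Pt(i.y, i.x, i.w + j.w, Some(j.x)))
```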

We are able to compare each point with its successor by attaching an index (zipWithIndex) and a "next" pointer (index + 1) to each point and then doing a:
set.join(set).where(next).equalTo(index)
However, because of the weight summation, we must ensure that no point appears in multiple join pairs. Otherwise a point's weight might be summed into multiple combined points.

We worked around that by doing two joins in each iteration step:

- step 1: the left join side has only points with even indices, the right side only points with odd indices
- step 2: the left join side has only points with odd indices, the right side only points with even indices

If nothing happened during these two runs, we are done.
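The effect of the two sub-steps can be illustrated without Flink: restricting the left join side to one index parity guarantees that each index appears in at most one pair per sub-step (a plain-Scala sketch, not the PR's join code):

```scala
// Indices paired in one sub-step: the left side only contributes indices of
// the given parity; the right side is reached via the next pointer (i + 1).
def pairs(indices: Seq[Long], parity: Long): Seq[(Long, Long)] = {
  val present = indices.toSet
  indices.filter(i => i % 2 == parity && present.contains(i + 1)).map(i => (i, i + 1))
}
```

Running the even sub-step over indices 0..5 pairs (0,1), (2,3), (4,5); the odd sub-step then pairs (1,2), (3,4). Within each sub-step no index appears twice, so no weight is summed into multiple combined points.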

Unfortunately, because of the merging, the indices no longer increment by 1. That's why we wanted to apply another zipWithIndex after the two joins, but the join repartitions the data, so we lose our range partitioning. This is required to get indices representing the total order of the data.

I hope you can understand the problem. Again, sorry for the sloppy writing.

@zentol
Contributor

zentol commented Feb 28, 2019

Closing since flink-ml is effectively frozen.
