
SAMOA-16: Add an adapter for Apache Flink-Streaming #11

Closed
wants to merge 26 commits

Conversation

senorcarbone
Contributor

This PR adds support for Apache Flink as an adapter. We haven't updated the documentation yet, but we could perhaps do that in a follow-up PR, since it seems you are already working on a docs redesign.

@senorcarbone changed the title from "Flink integration" to "SAMOA-16: Add an adapter for Apache Flink-Streaming" on Mar 2, 2015
@gdfm
Contributor

gdfm commented Mar 3, 2015

Thanks. There seem to be some errors in the tests (Kryo serialization, from what I can see).
Probably some Kryo version mismatch. Could you have a look at them?

@senorcarbone
Contributor Author

Sure, we are looking into it! Kryo version incompatibility seems to be trickier than we thought.

@gdfm
Contributor

gdfm commented Mar 3, 2015

If we need to update Kryo on our side, we'll be happy to do so.

@senorcarbone
Contributor Author

It seems that the earliest Kryo version that Flink is compatible with is v2.23.0. However, the Storm tests seem to fail to initialise the serialiser for versions greater than 2.17. Judging from the changelog, it looks like the current Apache Storm version (v0.10.0) supports newer Kryo versions, so a first approach might be to upgrade Storm. What do you think?

@gdfm
Contributor

gdfm commented Mar 3, 2015

Makes sense, as we want to update the Storm dependency anyway.
However, we will inevitably have conflicting dependencies across platforms, and we don't want to mix them. The tests should ensure this isolation, i.e., never mix two platforms, and the different profiles aim at exactly that.

We have a single kryo.version variable to ensure that we don't use different Kryo versions around the codebase, but in this case it seems that diverging is necessary.
If Flink depends on 2.23 but we have it set to 2.17, then we should be able to override it in the samoa-flink module. Do you agree?

@senorcarbone
Contributor Author

Good point. I just overrode the Kryo version in the Flink build and tested it locally. Also, all module tests seem to be passing on Travis.
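
For reference, such a per-module override could look roughly like the sketch below, assuming the shared kryo.version property mentioned above is simply redefined in samoa-flink/pom.xml; the version shown is only an example, and the value actually committed may differ.

<!-- samoa-flink/pom.xml (hypothetical sketch): redefine the shared property locally
     so that only the Flink module picks up the newer Kryo. -->
<properties>
  <!-- 2.23.0 is the earliest version reported to work with Flink above;
       the value in the actual patch may be different. -->
  <kryo.version>2.23.0</kryo.version>
</properties>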

@arinto

arinto commented Mar 10, 2015

I can't build this PR successfully on my local machine. Here's the error message:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project samoa-flink: Compilation failure: Compilation failure:
[ERROR] /Users/arinto/git/incubator-samoa/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java:[54,79] <anonymous com.yahoo.labs.flink.topology.impl.FlinkEntranceProcessingItem$1> is not abstract and does not override abstract method cancel() in org.apache.flink.streaming.api.function.source.SourceFunction
[ERROR] /Users/arinto/git/incubator-samoa/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java:[64,25] method does not override or implement a method from a supertype

Apparently there are API changes in apache/flink@8436e9c due to FLINK-1625. Perhaps we can use a stable version of Flink for this PR. What do you guys think?
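
For illustration, the failing method is the cancel() hook the snapshot API now requires on sources, which is usually satisfied with a cancellation flag checked by the emission loop. The sketch below is hypothetical and deliberately does not implement the actual Flink SourceFunction interface, whose exact method set at that commit may differ.

// Hypothetical sketch of the cancellation-flag pattern a source needs once
// cancel() becomes mandatory; not tied to the actual Flink snapshot interface.
public class CancellableSourceSketch {

    private volatile boolean running = true;

    // Emission loop: keeps producing records until cancel() flips the flag.
    public void run() {
        while (running) {
            // emit the next SAMOA instance here
        }
    }

    // Called by the framework when the job is cancelled or shut down.
    public void cancel() {
        running = false;
    }
}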

@senorcarbone
Contributor Author

Hey Arinto! Thanks for looking into it. I agree, let's stick to the last stable release (0.8.1) for now. We will commit the appropriate changes for it today. Cheers!

@senorcarbone
Contributor Author

We reviewed the changes after 0.8.1, and a lot has changed since then; many constructs we currently rely on, such as the type extractors, will not work in 0.8.1. Thus, we decided to make the final patch target the first RC of 0.9.0, which is coming very soon, most probably by the end of next week. It looks like the best way to go :)

@rmetzger

+1
I think we need to release some kind of 0.9-preview release soon.

@gdfm
Contributor

gdfm commented Mar 30, 2015

Hi,
if I understand correctly, you are waiting for Flink to release 0.9 before updating the PR?

@senorcarbone
Contributor Author

Yes, we are planning a 0.9.0-milestone release, so we can use that as the first supported stable version for the PR.

@senorcarbone
Contributor Author

Hey @gdfm @arinto @abifet. Thanks for the review so far. Let me know if you have any additional comments to look into. I think there is nothing critical left; we will keep supporting the Flink adapter with subsequent updates, such as new releases and improvements.

@gdfm
Contributor

gdfm commented May 5, 2015

Hi @senorcarbone,
Thanks, I will have another look at it tomorrow.
We also need another review from @abifet or @arinto.
And thanks for offering to keep supporting the adapter :)

@abifet
Contributor

abifet commented May 6, 2015

Hi @senorcarbone,

This adapter for Apache Flink seems amazing! Thanks so much! I tested and reviewed it, and found the same issues as @gdfm. The accuracy of the VHT was quite good, but it was very slow on my computer. Any thoughts about this? Do we need to tune any parameter in the conf file?

@rmetzger

rmetzger commented May 7, 2015

We have an open JIRA in Flink for a "streaming only" mode, which starts up Flink in a streaming-optimized way.

For now, you can set the following configuration values in the conf/flink-conf.yaml to achieve a similar effect:

  • taskmanager.memory.fraction: This fraction describes the division between managed and unmanaged heap space. Flink batch uses managed memory for its operations; Flink streaming uses unmanaged memory only. By default the value is set to 0.7, which does not leave much memory for streaming. I would recommend setting the value to 0.001 ;)
  • taskmanager.heap.mb: That's an integer specifying the task manager's heap size in MB. By default it's set to 512. I suspect your machine has more memory.
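
Putting those two together, the relevant conf/flink-conf.yaml entries would look roughly like this; the heap size below is only an example value, not something prescribed above.

# conf/flink-conf.yaml (sketch based on the suggestion above)
# Leave almost all of the heap unmanaged, since streaming uses unmanaged memory only.
taskmanager.memory.fraction: 0.001
# Task manager heap in MB; the default is 512, 4096 is just an example for a bigger machine.
taskmanager.heap.mb: 4096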

@senorcarbone
Contributor Author

Hello again @gdfm and @abifet,
I did a lot of cross-profiling between Storm and Flink over the last two days, running the same VerticalHoeffdingTree task under different configurations, and I think the results are quite interesting.

It looks like the algorithm's performance (and accuracy) depends heavily on the ingestion speed of the local statistics processors. The irony is that the higher the speed, the slower the whole computation gets over time: attribute events are sent to the local statistics processors at a higher rate, so the model aggregator gets more updates back.

The average processing delay (in number of flattened instances processed by the aggregator between sending a process event and receiving the corresponding local statistics) is ~2k instances for Flink and around 400k instances for Storm. Also, in Storm the aggregator continuously broadcasts ~100-200 attribute messages to local processors on average, while in Flink it broadcasts ~2100 attribute messages, due to the rate at which it gets results back, I assume. These counts were collected locally on each component, and there was no message duplication.
Since you worked on the algorithm, do you find this behavior reasonable?

@senorcarbone
Contributor Author

I also tried adding a 2-second sleep in the local Flink processors to delay their results, and the algorithm finished in 12 seconds with lower (57%) accuracy. Perhaps the model aggregator could be enhanced with some flow-control logic to trade off model update rate against accuracy.
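
For the record, the artificial delay was along these lines; the wrapper below is a hypothetical sketch assuming the com.yahoo.labs.samoa.core Processor/ContentEvent API of the time, not the exact code used in the experiment (the sleep may well have been placed directly inside the local statistics processor instead).

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.core.Processor;

// Hypothetical sketch: delay each incoming event by ~2 seconds before handing it
// to the wrapped local statistics processor, so its replies come back later.
public class DelayingProcessor implements Processor {

    private final Processor delegate;

    public DelayingProcessor(Processor delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean process(ContentEvent event) {
        try {
            Thread.sleep(2000); // artificial 2-second delay per event
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delegate.process(event);
    }

    @Override
    public void onCreate(int id) {
        delegate.onCreate(id);
    }

    @Override
    public Processor newProcessor(Processor other) {
        // Create a fresh copy of the wrapped processor for a new instance.
        return new DelayingProcessor(delegate.newProcessor(delegate));
    }
}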

@gdfm
Contributor

gdfm commented May 10, 2015

Hi @senorcarbone,
Thanks for digging into the issue.
Interesting findings, though I still need to digest what they mean exactly.
We know that accuracy depends on speed, and the higher the speed the lower the accuracy. What I expect, however, is that the number of messages generated by the algorithm is independent of the speed (at least mostly). If this is not the case, there must be some wrong assumption I am making about the system.

@senorcarbone
Contributor Author

I think it has to do with the number of splits. If the model aggregator gets more local statistics in the time it takes to process the same number of instances, it will have to split (exponentially?) more times and send more attributes in the same period. That is what I got from the experiments, at least.

Maybe there should be a separate issue on VHT, but please keep me and @fobeligi in sync regarding it. @fobeligi is working on an experimental native implementation of VHT on Flink, which we can share soon, and she had to deal with similar issues. Also, feel free to let me know if you want me to run more experiments and share the results to speed things up.

Regarding the PR, do you think we should look into anything more?

@gdfm
Contributor

gdfm commented May 13, 2015

Indeed, I see what you mean. Given that the feedback loop in Flink is faster, the number of attempts to split should increase.
This is expected, but the number of such attempts is upper-bounded by the number tried on the Local engine, where there is no delay between the request for the split criterion and the response from the local statistics.

We already have some flow control to regulate the rate of ingestion in PrequentialEvaluation. I'll play a bit with it to see what happens.
When you put the 2-second delay in the Flink processors, what happens (I guess) is that all the data streams through a very rough, sub-optimal version of the tree. So it's very fast, but the precision drops considerably because of the artificial limit on the number of split attempts.

public void setInstanceInformation(InstancesHeader instanceInformation) {
    this.instanceInformation = instanceInformation;
}

Contributor

Why do we need these changes for Flink?

Contributor Author

Apparently these were test leftovers, but we do need them according to @fobeligi.

@gdfm
Contributor

gdfm commented May 18, 2015

Hi,

I think we should address the issue with VHT in a separate PR.
I'd like to finalize this one.

@senorcarbone there are just a few issues outstanding.
There are a few files in samoa-api/build which should not be part of the patch.
Also, there is a question about the partitioning enum that I'd like answered.
Finally, there are some changes in samoa-instances that are not clear to me (I had missed them before).

Once these are fixed, I'm +1.

@senorcarbone force-pushed the flink-integration branch 5 times, most recently from 18d6d27 to 8c54b73 on May 18, 2015 at 14:25
@senorcarbone
Contributor Author

Thanks @gdfm. I hope I addressed everything in the last commit and the inline comments. Let me know if you want me to look into anything else!

@gdfm
Contributor

gdfm commented May 25, 2015

Yes, the patch looks good to me.
+1

@abifet @arinto any comment?
I will merge the patch tomorrow otherwise.

@abifet
Contributor

abifet commented May 26, 2015

+1

@gdfm
Contributor

gdfm commented May 26, 2015

Merged.
@senorcarbone thanks for the contribution!

@rmetzger

Great to see this merged.

Is the Samoa website located in the gh-pages branch?
I would like to update the front page to mention Flink as well ;)

@gdfm
Contributor

gdfm commented May 26, 2015

Yes, the sources are in gh-pages and we use Jekyll to build the website.
@rmetzger it would be great if you could add docs :)
Instructions here: https://cwiki.apache.org/confluence/display/SAMOA/SAMOA+Home

@rmetzger

Thank you.
I've filed a JIRA: https://issues.apache.org/jira/browse/SAMOA-33
Let's see when I have time for this ;)

@senorcarbone
Contributor Author

Awesome, thanks @gdfm! Will you merge from your local branch? Note that it is missing some of the latest commits. I will close the PR once everything is merged.

On a slightly related note, we ran some experiments today on the new streaming API that will be released soon, which contains many good fixes and additions. The VHT experiment mentioned above now takes approximately 80 seconds for 1M instances, since filters are properly chained, making input ingestion much faster so that it keeps up with the local processors. You can find the branch with the patch here. Special thanks go to @gyfora for fixing chaining in iterations!

I will create a new PR once we have a stable release.

@gdfm
Contributor

gdfm commented May 29, 2015

Weird, I have already merged the PR and pushed back to Apache.
https://git-wip-us.apache.org/repos/asf?p=incubator-samoa.git

Something in the mirroring is not working; the PR should be closed automatically.

Thanks for the info; improving performance would be a great contribution!

@asfgit closed this in 64ef7a9 on Jun 3, 2015