[FLINK-1019] Implementation of akka based RPC system #149

Merged: 47 commits, Dec 18, 2014

tillrohrmann
Contributor

Replaced the old Nephele RPC service with an Akka-based system. As a result, several components are now implemented as actors: the JobManager, TaskManager, MemoryArchivist, and JobClient. The legacy RPC service and the corresponding protocols are removed.

Also replaced the execution service of the ExecutionGraph with Akka's futures to unify the system.

Removed the LocalInstanceManager, whose task is now handled by the InstanceManager. The responsibility for creating local task managers is now delegated to the FlinkMiniCluster.

The EventCollector and the respective event classes were removed. The events are now sent directly to the respective listeners.

Moved the resources of the WebInfoServer and the WebInterfaceServer to the resource folders of the corresponding projects. As a consequence, these resources are bundled with the jars and served directly from them by Jetty.

The YARN client was adapted to communicate with the actors. The former ApplicationMaster is combined with the JobManager to simplify the system. The uber-jar is now created with Maven's shade plugin.

Since this is a big change, I would be happy if another pair of eyes could take a look at it.

@rmetzger
Contributor

I vote to merge this rather soon:

  • The pull request touches A LOT of code, so it will quickly become incompatible with the rest of the system.
  • We are going to merge this anyway. The only thing we need to ensure here is that the change does not immediately break our system. Minor bugfixes will come in the next few days anyway.

@rmetzger
Contributor

What are the plans for merging this change?

@tillrohrmann
Contributor Author

There are some minor issues we discovered when going over the code with Stephan. I first have to address them. These include amongst others:

  • Local mode for JobManager in main
  • Better error message when LibraryCacheManager fails (or BlobManager)
  • GlobalExecutionContext: limit the number of created threads?
  • TaskManager: Watch jobManager in case of network error or jobManager crash
  • Check exception handling in ExecutionGraph
  • Expose Akka timeouts via config
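
For the last item, a minimal Java sketch of what reading a timeout from the configuration could look like. The key akka.ask.timeout is the one named in the commit list of this PR; the value format ("30 s", "250 ms"), the class name, and the method name are assumptions made for illustration and are not taken from the Flink code.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of Flink: reads a timeout such as "30 s" from a config map.
final class AskTimeoutConfigSketch {

    // Key as named in this PR's commit messages.
    static final String ASK_TIMEOUT_KEY = "akka.ask.timeout";

    // Returns the configured timeout, or the given fallback if the key is absent or blank.
    // Assumed value format: "<number> <unit>" with unit one of ms, s, min.
    static Duration askTimeout(Map<String, String> config, Duration fallback) {
        String raw = config.get(ASK_TIMEOUT_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        String[] parts = raw.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected '<number> <unit>' but got: " + raw);
        }
        long amount = Long.parseLong(parts[0]);
        switch (parts[1].toLowerCase()) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default:
                throw new IllegalArgumentException("Unknown time unit: " + parts[1]);
        }
    }
}
```

Passing the fallback explicitly keeps the sketch from guessing a default value; the real default is whatever the project decides on.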

@tillrohrmann tillrohrmann force-pushed the akka_scala branch 6 times, most recently from 3048efc to 10a1a3a Compare November 17, 2014 17:09
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Await}

abstract class FlinkMiniCluster(userConfiguration: Configuration) {
Contributor

The user config argument is a great idea. I was checking this, because I need something similar for my changes. I will add a similar thing to my branch for now.

@hsaputra
Contributor

Looks like a rebase is needed for this PR?

@tillrohrmann
Contributor Author

Yes, it is. But there are still some performance issues I have to figure out first.

val nextInputSplit = currentJobs.get(jobID) match {
  case Some((executionGraph, _)) => executionGraph.getJobVertex(vertexID) match {
    case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
      case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
Contributor

Passing a null hostname here makes input split localization impossible.

Contributor Author

Good point, I probably just copied the old bug.
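
To illustrate why the hostname matters, here is a minimal Java sketch of a locality-aware split assigner. It is not Flink's InputSplitAssigner; the class, the Split type, and the nextSplit method are hypothetical. The only point is that a null host leaves the assigner nothing to match against, so it falls back to handing out any remaining split.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical illustration, not Flink's implementation.
final class LocalitySketch {

    // A split together with the hosts that store its data.
    static final class Split {
        final int id;
        final Set<String> hosts;

        Split(int id, String... hosts) {
            this.id = id;
            this.hosts = new HashSet<>(Arrays.asList(hosts));
        }
    }

    // Removes and returns a split stored on requestingHost if one exists;
    // otherwise removes and returns the first remaining split, or null if none are left.
    // The list must be mutable.
    static Split nextSplit(List<Split> remaining, String requestingHost) {
        if (requestingHost != null) {
            for (Iterator<Split> it = remaining.iterator(); it.hasNext(); ) {
                Split candidate = it.next();
                if (candidate.hosts.contains(requestingHost)) {
                    it.remove();
                    return candidate;
                }
            }
        }
        // No host given (or no local split available): locality cannot be exploited.
        return remaining.isEmpty() ? null : remaining.remove(0);
    }
}
```

With splits 0 on hostA and 1 on hostB, a request from hostB gets split 1, while a request with a null host gets split 0 regardless of where the requester runs.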

@tillrohrmann tillrohrmann force-pushed the akka_scala branch 6 times, most recently from a9c7381 to 077fdfb Compare December 16, 2014 16:04
import org.junit.BeforeClass;
import org.junit.Test;

//TODO: Update test case
Contributor

Is this TODO still valid?

Contributor Author

Good catch. I forgot to delete it.

@uce
Contributor

uce commented Dec 17, 2014

Looks like a big ass change ;-) My inline comments were more or less random. I just wanted to get a feeling of the changes.

I can have a look in the next few days. :-)

@tillrohrmann
Contributor Author

Thanks for the feedback, Ufuk. I tried to address the points you mentioned. Concerning the failing Travis builds: the cause turned out to be a race condition in the execution graph that I stumbled upon. Due to this race condition, it was possible for a job to finish before all vertices had properly reached a finished state. It is fixed with the latest commit.
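
The invariant described here (a job may only finish once every vertex has finished) can be sketched in Java roughly as follows. This is not the actual fix from the commit, which is not shown in this conversation; the class and method names are hypothetical and the sketch assumes each vertex reports completion exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the invariant, not the ExecutionGraph code.
final class JobCompletionSketch {

    enum JobState { RUNNING, FINISHED }

    private final int numVertices;
    private final AtomicInteger finishedVertices = new AtomicInteger(0);
    private final AtomicReference<JobState> state =
            new AtomicReference<>(JobState.RUNNING);

    JobCompletionSketch(int numVertices) {
        this.numVertices = numVertices;
    }

    // Called once per vertex, after that vertex has fully reached its finished state.
    // Only the call that brings the counter to numVertices flips the job to FINISHED.
    void vertexFinished() {
        if (finishedVertices.incrementAndGet() == numVertices) {
            state.compareAndSet(JobState.RUNNING, JobState.FINISHED);
        }
    }

    JobState state() {
        return state.get();
    }
}
```

Because the counter is incremented atomically, exactly one caller observes the final count, so the job cannot be marked finished while a vertex is still outstanding.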

@uce
Contributor

uce commented Dec 18, 2014

It looks like it was really hard to catch. :-) Congrats! I don't know how much work it is, but we might want to make sure to have a test case for this (if possible) or add an extra comment so that no one messes with the patch. ;-)

… blocking calls.

Fixed ExecutionVertexCancelTest after removing submitTask and cancelTask.
…s reregistration in case of disconnect. Introduced akka.ask.timeout config parameter to configure akka timeouts.
…er their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster for test execution.
…ch mechanism is the current means to detect dead instances.
…AllocatedSlot, Instance, CoLocationConstraint, SharedSlot and SlotSharingGroupAssignment serializable. Integrated Kryo to be used to serialize Akka messages.
…integration tests. Increase akka logger startup timeout.
…connection manager if a single task manager is used for local execution. Remove synchronized block in getReceiverList of ChannelManager which effectively serialized the connection lookup calls of a single task manager.

Fix Java6 problem that File has no method toPath
…ndency conflicts. Adjust code to comply with the respective Akka API. Remove obsolete TODO.
… all vertices have called the finalizeOnMaster method.
…nager is still alive. Terminate waiting for a response in case of a job manager outage.
@asfgit asfgit merged commit 88e64fc into apache:master Dec 18, 2014
@tillrohrmann tillrohrmann deleted the akka_scala branch September 16, 2015 12:51
tzulitai pushed a commit to tzulitai/flink that referenced this pull request Jan 15, 2021
jnh5y pushed a commit to jnh5y/flink that referenced this pull request Dec 18, 2023
RocMarshal pushed a commit to RocMarshal/flink that referenced this pull request May 9, 2024