
[FLINK-1489] Fixes blocking scheduleOrUpdateConsumers message calls #378

Closed

Conversation

tillrohrmann
Contributor

Replaces the blocking calls with futures which, in case of an exception, let the respective task fail. Furthermore, the PartitionInfos are buffered on the JobManager in case some of the consumers have not yet been scheduled. Once the state of a consumer has switched to RUNNING, all buffered partition infos are sent to it.
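The buffering idea in the description can be sketched as follows. This is a minimal illustration with made-up names (`PartitionInfoBuffer`, `scheduleOrUpdateConsumers`, `switchToRunning` are simplified stand-ins, not the actual Flink runtime classes): partition infos arriving while the consumer is not yet RUNNING are queued on the JobManager side and flushed once the state transition happens.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of buffering PartitionInfos until the consumer is RUNNING.
// Names are illustrative; the real logic lives in Execution/PartialPartitionInfo.
class PartitionInfoBuffer {
    enum State { CREATED, RUNNING }

    private volatile State consumerState = State.CREATED;
    private final Queue<String> pendingPartitionInfos = new ConcurrentLinkedQueue<>();
    private final Queue<String> sentPartitionInfos = new ConcurrentLinkedQueue<>();

    void scheduleOrUpdateConsumers(String partitionInfo) {
        if (consumerState == State.RUNNING) {
            send(partitionInfo);                      // consumer ready: forward immediately
        } else {
            pendingPartitionInfos.add(partitionInfo); // buffer on the JobManager
        }
    }

    void switchToRunning() {
        consumerState = State.RUNNING;
        // Flush everything buffered while the consumer was being scheduled.
        // poll() hands each buffered element to exactly one caller.
        String info;
        while ((info = pendingPartitionInfos.poll()) != null) {
            send(info);
        }
    }

    private void send(String partitionInfo) {
        sentPartitionInfos.add(partitionInfo);        // stand-in for the UpdateTask message
    }

    Queue<String> sent() {
        return sentPartitionInfos;
    }
}
```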

@uce
Contributor

uce commented Feb 10, 2015

Very nice. I will have a detailed look later.

@zentol Can you also test it with the Python API? I think you initially noticed the problem.


// double check to resolve race conditions
if (consumerVertex.getExecutionState() == RUNNING) {
    consumerVertex.sendPartitionInfos();
}
Contributor

Just to verify: the double check & send relies on the fact that update messages at the task manager are idempotent, right?

Contributor Author

The UpdateTask messages are idempotent in the BufferReader. But my intention was not to send any UpdateTask messages twice. The ConcurrentLinkedQueue should make sure that every element is only dequeued once.
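The property relied on here can be demonstrated in isolation. This hypothetical demo shows that `ConcurrentLinkedQueue.poll()` removes the head atomically, so even when several threads race to drain the queue (analogous to the double check and the RUNNING callback racing), each element is dequeued exactly once, i.e. no buffered info is sent twice and none is lost.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Demo: concurrent drains of a ConcurrentLinkedQueue dequeue each element
// exactly once, because poll() atomically removes the head.
class DequeueOnceDemo {
    static int drainConcurrently(int elements, int threads) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.add(i);
        }

        AtomicInteger dequeued = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                // Each successful poll() is a unique element; count it.
                while (queue.poll() != null) {
                    dequeued.incrementAndGet();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Equals `elements`: nothing duplicated, nothing lost.
        return dequeued.get();
    }
}
```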

Contributor

Yeah, true. :-)

@uce
Contributor

uce commented Feb 10, 2015

Looks good to me. +1

We chatted about batching update task calls. Did you realize a problem with it or can we open an "improvement" issue for it?

@tillrohrmann
Contributor Author

You're right. At the moment there is no aggregation of messages. I'll add it.

@uce
Contributor

uce commented Feb 11, 2015

There is a problem: https://travis-ci.org/apache/flink/jobs/50215407

java.lang.IllegalStateException: Consumer state is FINISHED but was expected to be RUNNING.
    at org.apache.flink.runtime.deployment.PartialPartitionInfo.createPartitionInfo(PartialPartitionInfo.java:81)
    at org.apache.flink.runtime.executiongraph.Execution.sendPartitionInfos(Execution.java:581)
    at org.apache.flink.runtime.executiongraph.Execution.switchToRunning(Execution.java:654)
    at org.apache.flink.runtime.executiongraph.Execution.access$100(Execution.java:88)
    at org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:336)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:244)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

…s with asynchronous futures. Buffers PartitionInfos at the JobManager in case that the respective consumer has not been scheduled.

Conflicts:
	flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala

Adds TaskUpdate message aggregation before sending the messages to the TaskManagers
@tillrohrmann
Contributor Author

I added the UpdateTask message aggregation. I also had to rework the PartitionInfo creation to make it work with the concurrent task updates. This requires another review of the code before we can merge it.
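The aggregation idea discussed above can be sketched like this. Instead of sending one actor message per partition info, the updates are grouped by target TaskManager so that each manager receives a single batched message. The names (`UpdateTaskBatcher`, `batchByTaskManager`) are illustrative assumptions, not the actual Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of UpdateTask message aggregation: group updates by the
// TaskManager they target, yielding one batched message per manager.
class UpdateTaskBatcher {
    /** Each update is a {taskManagerId, partitionInfo} pair. */
    static Map<String, List<String>> batchByTaskManager(List<String[]> updates) {
        Map<String, List<String>> batches = new HashMap<>();
        for (String[] update : updates) {
            batches.computeIfAbsent(update[0], k -> new ArrayList<>())
                   .add(update[1]);
        }
        // One map entry per TaskManager -> one message per TaskManager.
        return batches;
    }
}
```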

@rmetzger
Contributor

Cool. I'm testing this PR on a cluster now.

@rmetzger
Contributor

The job that was previously failing is fixed with this change.

We should merge this change ASAP, because it's kinda impossible right now to seriously use Flink 0.9-SNAPSHOT without it.

@asfgit asfgit closed this in aedbacf Feb 11, 2015
marthavk pushed a commit to marthavk/flink that referenced this pull request Jun 9, 2015

This closes apache#378
@tillrohrmann tillrohrmann deleted the fixScheduleOrUpdateConsumers branch September 16, 2015 13:06