
[SPARK-19900][core] Remove driver when relaunching. #18084

Closed (10 commits)

Conversation

@liyichao
Contributor

commented May 24, 2017

This is #17888.

Below are some Spark UI snapshots.

Master, after worker disconnects:

![master_disconnect](https://cloud.githubusercontent.com/assets/2576762/26398687/d0ee228e-40ac-11e7-986d-d3b57b87029f.png)

Master, after worker reconnects, notice the `running drivers` part:

![master_reconnects](https://cloud.githubusercontent.com/assets/2576762/26398697/d50735a4-40ac-11e7-80d8-6e9e1cf0b62f.png)

This patch, after worker disconnects:

![patch_disconnect](https://cloud.githubusercontent.com/assets/2576762/26398009/c015d3dc-40aa-11e7-8bb4-df11a1f66645.png)

This patch, after worker reconnects:

![image](https://cloud.githubusercontent.com/assets/2576762/26398037/d313769c-40aa-11e7-8613-5f157d193150.png)

cc @cloud-fan @jiangxb1987

@gatorsmile

Member

commented May 24, 2017

ok to test

```scala
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver
removeDriver(driver.id, DriverState.RELAUNCHING, None)
val newDriver = createDriver(driver.desc)
```

@jiangxb1987

jiangxb1987 May 24, 2017

Contributor

Do you have a good reason to remove and re-create the driver in this case? It looks like overkill compared to the old logic.

@liyichao

liyichao May 25, 2017

Author Contributor

First, we must distinguish the original driver from the newly relaunched one, because statusUpdate messages for both versions will arrive at the master. For example, when the network-partitioned worker reconnects to the master, it will send DriverStateChanged with the driver id, and the master must recognize it as the state of the original driver, not the state of the newly launched one.

The patch simply chooses a new driver id to do this, which has some shortcomings, however. For example, in the UI the two versions of the driver are not related, and the old one's final state is RELAUNCHING (RELAUNCHED would read better).

Another way is to add something like an attemptId to the driver state, and then let DriverStateChanged carry the attemptId to identify which run it refers to. This seems more complex.

What's your opinion?
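The new-id approach described above can be sketched as a minimal, self-contained model. All names here (`RelaunchSketch`, `DriverInfo`, etc.) are illustrative, not Spark's actual classes; `relaunch` mirrors the patch's removeDriver-then-createDriver shape.

```scala
import scala.collection.mutable

// Minimal model of "relaunch under a fresh driver id": a stale state update
// for the old id no longer matches anything, so the new run is left alone.
object RelaunchSketch {
  final case class DriverInfo(id: String, state: String)

  val drivers = mutable.Map.empty[String, DriverInfo]
  private var nextDriverNumber = 0

  private def newDriverId(): String = {
    val id = f"driver-$nextDriverNumber%04d"
    nextDriverNumber += 1
    id
  }

  def launch(): String = {
    val id = newDriverId()
    drivers(id) = DriverInfo(id, "RUNNING")
    id
  }

  // Drop the old entry (removeDriver in the patch) and create a brand-new
  // driver under a fresh id (createDriver in the patch).
  def relaunch(oldId: String): String = {
    drivers.remove(oldId)
    launch()
  }

  // A state update for an id the master no longer tracks is stale: it is
  // dropped (returns false) instead of killing or removing the new run.
  def onDriverStateChanged(id: String, state: String): Boolean =
    drivers.get(id) match {
      case Some(d) => drivers(id) = d.copy(state = state); true
      case None    => false
    }
}
```

With this shape, a delayed DriverStateChanged carrying the old id simply misses in the table, which is the disambiguation the comment above asks for.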

```scala
}

val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
```

@jiangxb1987

jiangxb1987 May 24, 2017

Contributor

I believe this duplicated code can be combined.

@liyichao

liyichao May 25, 2017

Author Contributor

Updated, please take a look.

@SparkQA


commented May 25, 2017

Test build #77312 has finished for PR 18084 at commit 9ea2061.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA


commented May 25, 2017

Test build #77353 has finished for PR 18084 at commit 6ab9a0f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint
@jiangxb1987

Contributor

commented May 25, 2017

Maybe some more actions should be done in relaunchDriver(), such as having driver.worker remove its dependency on the relaunched driver; but removing and later creating a new driver wastes resources, and we should avoid doing such things.

Now, to help us move forward, would you like to spend some time creating a valid regression test case? That will help a lot when we further discuss the proper bug-fix proposal.

@liyichao

Contributor Author

commented May 26, 2017

Thanks for the reply. I have added some more tests to verify the state of the master and worker after relaunching.

I will try to think about whether there are ways to reuse the old driver struct.

@liyichao

Contributor Author

commented May 26, 2017

Hi, I've thought more thoroughly about this.

The main state involved here is Master.workers, Master.idToWorker, and WorkerInfo.drivers. Say driverId1 runs on worker A. Assume A is network partitioned: the master calls removeWorker, which sets the worker's state to DEAD and removes the worker from persistenceEngine, but does not remove it from Master.workers. The master then launches the driver on worker B.

When A reconnects, it re-registers with the master; the master removes the old WorkerInfo (whose drivers field is not empty) and adds a new WorkerInfo (say wf_A) whose drivers field is empty. After registering, the worker re-syncs state with the master by sending WorkerLatestState containing driverId1; the master does not find it in wf_A.drivers, so it asks worker A to kill it. After killing the driver, worker A sends DriverStateChanged(driverId1, DriverState.KILLED), and the master then mistakenly removes driverId1, which is now running on worker B.

How do we recognize that the DriverStateChanged came from worker A, not worker B? Maybe we can add a workerId field to DriverStateChanged, but is it possible that the second run of driverId1 is on worker A? Consider the following scenario:

  1. worker A is network partitioned
  2. the master puts driverId1 into waitingDrivers
  3. worker A reconnects and registers
  4. the master launches driverId1 on worker A
  5. worker A's WorkerLatestState(_, _, Seq(driverId1)) arrives at the master

Now, how does worker A handle LaunchDriver(driverId1) when it is already running a driver with id driverId1? How does the master process the WorkerLatestState? With the above message order, the master will send KillDriver to worker A; the worker will then kill driverId1 (the relaunched one) and send DriverStateChanged to the master; the master will relaunch it again...

After all this, I think it is better to relaunch the driver with a new id, to keep things simple. As for the cost: removeDriver will be called anyway — if not here, then when DriverStateChanged arrives. persistenceEngine has to be called because the persisted driver.id changes. So the cost is justified. And relaunchDriver is only called when a worker or master goes down, which should be rare, because framework code is more stable than application code, so software bugs there are less likely.
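The re-sync step in the scenario above can be modeled minimally. Since the re-registered WorkerInfo starts with an empty drivers set, every id the worker reports comes back in the kill list — which is exactly how the legitimate run on worker B ends up being removed. Names here are illustrative, not Spark's exact code.

```scala
import scala.collection.mutable

// Toy model of the master-side WorkerLatestState handling described above.
object SyncSketch {
  // driver ids the master's WorkerInfo records for each worker
  val workerDrivers = mutable.Map.empty[String, mutable.Set[String]]

  def record(workerId: String, driverId: String): Unit =
    workerDrivers.getOrElseUpdate(workerId, mutable.Set.empty[String]) += driverId

  // Returns the ids the master asks the worker to kill: everything the
  // worker reports that the (freshly re-registered, hence empty) WorkerInfo
  // does not record.
  def onWorkerLatestState(workerId: String, reported: Seq[String]): Seq[String] = {
    val known = workerDrivers.getOrElse(workerId, mutable.Set.empty[String])
    reported.filterNot(known.contains)
  }
}
```

A worker whose WorkerInfo was replaced on re-registration reports driverId1, gets a kill request back, and the resulting DriverStateChanged then poisons the master's view of the relaunched run.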

@SparkQA


commented May 26, 2017

Test build #77423 has finished for PR 18084 at commit da0f977.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@liyichao

Contributor Author

commented Jun 1, 2017

Contributor

left a comment

Since the issue is caused by the fact that we may have two running driver instances with the same id under some conditions (worker lost and later rejoined), we have to resolve the root cause by adding a workerId to the DriverStateChanged message, so we can decide whether we should remove the driver we recorded.

@liyichao

Contributor Author

commented Jun 13, 2017

Hi, adding a workerId may not work. For example, consider this scenario:

  • a driver with driverId1 is started on worker1
  • worker1 is lost
  • the master adds driverId1 to waitingDrivers
  • worker1 reconnects and sends DriverStateChanged(driverId1), but the message is delayed in the network
  • the master starts driverId1 on worker1
  • the master receives the delayed message

Now, what should the master do?

@jiangxb1987

Contributor

commented Jun 13, 2017

We should also check in the Worker that we don't launch duplicate drivers; I think the logic should be added in the handling of the LaunchDriver message.
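A minimal sketch of such a worker-side guard, under the assumption that the worker tracks its running driver ids in a set (names here are illustrative, not Spark's actual code):

```scala
import scala.collection.mutable

// Worker-side guard: ignore a LaunchDriver message for a driver id this
// worker is already running, instead of launching a duplicate.
object WorkerGuard {
  private val running = mutable.Set.empty[String]

  // Returns true if the driver was actually launched, false if the message
  // was dropped as a duplicate.
  def onLaunchDriver(driverId: String): Boolean =
    if (running.contains(driverId)) false
    else { running += driverId; true }
}
```

As the following comments show, this guard alone does not resolve the delayed-message races, but it would prevent two copies of the same id from running on one worker.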

@liyichao

Contributor Author

commented Jun 13, 2017

OK, another scenario:

  • a driver with driverId1 is started on worker1
  • worker1 is lost
  • the master adds driverId1 to waitingDrivers
  • worker1 reconnects, removes driverId1 from its local state, and sends DriverStateChanged(driverId1), but the message is delayed in the network
  • the master starts driverId1 on worker1
  • the master receives the delayed message
@jiangxb1987

Contributor

commented Jun 13, 2017

I think your point is: when a LaunchDriver message and a KillDriver message are sent out simultaneously, there is a race condition in which message arrives at the worker first. If the KillDriver message arrives later, we end up with a finished driver instead of a running driver.

So I'm no longer against the idea that we should generate a new driver id on relaunch, because it acts as a kind of new epoch that helps us bypass the nested race condition. But please be sure to add comments illustrating the condition in detail, and add test cases to cover this.

Also cc @cloud-fan FYI

@cloud-fan

Contributor

commented Jun 14, 2017

The fix LGTM; it would be better to add some comments to explain it clearly.

@jiangxb1987

Contributor

commented Jun 14, 2017

Also, please rebase onto the latest master :)

* 'master' of https://github.com/apache/spark: (149 commits)
  [SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure
  [SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePartitions rule.
  [SPARK-12552][CORE] Correctly count the driver resource when recovering from failure for Master
  [SPARK-21016][CORE] Improve code fault tolerance for converting string to number
  [SPARK-21051][SQL] Add hash map metrics to aggregate
  [SPARK-21064][CORE][TEST] Fix the default value bug in NettyBlockTransferServiceSuite
  [SPARK-21060][WEB-UI] Css style about paging function is error in the executor page. Css style about paging function is error in the executor page. It is different of history server ui paging function css style.
  [SPARK-21039][SPARK CORE] Use treeAggregate instead of aggregate in DataFrame.stat.bloomFilter
  [SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is leaked in WorkerSuite
  [SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
  [TEST][SPARKR][CORE] Fix broken SparkSubmitSuite
  [SPARK-19910][SQL] `stack` should not reject NULL values due to type mismatch
  Revert "[SPARK-21046][SQL] simplify the array offset and length in ColumnVector"
  [SPARK-20979][SS] Add RateSource to generate values for tests and benchmark
  [SPARK-21050][ML] Word2vec persistence overflow bug fix
  [SPARK-21059][SQL] LikeSimplification can NPE on null pattern
  [SPARK-20345][SQL] Fix STS error handling logic on HiveSQLException
  [SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds
  [SPARK-21046][SQL] simplify the array offset and length in ColumnVector
  [SPARK-21041][SQL] SparkSession.range should be consistent with SparkContext.range
  ...
@SparkQA


commented Jun 14, 2017

Test build #78023 has finished for PR 18084 at commit 0887eab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class KVStoreSerializer
  • public abstract class KVStoreView<T> implements Iterable<T>
  • public class KVTypeInfo
  • public class LevelDB implements KVStore
  • public static class TypeAliases
  • class LevelDBIterator<T> implements KVStoreIterator<T>
  • class LevelDBTypeInfo
  • class Index
  • public class UnsupportedStoreVersionException extends IOException
  • logError(s\"Not measuring processing time for listener class $className because a \" +
  • class FilteredObjectInputStream extends ObjectInputStream
  • String.format(\"Unexpected class in stream: %s\", desc.getName()));
  • class HasMinSupport(Params):
  • class HasNumPartitions(Params):
  • class HasMinConfidence(Params):
  • case class UnresolvedRelation(
  • case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  • case class Uuid() extends LeafExpression
  • case class StringReplace(srcExpr: Expression, searchExpr: Expression, replaceExpr: Expression)
  • case class Chr(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  • trait Command extends LogicalPlan
  • case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan)
  • case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
  • case class HintInfo(broadcast: Boolean = false)
  • public final class ParquetDictionary implements Dictionary
  • case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan
  • class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
  • class RateStreamSource(
  • case class StateStoreId(
  • class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null)
  • trait StateStoreWriter extends StatefulOperator
```scala
case RegisteredWorker(masterRef, _, _) =>
  masterRef.send(WorkerLatestState(id, Nil, drivers.keys.toSeq))
case LaunchDriver(driverId, desc) =>
  drivers(driverId) = driverId
```

@cloud-fan

cloud-fan Jun 14, 2017

Contributor

seems drivers can be a set instead of a map?
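A minimal sketch of that suggestion — the map above only ever stores driverId -> driverId, so a set recording which drivers the mock worker knows about is enough (the object name here is illustrative, not the PR's actual class):

```scala
import scala.collection.mutable

// Mock-worker bookkeeping as a set: membership is all that is needed.
object MockWorkerState {
  val drivers = mutable.HashSet.empty[String]
  def onLaunchDriver(driverId: String): Unit = drivers += driverId
}
```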

```scala
val master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
  val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
```

@cloud-fan

cloud-fan Jun 14, 2017

Contributor

shall we move this out of the eventually {...} block?

@liyichao

liyichao Jun 14, 2017

Author Contributor

Hi, this cannot be moved, because the MasterStateResponse changes over time. If we move the rpc call out of the block, masterState will never change, and the assert will fail.

See the test SPARK-20529:... above; it uses the same eventually assert.

```scala
}

eventually(timeout(10.seconds)) {
  val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
```

@cloud-fan

cloud-fan Jun 14, 2017

Contributor

ditto

@cloud-fan

Contributor

commented Jun 14, 2017

LGTM, pending test

@SparkQA


commented Jun 14, 2017

Test build #78049 has finished for PR 18084 at commit 9ddf23a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

Contributor

commented Jun 15, 2017

thanks, merging to master!

@asfgit asfgit closed this in 2924674 Jun 15, 2017
dataknocker pushed a commit to dataknocker/spark that referenced this pull request Jun 16, 2017

Author: Li Yichao <lyc@zhihu.com>

Closes apache#18084 from liyichao/SPARK-19900-1.