Skip to content

Commit

Permalink
[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The current code in `HeartbeatReceiverSuite`, executorId is set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executorId is sent to driver when register as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```

Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...
```

`executorId.toInt` will cause NumberformatException.

This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true.

**To fix**
Rectify executorId and replace `askWithRetry` with `askSync`, refer to #16690
## How was this patch tested?
This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: jinxing <jinxing@meituan.com>

Closes #16779 from jinxing64/SPARK-19437.
  • Loading branch information
jinxing authored and zsxwing committed Feb 3, 2017
1 parent 1d5d2a9 commit c86a57f
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class HeartbeatReceiverSuite
with PrivateMethodTester
with LocalSparkContext {

private val executorId1 = "executor-1"
private val executorId2 = "executor-2"
private val executorId1 = "1"
private val executorId2 = "2"

// Shared state that must be reset before and after each test
private var scheduler: TaskSchedulerImpl = null
Expand Down Expand Up @@ -93,12 +93,12 @@ class HeartbeatReceiverSuite

test("task scheduler is set correctly") {
assert(heartbeatReceiver.scheduler === null)
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
assert(heartbeatReceiver.scheduler !== null)
}

test("normal heartbeat") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -116,14 +116,14 @@ class HeartbeatReceiverSuite
}

test("reregister if heartbeat from unregistered executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
// Received heartbeat from unknown executor, so we ask it to re-register
triggerHeartbeat(executorId1, executorShouldReregister = true)
assert(getTrackedExecutors.isEmpty)
}

test("reregister if heartbeat from removed executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
// Remove the second executor but not the first
Expand All @@ -140,7 +140,7 @@ class HeartbeatReceiverSuite

test("expire dead hosts") {
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -149,7 +149,7 @@ class HeartbeatReceiverSuite
heartbeatReceiverClock.advance(executorTimeout / 2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
heartbeatReceiverClock.advance(executorTimeout)
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
// Only the second executor should be expired as a dead host
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
val trackedExecutors = getTrackedExecutors
Expand All @@ -173,11 +173,11 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
Expand All @@ -195,7 +195,7 @@ class HeartbeatReceiverSuite
// Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms).
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverClock.advance(executorTimeout * 2)
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
killThread.shutdown() // needed for awaitTermination
killThread.awaitTermination(10L, TimeUnit.SECONDS)
Expand All @@ -213,7 +213,7 @@ class HeartbeatReceiverSuite
executorShouldReregister: Boolean): Unit = {
val metrics = TaskMetrics.empty
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
Expand Down

0 comments on commit c86a57f

Please sign in to comment.