Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite. #16779

Closed
wants to merge 2 commits into from

Conversation

jinxing64
Copy link

What changes were proposed in this pull request?

The current code in HeartbeatReceiverSuite, executorId is set as below:

  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"

The executorId is sent to driver when register as below:

test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}

Receiving RegisterExecutor in CoarseGrainedSchedulerBackend, the executorId will be compared with currentExecutorIdCounter as below:

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...

executorId.toInt will cause NumberformatException.

This unit test can pass currently because of askWithRetry, when catching exception, RPC will call again, thus it will go if branch and return true.

To fix
Rectify executorId and replace askWithRetry with askSync, refer to #16690

How was this patch tested?

This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

@srowen
Copy link
Member

srowen commented Feb 2, 2017

CC @zsxwing

@SparkQA
Copy link

SparkQA commented Feb 2, 2017

Test build #3553 has finished for PR 16779 at commit 06efcac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Feb 2, 2017

@jinxing64 Good catch. Since you are touching this file, could you also replace other askWithRetry in this file with askSync?

@zsxwing
Copy link
Member

zsxwing commented Feb 3, 2017

LGTM pending tests

@jinxing64
Copy link
Author

@zsxwing
Thanks a lot for reviewing this. Not sure why the test doesn't start automatically.

@zsxwing
Copy link
Member

zsxwing commented Feb 3, 2017

ok to test

@SparkQA
Copy link

SparkQA commented Feb 3, 2017

Test build #72297 has finished for PR 16779 at commit a9bc3f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Feb 3, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in c86a57f Feb 3, 2017
@jinxing64
Copy link
Author

Thanks a lot for reviewing this.

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
## What changes were proposed in this pull request?

The current code in `HeartbeatReceiverSuite`, executorId is set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executorId is sent to driver when register as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```

Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...
```

`executorId.toInt` will cause NumberformatException.

This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true.

**To fix**
Rectify executorId and replace `askWithRetry` with `askSync`, refer to apache#16690
## How was this patch tested?
This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: jinxing <jinxing@meituan.com>

Closes apache#16779 from jinxing64/SPARK-19437.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants