[SPARK-4563][core] Allow driver to advertise a different network address. #15120
Conversation
The goal of this feature is to allow the Spark driver to run in an isolated environment, such as a Docker container, and to use the host's port forwarding mechanism to accept connections from the outside world.

The change is restricted to the driver: there is no support for achieving the same thing on executors (or the YARN AM, for that matter). Those still need full access to the outside world so that, for example, connections can be made to an executor's block manager.

The core of the change is simple: add a new configuration that specifies the address the driver should bind to, which can be different from the address it advertises to executors (spark.driver.host). Everything else is plumbing the new configuration to where it's needed.

To use the feature, the host starting the container needs to set up the driver's ports to fall into a range that is being forwarded; this required a special block manager port configuration just for the driver, which falls back to the existing spark.blockManager.port when not set. This way, users can modify the driver settings without affecting the executors. It would theoretically be nice to also have different retry counts for driver and executors, but given that Docker (at least) allows forwarding port ranges, we can probably live without that for now.

Because of the nature of the feature it's hard to add unit tests; I just added a simple one to make sure the configuration works.

This was tested with a Docker image running spark-shell with the following command:

    docker blah blah blah \
      -p 38000-38100:38000-38100 \
      [image] \
      spark-shell \
        --num-executors 3 \
        --conf spark.shuffle.service.enabled=false \
        --conf spark.dynamicAllocation.enabled=false \
        --conf spark.driver.host=[host's address] \
        --conf spark.driver.port=38000 \
        --conf spark.driver.blockManager.port=38020 \
        --conf spark.ui.port=38040

Running on YARN, I verified that the driver works, that executors start up and listen on ephemeral ports (instead of using the driver's config), and that caching and shuffling (without the shuffle service) work. I clicked through the UI to make sure all pages (including executor thread dumps) work. Also tested apps without Docker, and ran unit tests.
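For readers skimming the description, here is a rough sketch of how the new settings are meant to combine for a containerized driver. The address and port values below are placeholders, and the snippet assumes the semantics described above (spark.driver.bindAddress controls where the listen sockets bind, spark.driver.host is what gets advertised to executors):

    import org.apache.spark.SparkConf

    // Hypothetical values: the driver binds inside the container, while executors
    // are told to connect back through the docker host's forwarded ports.
    val conf = new SparkConf()
      .set("spark.driver.bindAddress", "0.0.0.0")        // where the driver's sockets listen (inside the container)
      .set("spark.driver.host", "docker-host.example")   // address advertised to executors (the host running docker)
      .set("spark.driver.port", "38000")
      .set("spark.driver.blockManager.port", "38020")    // driver-only port, falls back to spark.blockManager.port
      .set("spark.ui.port", "38040")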
Test build #65509 has finished for PR 15120 at commit
Test build #65516 has finished for PR 15120 at commit
/cc @zsxwing
private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
  .doc("Address where to bind network listen sockets on the driver.")
  .stringConf
  .createWithDefault(Utils.localHostName())
This is a breaking change. If a user uses spark.driver.host to specify the bind address, it won't work now. Right?
It should, as long as he doesn't set "spark.driver.bindAddress" - which is a new setting that is being added to override that behavior.
Maybe I missed something. val bindAddress = conf.get(DRIVER_BIND_ADDRESS) won't use spark.driver.host. Right?
Ah I see what you mean. I might have inverted the config resolution order... let me take a look.
Fixed the resolution order so that spark.driver.bindAddress falls back to spark.driver.host, and not the other way around, for backwards compatibility. Includes a small change to the resolution of fallback conf entry values.
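In other words, after the fix the addresses resolve roughly like this (a simplified sketch of the intended fallback chain; the real code goes through Spark's internal config entries, and Utils.localHostName() is a Spark-internal helper):

    // Simplified illustration, not the actual config-entry code.
    val advertisedAddress = conf.getOption("spark.driver.host")
      .getOrElse(Utils.localHostName())                 // default: local host name
    val bindAddress = conf.getOption("spark.driver.bindAddress")
      .getOrElse(advertisedAddress)                     // new setting overrides; otherwise follow spark.driver.host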
Test build #65692 has finished for PR 15120 at commit
LGTM. Just some nits
s"service$serviceString (for example spark.ui.port for SparkUI) to an available " + | ||
"port or increasing spark.port.maxRetries." | ||
s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " + | ||
s"the appropriate port for the service$serviceString (for example spark.ui.port " + |
nit: since you are touching this, could you add a space between service and $serviceString?
The space is actually in $serviceString, because the service name is allowed to be empty. (Historical reasons? Don't ask me.)
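For context, the helper builds that string roughly like this (a paraphrase, not necessarily the exact line in Utils), which is why the leading space travels with the name:

    // Empty when no service name is given; otherwise the string carries its own leading space.
    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"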
Got it.
_conf.setIfMissing("spark.driver.host", Utils.localHostName())
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, conf.get(DRIVER_HOST_ADDRESS))
nit: could you use _conf instead? I thought _conf and conf were different at first glance, but actually they are the same.
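Presumably the nit amounts to something like the following (my reading of the request, not the committed line):

    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))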
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
     findEntry(key) match {
       case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
       case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+      case e: FallbackConfigEntry[_] => get(e.fallback.key)
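A toy illustration of the observable difference, using spark.driver.blockManager.port (which falls back to spark.blockManager.port) as an example; the map-based lookup below only models the behavior and is not the actual SparkConfigProvider logic:

    // The user explicitly set only the fallback key.
    val userSettings = Map("spark.blockManager.port" -> "39000")
    val fallbackDefault = "0"   // assumed default of spark.blockManager.port (0 = random port)

    // Resolving spark.driver.blockManager.port when it is unset:
    val before = fallbackDefault                                    // old code: only the fallback key's default
    val after = userSettings.getOrElse("spark.blockManager.port",   // new code: full lookup of the fallback key,
      fallbackDefault)                                              // so the user's value ("39000") wins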
do we need to backport this fix to 2.0.1?
No, this code is only in master.
We are facing this issue with Spark 1.6. Are we going to backport this?
What issue? The code you're commenting on does not exist in 1.6. If you're having issues, please ask questions on the mailing lists or use the bug tracker.
LGTM
Test build #65726 has finished for PR 15120 at commit
Merging to master. Thanks!
Hi, I'm currently in need of using this change because we're running inside containers that need port mapping. We're not using master (currently using 2.0.2). Do you know if there are any alternatives while the commit isn't released?
Not really.