Skip to content

Commit

Permalink
Implement security option in VM cluster managers (#222)
Browse files Browse the repository at this point in the history
* Implement security option

* Make secure connections the default option

* Update docstrings

* Fix disabling security

* Add a security documentation page

* Add intersphinx mappings

* Try to get intersphinx to work
  • Loading branch information
jacobtomlinson committed Jan 27, 2021
1 parent d412a9e commit a251330
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dask_cloudprovider/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class EC2Cluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.
Notes
-----
Expand Down
2 changes: 1 addition & 1 deletion dask_cloudprovider/azure/azurevm.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class AzureVMCluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.
Examples
--------
Expand Down
2 changes: 1 addition & 1 deletion dask_cloudprovider/digitalocean/droplet.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class DropletCluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.
Examples
--------
Expand Down
10 changes: 6 additions & 4 deletions dask_cloudprovider/gcp/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ async def start_scheduler(self):
# scheduler must be publicly available, and firewall
# needs to be in place to allow access to 8786 on
# the external IP
self.address = f"tcp://{self.external_ip}:8786"
self.address = f"{self.cluster.protocol}://{self.external_ip}:8786"
else:
# if the client is running inside GCE environment
# it's better to use internal IP, which doesn't
# require firewall setup
self.address = f"tcp://{self.internal_ip}:8786"
self.address = f"{self.cluster.protocol}://{self.internal_ip}:8786"
await self.wait_for_scheduler()

# need to reserve internal IP for workers
Expand All @@ -320,7 +320,9 @@ def __init__(
self.scheduler = scheduler
self.worker_class = worker_class
self.name = f"dask-{self.cluster.uuid}-worker-{str(uuid.uuid4())[:8]}"
internal_scheduler = f"{self.cluster.scheduler_internal_ip}:8786"
internal_scheduler = (
f"{self.cluster.protocol}://{self.cluster.scheduler_internal_ip}:8786"
)
self.command = " ".join(
[
self.set_env,
Expand Down Expand Up @@ -444,7 +446,7 @@ class GCPCluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.
Examples
--------
Expand Down
7 changes: 4 additions & 3 deletions dask_cloudprovider/gcp/tests/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ async def test_init():
@pytest.mark.asyncio
async def test_get_cloud_init():
skip_without_credentials()

cloud_init = GCPCluster.get_cloud_init(docker_args="--privileged")
cloud_init = GCPCluster.get_cloud_init(security=True, docker_args="--privileged")
assert "dask-scheduler" in cloud_init
assert "# Bootstrap" in cloud_init
assert " --privileged " in cloud_init
Expand All @@ -68,7 +67,9 @@ async def test_get_cloud_init():
async def test_create_cluster():
skip_without_credentials()

async with GCPCluster(asynchronous=True, env_vars={"FOO": "bar"}) as cluster:
async with GCPCluster(
asynchronous=True, env_vars={"FOO": "bar"}, security=True
) as cluster:

assert cluster.status == Status.running

Expand Down
47 changes: 43 additions & 4 deletions dask_cloudprovider/generic/vmcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from distributed.core import Status
from distributed.worker import Worker as _Worker
from distributed.scheduler import Scheduler as _Scheduler
from distributed.security import Security
from distributed.deploy.spec import SpecCluster, ProcessInterface
from distributed.utils import warn_on_duration, serialize_for_cli, cli_keywords

Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(
async def start(self):
self.cluster._log("Creating scheduler instance")
ip = await self.create_vm()
self.address = f"tcp://{ip}:8786"
self.address = f"{self.protocol}://{ip}:8786"
await self.wait_for_scheduler()
await super().start()

Expand Down Expand Up @@ -197,7 +198,7 @@ class VMCluster(SpecCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.
"""

Expand All @@ -221,28 +222,66 @@ def __init__(
docker_image="daskdev/dask:latest",
docker_args: str = "",
env_vars: dict = {},
security: bool = True,
protocol: str = None,
**kwargs,
):
if self.scheduler_class is None or self.worker_class is None:
raise RuntimeError(
"VMCluster is not intended to be used directly. See docstring for more info."
)
self._n_workers = n_workers

if not security:
self.security = None
elif security is True:
# True indicates self-signed temporary credentials should be used
self.security = Security.temporary()
elif not isinstance(security, Security):
raise TypeError("security must be a Security object")
else:
self.security = security

if protocol is None:
if self.security and self.security.require_encryption:
self.protocol = "tls"
else:
self.protocol = "tcp"
else:
self.protocol = protocol

if self.security and self.security.require_encryption:
dask.config.set(
{
"distributed.comm.default-scheme": self.protocol,
"distributed.comm.require-encryption": True,
"distributed.comm.tls.ca-file": self.security.tls_ca_file,
"distributed.comm.tls.scheduler.key": self.security.tls_scheduler_key,
"distributed.comm.tls.scheduler.cert": self.security.tls_scheduler_cert,
"distributed.comm.tls.worker.key": self.security.tls_worker_key,
"distributed.comm.tls.worker.cert": self.security.tls_worker_cert,
"distributed.comm.tls.client.key": self.security.tls_client_key,
"distributed.comm.tls.client.cert": self.security.tls_client_cert,
}
)

image = self.scheduler_options.get("docker_image", False) or docker_image
self.options["docker_image"] = image
self.scheduler_options["docker_image"] = image
self.scheduler_options["env_vars"] = env_vars
self.scheduler_options["protocol"] = protocol
self.scheduler_options["scheduler_options"] = scheduler_options
self.worker_options["env_vars"] = env_vars
self.options["docker_args"] = docker_args
self.scheduler_options["docker_args"] = docker_args
self.worker_options["docker_args"] = docker_args
self.worker_options["docker_image"] = image
self.worker_options["worker_class"] = worker_class
self.worker_options["protocol"] = protocol
self.worker_options["worker_options"] = worker_options
self.scheduler_options["scheduler_options"] = scheduler_options
self.uuid = str(uuid.uuid4())[:8]

super().__init__(**kwargs)
super().__init__(**kwargs, security=self.security)

async def call_async(self, f, *args, **kwargs):
"""Run a blocking function in a thread as a coroutine.
Expand Down
7 changes: 5 additions & 2 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,8 @@
]


# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {"https://docs.python.org/": None}
intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"dask": ("https://docs.dask.org/en/latest/", None),
"distributed": ("https://distributed.dask.org/en/latest/", None),
}
1 change: 1 addition & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ this code.
:caption: Advanced

troubleshooting.rst
security.rst
gpus.rst
packer.rst

Expand Down
47 changes: 47 additions & 0 deletions doc/source/security.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Security
========

Dask Cloudprovider aims to balance ease of use with security best practices. The two are not always compatible, so this document outlines the compromises and decisions made in this library.

Public Schedulers
-----------------

For each cluster manager to work correctly, it must be able to make a connection to the Dask scheduler on port ``8786``.
In many cluster managers, the default option is to expose the Dask scheduler and dashboard to the internet via a public IP address.
This makes it quick and easy for new users to get up and running, but may pose a security risk in the long term.

Many organisations have policies which do not allow users to assign public IP addresses or open ports. Our recommended best
practice is to use Dask Cloudprovider from within a cloud platform, either from a VM or a managed environment, and then disable
public networking.

See each cluster manager for configuration options.

Authentication and encryption
-----------------------------

Cluster managers such as :class:`dask_cloudprovider.aws.EC2Cluster`, :class:`dask_cloudprovider.azure.AzureVMCluster`,
:class:`dask_cloudprovider.gcp.GCPCluster` and :class:`dask_cloudprovider.digitalocean.DropletCluster` enable certificate-based authentication
and encryption by default.

When a cluster is launched with any of these cluster managers, a set of temporary keys will be generated and distributed to the cluster nodes
via their startup script. All communication between the client, scheduler and workers will then be encrypted, and only clients and workers with
valid certificates will be able to connect to the scheduler.

You can also specify your own certificates using the :class:`distributed.security.Security` object.

.. code-block:: python

    >>> from dask_cloudprovider.gcp import GCPCluster
    >>> from dask.distributed import Client
    >>> from distributed.security import Security
    >>> sec = Security(tls_ca_file='cluster_ca.pem',
    ...                tls_client_cert='cli_cert.pem',
    ...                tls_client_key='cli_key.pem',
    ...                require_encryption=True)
    >>> cluster = GCPCluster(n_workers=1, security=sec)
    >>> client = Client(cluster)
    >>> client
    <Client: 'tls://10.142.0.29:8786' processes=0 threads=0, memory=0 B>

You can disable secure connections by setting the ``security`` keyword argument to ``False``. This may be desirable when troubleshooting or
when running on a trusted network (entirely inside a VPC for example).

0 comments on commit a251330

Please sign in to comment.