When calling retire_workers on a cluster with TLS security, the worker is successfully removed from the cluster, but will fail in shutdown and end up in a hung state.
Steps to reproduce:
1. Create Keypair:
# create_keypair.py
from datetime import datetime, timedelta
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
def write_keypair():
    """Generate a self-signed TLS certificate and matching private key.

    Writes the PEM-encoded certificate to ``dask.crt`` and the
    unencrypted PKCS#8 PEM private key to ``dask.key`` in the
    current working directory.
    """
    # 2048-bit RSA key with the conventional public exponent.
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    pem_key = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Subject and issuer are the same name because the certificate
    # is self-signed.
    name = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, 'dask-internal')]
    )
    issued_at = datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(issued_at)
    # Certificate is valid for one year from generation time.
    builder = builder.not_valid_after(issued_at + timedelta(days=365))
    certificate = builder.sign(private_key, hashes.SHA256(), default_backend())
    pem_cert = certificate.public_bytes(serialization.Encoding.PEM)

    with open('dask.crt', 'wb') as f:
        f.write(pem_cert)
    with open('dask.key', 'wb') as f:
        f.write(pem_key)


if __name__ == '__main__':
    write_keypair()
2. Start scheduler
$ dask-scheduler --tls-ca-file dask.crt --tls-key dask.key --tls-cert dask.crt
3. Start worker
$ dask-worker <SCHEDULER_ADDRESS> --tls-ca-file dask.crt --tls-key dask.key --tls-cert dask.crt
4. Create client, and call retire_workers
from distributed.security import Security
from distributed import Client
sec = Security(tls_client_key='dask.key', tls_client_cert='dask.crt', tls_ca_file='dask.crt')
client = Client('<SCHEDULER_ADDRESS>', security=sec)
client.retire_workers(n=1, close_workers=True)
5. Observe the error from the worker during shutdown:
$ dask-worker tls://192.168.1.5:8786 --tls-ca-file dask.crt --tls-key dask.key --tls-cert dask.crt
distributed.nanny - INFO - Start Nanny at: 'tls://192.168.1.5:61830'
distributed.worker - INFO - Start worker at: tls://192.168.1.5:61831
distributed.worker - INFO - Listening to: tls://192.168.1.5:61831
distributed.worker - INFO - nanny at: 192.168.1.5:61830
distributed.worker - INFO - bokeh at: 192.168.1.5:61832
distributed.worker - INFO - Waiting to connect to: tls://192.168.1.5:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 8
distributed.worker - INFO - Memory: 17.18 GB
distributed.worker - INFO - Local Directory: /Users/jcrist/Code/dask-gateway/worker-n6oomr1q
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tls://192.168.1.5:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tls://192.168.1.5:61831
distributed.comm.tcp - WARNING - Listener on 'tls://192.168.1.5:61830': TLS handshake failed with remote 'tls://192.168.1.5:61841': [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1056)
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed')>
Traceback (most recent call last):
File "/Users/jcrist/Code/distributed/distributed/comm/tcp.py", line 181, in read
n_frames = yield stream.read_bytes(8)
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/Users/jcrist/Code/distributed/distributed/worker.py", line 863, in _close
yield r.terminate()
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/Users/jcrist/Code/distributed/distributed/core.py", line 657, in send_recv_from_rpc
result = yield send_recv(comm=comm, op=key, **kwargs)
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/Users/jcrist/Code/distributed/distributed/core.py", line 472, in send_recv
response = yield comm.read(deserializers=deserializers)
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/Users/jcrist/anaconda/envs/dask-gateway/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/Users/jcrist/Code/distributed/distributed/comm/tcp.py", line 201, in read
convert_stream_closed_error(self, e)
File "/Users/jcrist/Code/distributed/distributed/comm/tcp.py", line 130, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
When calling
retire_workers on a cluster with TLS security, the worker is successfully removed from the cluster, but will fail in shutdown and end up in a hung state. Steps to reproduce:
1. Create Keypair:
2. Start scheduler
3. Start worker
4. Create client, and call retire_workers
5. Observe the error from the worker during shutdown: