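These tests assume that a fixed number of ciphers is available (exactly three TLSv1.3 ciphers once `FORCED_CIPHER` is configured), but that count is not portable; on this system `ctx.get_ciphers()` reports four, so both tests fail: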
_____________________________ test_connection_args _____________________________
def test_connection_args():
def basic_checks(ctx):
assert ctx.verify_mode == ssl.CERT_REQUIRED
assert ctx.check_hostname is False
def many_ciphers(ctx):
if sys.version_info >= (3, 6):
assert len(ctx.get_ciphers()) > 2 # Most likely
c = {
'tls': {
'ca-file': ca_file,
'scheduler': {
'key': key1,
'cert': cert1,
},
'worker': {
'cert': keycert1,
},
},
}
with new_config(c):
sec = Security()
d = sec.get_connection_args('scheduler')
assert not d['require_encryption']
ctx = d['ssl_context']
basic_checks(ctx)
many_ciphers(ctx)
d = sec.get_connection_args('worker')
ctx = d['ssl_context']
basic_checks(ctx)
many_ciphers(ctx)
# No cert defined => no TLS
d = sec.get_connection_args('client')
assert d.get('ssl_context') is None
# With more settings
c['tls']['ciphers'] = FORCED_CIPHER
c['require-encryption'] = True
with new_config(c):
sec = Security()
d = sec.get_listen_args('scheduler')
assert d['require_encryption']
ctx = d['ssl_context']
basic_checks(ctx)
if sys.version_info >= (3, 6):
supported_ciphers = ctx.get_ciphers()
tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2']
assert len(tls_12_ciphers) == 1
tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3']
if len(tls_13_ciphers):
> assert len(tls_13_ciphers) == 3
E AssertionError: assert 4 == 3
E + where 4 = len([{'aead': True, 'alg_bits': 256, 'auth': 'auth-any', 'description': 'TLS_AES_256_GCM_SHA384 TLSv1.3 Kx=any Au=an...auth': 'auth-any', 'description': 'TLS_AES_128_CCM_SHA256 TLSv1.3 Kx=any Au=any Enc=AESCCM(128) Mac=AEAD', ...}])
distributed/tests/test_security.py:217: AssertionError
_______________________________ test_listen_args _______________________________
def test_listen_args():
def basic_checks(ctx):
assert ctx.verify_mode == ssl.CERT_REQUIRED
assert ctx.check_hostname is False
def many_ciphers(ctx):
if sys.version_info >= (3, 6):
assert len(ctx.get_ciphers()) > 2 # Most likely
c = {
'tls': {
'ca-file': ca_file,
'scheduler': {
'key': key1,
'cert': cert1,
},
'worker': {
'cert': keycert1,
},
},
}
with new_config(c):
sec = Security()
d = sec.get_listen_args('scheduler')
assert not d['require_encryption']
ctx = d['ssl_context']
basic_checks(ctx)
many_ciphers(ctx)
d = sec.get_listen_args('worker')
ctx = d['ssl_context']
basic_checks(ctx)
many_ciphers(ctx)
# No cert defined => no TLS
d = sec.get_listen_args('client')
assert d.get('ssl_context') is None
# With more settings
c['tls']['ciphers'] = FORCED_CIPHER
c['require-encryption'] = True
with new_config(c):
sec = Security()
d = sec.get_listen_args('scheduler')
assert d['require_encryption']
ctx = d['ssl_context']
basic_checks(ctx)
if sys.version_info >= (3, 6):
supported_ciphers = ctx.get_ciphers()
tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2']
assert len(tls_12_ciphers) == 1
tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3']
if len(tls_13_ciphers):
> assert len(tls_13_ciphers) == 3
E AssertionError: assert 4 == 3
E + where 4 = len([{'aead': True, 'alg_bits': 256, 'auth': 'auth-any', 'description': 'TLS_AES_256_GCM_SHA384 TLSv1.3 Kx=any Au=an...auth': 'auth-any', 'description': 'TLS_AES_128_CCM_SHA256 TLSv1.3 Kx=any Au=any Enc=AESCCM(128) Mac=AEAD', ...}])
distributed/tests/test_security.py:276: AssertionError
--------------------------- Captured stderr teardown ---------------------------
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:56680 remote=tcp://127.0.0.1:36091>
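These tests assume that a fixed number of ciphers is available (exactly three TLSv1.3 ciphers once `FORCED_CIPHER` is configured), but that count is not portable; on this system `ctx.get_ciphers()` reports four, so both tests fail: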
_____________________________ test_connection_args _____________________________ def test_connection_args(): def basic_checks(ctx): assert ctx.verify_mode == ssl.CERT_REQUIRED assert ctx.check_hostname is False def many_ciphers(ctx): if sys.version_info >= (3, 6): assert len(ctx.get_ciphers()) > 2 # Most likely c = { 'tls': { 'ca-file': ca_file, 'scheduler': { 'key': key1, 'cert': cert1, }, 'worker': { 'cert': keycert1, }, }, } with new_config(c): sec = Security() d = sec.get_connection_args('scheduler') assert not d['require_encryption'] ctx = d['ssl_context'] basic_checks(ctx) many_ciphers(ctx) d = sec.get_connection_args('worker') ctx = d['ssl_context'] basic_checks(ctx) many_ciphers(ctx) # No cert defined => no TLS d = sec.get_connection_args('client') assert d.get('ssl_context') is None # With more settings c['tls']['ciphers'] = FORCED_CIPHER c['require-encryption'] = True with new_config(c): sec = Security() d = sec.get_listen_args('scheduler') assert d['require_encryption'] ctx = d['ssl_context'] basic_checks(ctx) if sys.version_info >= (3, 6): supported_ciphers = ctx.get_ciphers() tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2'] assert len(tls_12_ciphers) == 1 tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3'] if len(tls_13_ciphers): > assert len(tls_13_ciphers) == 3 E AssertionError: assert 4 == 3 E + where 4 = len([{'aead': True, 'alg_bits': 256, 'auth': 'auth-any', 'description': 'TLS_AES_256_GCM_SHA384 TLSv1.3 Kx=any Au=an...auth': 'auth-any', 'description': 'TLS_AES_128_CCM_SHA256 TLSv1.3 Kx=any Au=any Enc=AESCCM(128) Mac=AEAD', ...}]) distributed/tests/test_security.py:217: AssertionError_______________________________ test_listen_args _______________________________ def test_listen_args(): def basic_checks(ctx): assert ctx.verify_mode == ssl.CERT_REQUIRED assert ctx.check_hostname is False def many_ciphers(ctx): if sys.version_info >= (3, 6): assert len(ctx.get_ciphers()) > 2 # Most likely c = { 
'tls': { 'ca-file': ca_file, 'scheduler': { 'key': key1, 'cert': cert1, }, 'worker': { 'cert': keycert1, }, }, } with new_config(c): sec = Security() d = sec.get_listen_args('scheduler') assert not d['require_encryption'] ctx = d['ssl_context'] basic_checks(ctx) many_ciphers(ctx) d = sec.get_listen_args('worker') ctx = d['ssl_context'] basic_checks(ctx) many_ciphers(ctx) # No cert defined => no TLS d = sec.get_listen_args('client') assert d.get('ssl_context') is None # With more settings c['tls']['ciphers'] = FORCED_CIPHER c['require-encryption'] = True with new_config(c): sec = Security() d = sec.get_listen_args('scheduler') assert d['require_encryption'] ctx = d['ssl_context'] basic_checks(ctx) if sys.version_info >= (3, 6): supported_ciphers = ctx.get_ciphers() tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2'] assert len(tls_12_ciphers) == 1 tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3'] if len(tls_13_ciphers): > assert len(tls_13_ciphers) == 3 E AssertionError: assert 4 == 3 E + where 4 = len([{'aead': True, 'alg_bits': 256, 'auth': 'auth-any', 'description': 'TLS_AES_256_GCM_SHA384 TLSv1.3 Kx=any Au=an...auth': 'auth-any', 'description': 'TLS_AES_128_CCM_SHA256 TLSv1.3 Kx=any Au=any Enc=AESCCM(128) Mac=AEAD', ...}]) distributed/tests/test_security.py:276: AssertionError --------------------------- Captured stderr teardown --------------------------- distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:56680 remote=tcp://127.0.0.1:36091>