Skip to content

Commit

Permalink
Merge pull request #1934 from EchterAgo/fix_dnspython2
Browse files Browse the repository at this point in the history
dnssec: fix compat with dnspython 1.16 and 2.0.0
  • Loading branch information
cculianu committed Aug 3, 2020
2 parents ebc7f40 + 049cf9a commit 9954742
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 107 deletions.
2 changes: 1 addition & 1 deletion contrib/requirements/requirements.txt
Expand Up @@ -3,7 +3,7 @@ ecdsa>=0.9
requests
qrcode
protobuf
dnspython
dnspython[DNSSEC]
jsonrpclib-pelix
PySocks>=1.6.6
qdarkstyle==2.6.8
Expand Down
223 changes: 118 additions & 105 deletions lib/dnssec.py
Expand Up @@ -31,8 +31,6 @@
# https://github.com/rthalley/dnspython/blob/master/tests/test_dnssec.py


# import traceback
# import sys
import time
import struct

Expand All @@ -58,121 +56,136 @@
import dns.rdtypes.IN.AAAA


# Pure-Python version of dns.dnssec._validate_rsig
import ecdsa
from . import rsakey
if not hasattr(dns, 'version'):
# Do some monkey patching needed for versions of dnspython < 2

# Pure-Python version of dns.dnssec._validate_rsig
import ecdsa
import hashlib
from . import rsakey

def python_validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
from dns.dnssec import ValidationFailure, ECDSAP256SHA256, ECDSAP384SHA384
from dns.dnssec import _find_candidate_keys, _make_hash, _is_ecdsa, _is_rsa, _to_rdata, _make_algorithm_id
def python_validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
from dns.dnssec import ValidationFailure, ECDSAP256SHA256, ECDSAP384SHA384
from dns.dnssec import _find_candidate_keys, _make_hash, _is_ecdsa, _is_rsa, _to_rdata, _make_algorithm_id # pylint: disable=no-name-in-module

if isinstance(origin, str):
origin = dns.name.from_text(origin, dns.name.root)
if isinstance(origin, str):
origin = dns.name.from_text(origin, dns.name.root)

for candidate_key in _find_candidate_keys(keys, rrsig):
if not candidate_key:
raise ValidationFailure('unknown key')
for candidate_key in _find_candidate_keys(keys, rrsig):
if not candidate_key:
raise ValidationFailure('unknown key')

# For convenience, allow the rrset to be specified as a (name, rdataset)
# tuple as well as a proper rrset
if isinstance(rrset, tuple):
rrname = rrset[0]
rdataset = rrset[1]
else:
rrname = rrset.name
rdataset = rrset

if now is None:
now = time.time()
if rrsig.expiration < now:
raise ValidationFailure('expired')
if rrsig.inception > now:
raise ValidationFailure('not yet valid')

hash = _make_hash(rrsig.algorithm)

if _is_rsa(rrsig.algorithm):
keyptr = candidate_key.key
(bytes,) = struct.unpack('!B', keyptr[0:1])
keyptr = keyptr[1:]
if bytes == 0:
(bytes,) = struct.unpack('!H', keyptr[0:2])
keyptr = keyptr[2:]
rsa_e = keyptr[0:bytes]
rsa_n = keyptr[bytes:]
n = ecdsa.util.string_to_number(rsa_n)
e = ecdsa.util.string_to_number(rsa_e)
pubkey = rsakey.RSAKey(n, e)
sig = rrsig.signature

elif _is_ecdsa(rrsig.algorithm):
if rrsig.algorithm == ECDSAP256SHA256:
curve = ecdsa.curves.NIST256p
key_len = 32
digest_len = 32
elif rrsig.algorithm == ECDSAP384SHA384:
curve = ecdsa.curves.NIST384p
key_len = 48
digest_len = 48
# For convenience, allow the rrset to be specified as a (name, rdataset)
# tuple as well as a proper rrset
if isinstance(rrset, tuple):
rrname = rrset[0]
rdataset = rrset[1]
else:
# shouldn't happen
raise ValidationFailure('unknown ECDSA curve')
keyptr = candidate_key.key
x = ecdsa.util.string_to_number(keyptr[0:key_len])
y = ecdsa.util.string_to_number(keyptr[key_len:key_len * 2])
assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y)
point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, curve)
r = rrsig.signature[:key_len]
s = rrsig.signature[key_len:]
sig = ecdsa.ecdsa.Signature(ecdsa.util.string_to_number(r),
ecdsa.util.string_to_number(s))
rrname = rrset.name
rdataset = rrset

if now is None:
now = time.time()
if rrsig.expiration < now:
raise ValidationFailure('expired')
if rrsig.inception > now:
raise ValidationFailure('not yet valid')

hash = _make_hash(rrsig.algorithm)

if _is_rsa(rrsig.algorithm):
keyptr = candidate_key.key
(bytes,) = struct.unpack('!B', keyptr[0:1])
keyptr = keyptr[1:]
if bytes == 0:
(bytes,) = struct.unpack('!H', keyptr[0:2])
keyptr = keyptr[2:]
rsa_e = keyptr[0:bytes]
rsa_n = keyptr[bytes:]
n = ecdsa.util.string_to_number(rsa_n)
e = ecdsa.util.string_to_number(rsa_e)
pubkey = rsakey.RSAKey(n, e)
sig = rrsig.signature

elif _is_ecdsa(rrsig.algorithm):
if rrsig.algorithm == ECDSAP256SHA256:
curve = ecdsa.curves.NIST256p
key_len = 32
digest_len = 32
elif rrsig.algorithm == ECDSAP384SHA384:
curve = ecdsa.curves.NIST384p
key_len = 48
digest_len = 48
else:
# shouldn't happen
raise ValidationFailure('unknown ECDSA curve')
keyptr = candidate_key.key
x = ecdsa.util.string_to_number(keyptr[0:key_len])
y = ecdsa.util.string_to_number(keyptr[key_len:key_len * 2])
assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y)
point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, curve)
r = rrsig.signature[:key_len]
s = rrsig.signature[key_len:]
sig = ecdsa.ecdsa.Signature(ecdsa.util.string_to_number(r),
ecdsa.util.string_to_number(s))

else:
raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

hash.update(_to_rdata(rrsig, origin)[:18])
hash.update(rrsig.signer.to_digestable(origin))

if rrsig.labels < len(rrname) - 1:
suffix = rrname.split(rrsig.labels + 1)[1]
rrname = dns.name.from_text('*', suffix)
rrnamebuf = rrname.to_digestable(origin)
rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
rrsig.original_ttl)
rrlist = sorted(rdataset);
for rr in rrlist:
hash.update(rrnamebuf)
hash.update(rrfixed)
rrdata = rr.to_digestable(origin)
rrlen = struct.pack('!H', len(rrdata))
hash.update(rrlen)
hash.update(rrdata)

digest = hash.digest()

if _is_rsa(rrsig.algorithm):
digest = _make_algorithm_id(rrsig.algorithm) + digest
if pubkey.verify(bytearray(sig), bytearray(digest)):
return

elif _is_ecdsa(rrsig.algorithm):
diglong = ecdsa.util.string_to_number(digest)
if verifying_key.pubkey.verifies(diglong, sig):
return
else:
raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

hash.update(_to_rdata(rrsig, origin)[:18])
hash.update(rrsig.signer.to_digestable(origin))

if rrsig.labels < len(rrname) - 1:
suffix = rrname.split(rrsig.labels + 1)[1]
rrname = dns.name.from_text('*', suffix)
rrnamebuf = rrname.to_digestable(origin)
rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
rrsig.original_ttl)
rrlist = sorted(rdataset);
for rr in rrlist:
hash.update(rrnamebuf)
hash.update(rrfixed)
rrdata = rr.to_digestable(origin)
rrlen = struct.pack('!H', len(rrdata))
hash.update(rrlen)
hash.update(rrdata)

digest = hash.digest()

if _is_rsa(rrsig.algorithm):
digest = _make_algorithm_id(rrsig.algorithm) + digest
if pubkey.verify(bytearray(sig), bytearray(digest)):
return

elif _is_ecdsa(rrsig.algorithm):
diglong = ecdsa.util.string_to_number(digest)
if verifying_key.pubkey.verifies(diglong, sig):
return

else:
raise ValidationFailure('unknown algorithm %s' % rrsig.algorithm)
else:
raise ValidationFailure('unknown algorithm %s' % rrsig.algorithm)

raise ValidationFailure('verify failure')

raise ValidationFailure('verify failure')

class PyCryptodomexHashAlike:
def __init__(self, hashlib_func):
self._hash = hashlib_func
def new(self):
return self._hash()

# replace validate_rrsig
dns.dnssec._validate_rrsig = python_validate_rrsig
dns.dnssec.validate_rrsig = python_validate_rrsig
dns.dnssec.validate = dns.dnssec._validate

# replace validate_rrsig
dns.dnssec._validate_rrsig = python_validate_rrsig
dns.dnssec.validate_rrsig = python_validate_rrsig
dns.dnssec.validate = dns.dnssec._validate
dns.dnssec._have_ecdsa = True
dns.dnssec.MD5 = PyCryptodomexHashAlike(hashlib.md5)
dns.dnssec.SHA1 = PyCryptodomexHashAlike(hashlib.sha1)
dns.dnssec.SHA256 = PyCryptodomexHashAlike(hashlib.sha256)
dns.dnssec.SHA384 = PyCryptodomexHashAlike(hashlib.sha384)
dns.dnssec.SHA512 = PyCryptodomexHashAlike(hashlib.sha512)


from .util import print_error
Expand Down
2 changes: 1 addition & 1 deletion lib/interface.py
Expand Up @@ -137,7 +137,7 @@ def get_ssl_context(cert_reqs, ca_certs):
def _get_socket_and_verify_ca_cert(self, *, suppress_errors) -> Tuple[Optional[ssl.SSLSocket], bool]:
''' Attempts to connect to the remote host, assuming it is using a CA
signed certificate. If the cert is valid then a tuple of: (wrapped
SSLSocket, True) is returned. Otherwise (None, bool) is returned on
SSLSocket, False) is returned. Otherwise (None, bool) is returned on
error. If the second item in the tuple is True, then the entire
operation should be aborted due to low-level error. '''
s = self.get_simple_socket()
Expand Down
30 changes: 30 additions & 0 deletions lib/tests/__init__.py
@@ -0,0 +1,30 @@
import unittest
import threading
import tempfile
import shutil


# some unit tests are modifying globals...
class SequentialTestCase(unittest.TestCase):

test_lock = threading.Lock()

def setUp(self):
super().setUp()
self.test_lock.acquire()

def tearDown(self):
super().tearDown()
self.test_lock.release()


class ElectronCashTestCase(SequentialTestCase):
"""Base class for our unit tests."""

def setUp(self):
super().setUpClass()
self.electrum_path = tempfile.mkdtemp()

def tearDown(self):
super().tearDownClass()
shutil.rmtree(self.electrum_path)
59 changes: 59 additions & 0 deletions lib/tests/test_dnssec.py
@@ -0,0 +1,59 @@
import dns

from dns.dnssec import ValidationFailure

from electroncash import dnssec

from . import ElectronCashTestCase


class TestDnsSec(ElectronCashTestCase):

def test_validate_rrsig_ecdsa(self):
rrset = dns.rrset.from_text("getmonero.org.", 3599, 1, 48,
"257 3 13 mdsswUyr3DPW132mOi8V9xESWE8jTo0d xCjjnopKl+GqJxpVXckHAeF+KkxLbxIL fDLUT0rAK9iUzy1L53eKGQ==",
"256 3 13 koPbw9wmYZ7ggcjnQ6ayHyhHaDNMYELK TqT+qRGrZpWSccr/lBcrm10Z1PuQHB3A zhii+sb0PYFkH1ruxLhe5g==")
rrsig = dns.rdtypes.ANY.RRSIG.RRSIG.from_text(1, 46,
dns.tokenizer.Tokenizer("DNSKEY 13 2 3600 20180612115508 20180413115508 2371 getmonero.org. SSjtP2jCtXPukps7E3kum709xq2TH6Lt Ur32UhE7WKwSUfLTZ4EAoD5g22mi1fpB GDGb30kCMndDVjnHAEBDWw=="))
keys = {dns.name.Name([b'getmonero', b'org', b'']): rrset}
origin = None
now = 1527185178.7842247

# 'None' means it is valid
self.assertEqual(None, dns.dnssec.validate_rrsig(rrset, rrsig, keys, origin, now))

def test_validate_rrsig_rsa(self):
rrset = dns.rrset.from_text("getmonero.org.", 12698, 1, 43,
"2371 13 2 3b7f818a879ecb9931dae983d4529afedeb53993759d8080735083f954d40bc8")
rrsig = dns.rdtypes.ANY.RRSIG.RRSIG.from_text(1, 46,
dns.tokenizer.Tokenizer("DS 7 2 86400 20180609010045 20180519000045 1862 org. SgdGsY4BAm7c3qpwzVLy3ua4orvrsJQO 0rUQDDrrXR6lElnbF+AS0gEEfdZfDv11 65AuNil/+kT2Qh/ExgstvhWQ88XdDnHB ouvRMf9pg3p/q5Otet/StRzf33SMPgC1 zLzkfkSBCjJkwVmwde8saGnjdcW522ra Ge/6JcsryRw="))

rrset2 = dns.rrset.from_text("org.", 866, 1, 48,
"256 3 7 AwEAAXxsMmN/JgpEE9Y4uFNRJm7Q9GBw mEYUCsCxuKlgBU9WrQEFRrvAeMamUBeX 4SE8s3V/TEk/TgGmPPp0pMkKD7mseluK 6Ard2HZ6O3nPAzL4i8py/UDRUmYNSCxw fdfjUWRmcB9H+NKWMsJoDhAkLFqg5HS7 f0j4Vb99Wac24Fk7",
"256 3 7 AwEAAcLdAPt3vn/ND00zZlyTx7OBko+9 YeCrSl2eGuEXjef0Lqf0tKGikoHwnmTH tT8J/aGqkZImLMVByJbknE0wKDnbvbKD oTQxPwUQZLH6k3sTdsPKESKDSBSc6VFM q35gx6CeuRYZ9KkGWiUsKqJhXPo6tyJF CBxfaNQQyrzBnv4/",
"257 3 7 AwEAAZTjbIO5kIpxWUtyXc8avsKyHIIZ +LjC2Dv8naO+Tz6X2fqzDC1bdq7HlZwt kaqTkMVVJ+8gE9FIreGJ4c8G1GdbjQgb P1OyYIG7OHTc4hv5T2NlyWr6k6QFz98Q 4zwFIGTFVvwBhmrMDYsOTtXakK6QwHov A1+83BsUACxlidpwB0hQacbD6x+I2RCD zYuTzj64Jv0/9XsX6AYV3ebcgn4hL1jI R2eJYyXlrAoWxdzxcW//5yeL5RVWuhRx ejmnSVnCuxkfS4AQ485KH2tpdbWcCopL JZs6tw8q3jWcpTGzdh/v3xdYfNpQNcPI mFlxAun3BtORPA2r8ti6MNoJEHU=",
"257 3 7 AwEAAcMnWBKLuvG/LwnPVykcmpvnntwx fshHlHRhlY0F3oz8AMcuF8gw9McCw+Bo C2YxWaiTpNPuxjSNhUlBtcJmcdkz3/r7 PIn0oDf14ept1Y9pdPh8SbIBIWx50ZPf VRlj8oQXv2Y6yKiQik7bi3MT37zMRU2k w2oy3cgrsGAzGN4s/C6SFYon5N1Q2O4h GDbeOq538kATOy0GFELjuauV9guX/431 msYu4Rgb5lLuQ3Mx5FSIxXpI/RaAn2mh M4nEZ/5IeRPKZVGydcuLBS8GZlxW4qbb 8MgRZ8bwMg0pqWRHmhirGmJIt3UuzvN1 pSFBfX7ysI9PPhSnwXCNDXk0kk0=")
keys = {dns.name.Name([b'org', b'']): rrset2}
origin = None
now = 1527191953.6527798

# 'None' means it is valid
self.assertEqual(None, dns.dnssec.validate_rrsig(rrset, rrsig, keys, origin, now))

def test_validate_rrsig_fail(self):
rrset = dns.rrset.from_text("dnssec-failed.org.", 86400, 1, 43,
"2371 13 2 3b7f818a879ecb9931dae983d4529afedeb53993759d8080735083f954d40bc8",
"106 5 1 4F219DCE274F820EA81EA1150638DABE21EB27FC")
rrsig = dns.rdtypes.ANY.RRSIG.RRSIG.from_text(1, 46,
dns.tokenizer.Tokenizer("DS 7 2 86400 20200822152404 20200801142404 21869 org. TdOzuXZkr5dlHiien4T8LSBXszhMkBJoM6E7NrljV/k2xi5d8/AbKTcJ Vsj0Jo2ss/Tak1O4SM8qlOhgHSW2wb4IwYCZTMvuu0bC2b4wWeXkOoQT 5as8ImTEA2SrOm/uoA/v6dWzbiBzOFkkhJk//jAT+SNCRmgF1envhq78 7DQ="))

rrset2 = dns.rrset.from_text("org.", 866, 1, 48,
"256 3 7 AwEAAZwBxCB7AIhIWiqjusg2lfHSi8orabyy5BM/UtidQEZKIvU5Mrh7 7eV4C3WyTOwd2AwoGYAUgPjzAC5lFFnCg0LsQpsV7sYy5k+bZBlpxF1o 9KuBOe+iUQt2YM4TjTD38mW1aN8OFf8mkMxkRzo3dfskzsT881CdJRiD Cg18hJJt",
"256 3 7 AwEAAdZenjsGF9Xmh+hjv1FV0w8rRC6SHKeMNuk53BRsqruVK2xCbLGm gtue1yMElMs5+4B5A+uZY8pj4c5fHgC06h3gd0XoIF+KvWhk5WDqohrv 0nUADQjBBAGRaaO4FDTuu8i19sRg3p3h1LoAgZi+Gcls+JxOdnohVUkp 0by82buT",
"257 3 7 AwEAAcMnWBKLuvG/LwnPVykcmpvnntwxfshHlHRhlY0F3oz8AMcuF8gw 9McCw+BoC2YxWaiTpNPuxjSNhUlBtcJmcdkz3/r7PIn0oDf14ept1Y9p dPh8SbIBIWx50ZPfVRlj8oQXv2Y6yKiQik7bi3MT37zMRU2kw2oy3cgr sGAzGN4s/C6SFYon5N1Q2O4hGDbeOq538kATOy0GFELjuauV9guX/431 msYu4Rgb5lLuQ3Mx5FSIxXpI/RaAn2mhM4nEZ/5IeRPKZVGydcuLBS8G ZlxW4qbb8MgRZ8bwMg0pqWRHmhirGmJIt3UuzvN1pSFBfX7ysI9PPhSn wXCNDXk0kk0=")
keys = {dns.name.Name([b'org', b'']): rrset2}
origin = None
now = 1596432184.621979

with self.assertRaises(ValidationFailure):
dns.dnssec.validate_rrsig(rrset, rrsig, keys, origin, now)

0 comments on commit 9954742

Please sign in to comment.