
add support for Anonymous Diffie-Hellman ciphersuites

1 parent 19614b0 commit 91ce7ebde80dbb9aa9d7fccbd010c48d683e9b3d @d-mo committed Mar 21, 2012
@@ -86,6 +86,12 @@ def connect():
badFault = False
+ print "Test 0 - anonymous handshake"
+ connection = connect()
+ connection.handshakeClientAnonymous()
+ testConnClient(connection)
+ connection.close()
+
print "Test 1 - good X509"
connection = connect()
connection.handshakeClientCert()
@@ -368,7 +374,7 @@ def connect():
p.quit()
print "Test 29: POP3 good"
except socket.error, e:
- print "Non-critical error: socket error trying to reach internet server: ", e
+ print "Non-critical error: socket error trying to reach internet server: ", e
if not badFault:
print "Test succeeded"
@@ -405,6 +411,12 @@ def serverTestCmd(argv):
def connect():
return TLSConnection(lsock.accept()[0])
+ print "Test 0 - Anonymous server handshake"
+ connection = connect()
+ connection.handshakeServer(anon=True)
+ testConnServer(connection)
+ connection.close()
+
print "Test 1 - good X.509"
x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
x509Chain = X509CertChain([x509Cert])
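Review bot: The new "Test 0" blocks in the client and server test functions cover only the case where both peers agree on an anonymous suite; there is no negative case showing that a client calling `handshakeClientCert()` fails the handshake against a server started with `handshakeServer(anon=True)`. Anonymous DH gives no server authentication, so that refusal is the property worth pinning, so that a later change to suite selection cannot quietly let an authenticated client end up on an unauthenticated suite. A short comment above each block, such as `# anonymous DH: no peer authentication, must be requested explicitly on both sides`, would also help readers of the test. Both test functions start and end outside the hunks shown.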
@@ -121,6 +121,9 @@ class CipherSuite:
TLS_RSA_WITH_AES_256_CBC_SHA = 0x0035
TLS_RSA_WITH_RC4_128_SHA = 0x0005
+ TLS_DH_ANON_WITH_AES_128_CBC_SHA = 0x0034
+ TLS_DH_ANON_WITH_AES_256_CBC_SHA = 0x003A
+
srpSuites = []
srpSuites.append(TLS_SRP_SHA_WITH_3DES_EDE_CBC_SHA)
srpSuites.append(TLS_SRP_SHA_WITH_AES_128_CBC_SHA)
@@ -182,6 +185,20 @@ def getCertSuites(ciphers):
suites.append(CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA)
return suites
+ anonSuites = []
+ anonSuites.append(TLS_DH_ANON_WITH_AES_128_CBC_SHA)
+ anonSuites.append(TLS_DH_ANON_WITH_AES_256_CBC_SHA)
+
+ @staticmethod
+ def getAnonSuites(ciphers):
+ suites = []
+ for cipher in ciphers:
+ if cipher == "aes128":
+ suites.append(CipherSuite.TLS_DH_ANON_WITH_AES_128_CBC_SHA)
+ elif cipher == "aes256":
+ suites.append(CipherSuite.TLS_DH_ANON_WITH_AES_256_CBC_SHA)
+ return suites
+
tripleDESSuites = []
tripleDESSuites.append(TLS_SRP_SHA_WITH_3DES_EDE_CBC_SHA)
tripleDESSuites.append(TLS_SRP_SHA_RSA_WITH_3DES_EDE_CBC_SHA)
@@ -191,11 +208,13 @@ def getCertSuites(ciphers):
aes128Suites.append(TLS_SRP_SHA_WITH_AES_128_CBC_SHA)
aes128Suites.append(TLS_SRP_SHA_RSA_WITH_AES_128_CBC_SHA)
aes128Suites.append(TLS_RSA_WITH_AES_128_CBC_SHA)
+ aes128Suites.append(TLS_DH_ANON_WITH_AES_128_CBC_SHA)
aes256Suites = []
aes256Suites.append(TLS_SRP_SHA_WITH_AES_256_CBC_SHA)
aes256Suites.append(TLS_SRP_SHA_RSA_WITH_AES_256_CBC_SHA)
aes256Suites.append(TLS_RSA_WITH_AES_256_CBC_SHA)
+ aes256Suites.append(TLS_DH_ANON_WITH_AES_256_CBC_SHA)
rc4Suites = []
rc4Suites.append(TLS_RSA_WITH_RC4_128_SHA)
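Review bot: `TLS_DH_ANON_WITH_AES_128_CBC_SHA` and `TLS_DH_ANON_WITH_AES_256_CBC_SHA` are appended to the general `aes128Suites` and `aes256Suites` lists, and nothing in this section marks them as unauthenticated. If code outside this page builds the offered or accepted suite list from those per-cipher lists rather than from `getAnonSuites()`, an anonymous suite could be negotiated without the caller asking for it, so it is worth confirming they are reachable only through the explicit anon path. I would add a comment above the two constants, `# Anonymous DH (no certificate): no peer authentication, exposed to active man-in-the-middle; explicit opt-in only`, and a docstring on `getAnonSuites` saying it returns the anonymous suites for the `aes128`/`aes256` entries of `ciphers` and ignores every other name. The class body continues outside these hunks.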
@@ -18,7 +18,8 @@ def __init__(self,
x509Fingerprint=None,
tackID=None,
hardTack=None,
- settings = None):
+ settings = None,
+ anon = False):
"""
For client authentication, use one of these argument
combinations:
@@ -79,6 +80,7 @@ def __init__(self,
self.certChain = None
self.privateKey = None
self.checker = None
+ self.anon = anon
#SRP Authentication
if username and password and not \
@@ -118,6 +120,8 @@ def _handshake(self, tlsConnection):
checker=self.checker,
settings=self.settings,
session=self.tlsSession)
+ elif self.anon:
+ tlsConnection.handshakeClientAnonymous()
else:
tlsConnection.handshakeClientCert(certChain=self.certChain,
privateKey=self.privateKey,
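Review bot: In `_handshake`, the new `elif self.anon:` branch calls `tlsConnection.handshakeClientAnonymous()` with no arguments, while the call that ends just above it passes `checker=self.checker`, `settings=self.settings` and `session=self.tlsSession`; with `anon=True` the caller's settings and any checker are therefore dropped. The signature of `handshakeClientAnonymous` is not on this page, so if it accepts them, pass them through: `tlsConnection.handshakeClientAnonymous(settings=self.settings, checker=self.checker, session=self.tlsSession)`. Because this branch is tested before the certificate branch, `anon=True` combined with `x509Fingerprint`, `tackID` or `hardTack` looks like it silently yields an unauthenticated connection, so `__init__` should reject that combination rather than ignore the pinning arguments. The docstring here and the one in `HTTPTLSConnection.__init__` list the argument combinations but not `anon`; one sentence stating that it disables server authentication would cover both. Both functions extend beyond the hunks shown.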
@@ -24,7 +24,8 @@ def __init__(self, host, port=None, strict=None,
tackID=None,
hardTack=None,
settings=None,
- ignoreAbruptClose=False):
+ ignoreAbruptClose=False,
+ anon=False):
"""Create a new HTTPTLSConnection.
For client authentication, use one of these argument
@@ -105,7 +106,8 @@ def __init__(self, host, port=None, strict=None,
x509Fingerprint,
tackID,
hardTack,
- settings)
+ settings,
+ anon)
def connect(self):
httplib.HTTPConnection.connect(self)
@@ -365,6 +365,11 @@ def __init__(self, cipherSuite):
self.srp_g = 0L
self.srp_s = createByteArraySequence([])
self.srp_B = 0L
+ self.dh_p = 0L
+ self.dh_g = 0L
+ self.dh_Ys = 0L
+ self.rsa_modulus = 0L
+ self.rsa_exponent = 0L
self.signature = createByteArraySequence([])
def createSRP(self, srp_N, srp_g, srp_s, srp_B):
@@ -373,26 +378,53 @@ def createSRP(self, srp_N, srp_g, srp_s, srp_B):
self.srp_s = srp_s
self.srp_B = srp_B
return self
+
+ def createDH(self, dh_p, dh_g, dh_Ys):
+ self.dh_p = dh_p
+ self.dh_g = dh_g
+ self.dh_Ys = dh_Ys
+ return self
def parse(self, p):
p.startLengthCheck(3)
- self.srp_N = bytesToNumber(p.getVarBytes(2))
- self.srp_g = bytesToNumber(p.getVarBytes(2))
- self.srp_s = p.getVarBytes(1)
- self.srp_B = bytesToNumber(p.getVarBytes(2))
- if self.cipherSuite in CipherSuite.srpCertSuites:
- self.signature = p.getVarBytes(2)
+ if self.cipherSuite in CipherSuite.srpSuites + \
+ CipherSuite.srpCertSuites:
+ self.srp_N = bytesToNumber(p.getVarBytes(2))
+ self.srp_g = bytesToNumber(p.getVarBytes(2))
+ self.srp_s = p.getVarBytes(1)
+ self.srp_B = bytesToNumber(p.getVarBytes(2))
+ if self.cipherSuite in CipherSuite.srpCertSuites:
+ self.signature = p.getVarBytes(2)
+ elif self.cipherSuite in CipherSuite.certSuites:
+ self.rsa_modulus = bytesToNumber(p.getVarBytes(2))
+ self.rsa_exponent = bytesToNumber(p.getVarBytes(2))
+ elif self.cipherSuite in CipherSuite.anonSuites:
+ self.dh_p = bytesToNumber(p.getVarBytes(2))
+ self.dh_g = bytesToNumber(p.getVarBytes(2))
+ self.dh_Ys = bytesToNumber(p.getVarBytes(2))
p.stopLengthCheck()
return self
def write(self):
w = Writer()
- w.addVarSeq(numberToBytes(self.srp_N), 1, 2)
- w.addVarSeq(numberToBytes(self.srp_g), 1, 2)
- w.addVarSeq(self.srp_s, 1, 1)
- w.addVarSeq(numberToBytes(self.srp_B), 1, 2)
- if self.cipherSuite in CipherSuite.srpCertSuites:
- w.addVarSeq(self.signature, 1, 2)
+ if self.cipherSuite in CipherSuite.srpAllSuites:
+ w.addVarSeq(numberToBytes(self.srp_N), 1, 2)
+ w.addVarSeq(numberToBytes(self.srp_g), 1, 2)
+ w.addVarSeq(self.srp_s, 1, 1)
+ w.addVarSeq(numberToBytes(self.srp_B), 1, 2)
+ if self.cipherSuite in CipherSuite.srpCertSuites:
+ w.addVarSeq(self.signature, 1, 2)
+ elif self.cipherSuite in CipherSuite.certSuites:
+ w.addVarSeq(numberToBytes(self.rsa_modulus), 1, 2)
+ w.addVarSeq(numberToBytes(self.rsa_exponent), 1, 2)
+ if self.cipherSuite in []: # TODO support for signed_params
+ w.addVarSeq(self.signature, 1, 2)
+ elif self.cipherSuite in CipherSuite.anonSuites:
+ w.addVarSeq(numberToBytes(self.dh_p), 1, 2)
+ w.addVarSeq(numberToBytes(self.dh_g), 1, 2)
+ w.addVarSeq(numberToBytes(self.dh_Ys), 1, 2)
+ if self.cipherSuite in []: # TODO support for signed_params
+ w.addVarSeq(self.signature, 1, 2)
return self.postWrite(w)
def hash(self, clientRandom, serverRandom):
@@ -436,7 +468,11 @@ def createSRP(self, srp_A):
def createRSA(self, encryptedPreMasterSecret):
self.encryptedPreMasterSecret = encryptedPreMasterSecret
return self
-
+
+ def createDH(self, dh_Yc):
+ self.dh_Yc = dh_Yc
+ return self
+
def parse(self, p):
p.startLengthCheck(3)
if self.cipherSuite in CipherSuite.srpAllSuites:
@@ -449,6 +485,8 @@ def parse(self, p):
p.getFixBytes(len(p.bytes)-p.index)
else:
raise AssertionError()
+ elif self.cipherSuite in CipherSuite.anonSuites:
+ self.dh_Yc = bytesToNumber(p.getVarBytes(2))
else:
raise AssertionError()
p.stopLengthCheck()
@@ -465,6 +503,8 @@ def write(self):
w.addFixSeq(self.encryptedPreMasterSecret, 1)
else:
raise AssertionError()
+ elif self.cipherSuite in CipherSuite.anonSuites:
+ w.addVarSeq(numberToBytes(self.dh_Yc), 1, 2)
else:
raise AssertionError()
return self.postWrite(w)
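Review bot: The new anonymous branch of `ServerKeyExchange.parse` stores `dh_p`, `dh_g` and `dh_Ys` exactly as received, and `ClientKeyExchange.parse` does the same for `dh_Yc`; no range check is visible in this file section. The handshake code that consumes these values is not on this page, so unless it already validates them, the peer's public value should be checked before the shared secret is computed, e.g. `if not 1 < self.dh_Ys < self.dh_p - 1:` followed by the parser's usual error, along with a minimum size for `dh_p`. Separately, `ServerKeyExchange.parse` has no final `else`, so an unrecognised suite parses nothing, whereas `ClientKeyExchange.parse` ends in `raise AssertionError()`; it also tests `CipherSuite.srpSuites + CipherSuite.srpCertSuites` where `write` uses `CipherSuite.srpAllSuites`. The two `if self.cipherSuite in []:` branches in `write` are dead code and would read better as a plain TODO comment.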