Skip to content
This repository has been archived by the owner on Jan 27, 2022. It is now read-only.

Bytearray support #81

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/Crypto/Cipher/blockalgo.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def _ignite(self, data):
if self._mac:
raise TypeError("_ignite() cannot be called twice")

self._buffer.insert(0, data)
self._buffer.insert(0, bstr(data))
self._buffer_len += len(data)
self._mac = self._factory.new(self._key, MODE_CBC, bchr(0) * 16)
self.update(b(""))
Expand Down Expand Up @@ -429,10 +429,10 @@ def _start_gcm(self, factory, key, *args, **kwargs):

# Step 2 - Compute J0 (integer, not byte string!)
if len(self.nonce) == 12:
self._j0 = bytes_to_long(self.nonce + b("\x00\x00\x00\x01"))
self._j0 = bytes_to_long(bstr(self.nonce) + b("\x00\x00\x00\x01"))
else:
fill = (16 - (len(self.nonce) % 16)) % 16 + 8
ghash_in = (self.nonce +
ghash_in = (bstr(self.nonce) +
bchr(0) * fill +
long_to_bytes(8 * len(self.nonce), 8))

Expand Down Expand Up @@ -599,7 +599,7 @@ def _start_ccm(self, assoc_len=None, msg_len=None):
8 * divmod(self._mac_len - 2, 2)[0] +
(q - 1)
)
b_0 = bchr(flags) + self.nonce + long_to_bytes(self._msg_len, q)
b_0 = bchr(flags) + bstr(self.nonce) + long_to_bytes(self._msg_len, q)

# Start CBC MAC with zero IV
assoc_len_encoded = b('')
Expand All @@ -616,7 +616,7 @@ def _start_ccm(self, assoc_len=None, msg_len=None):
self._cipherMAC._ignite(b_0 + assoc_len_encoded)

# Start CTR cipher
prefix = bchr(q - 1) + self.nonce
prefix = bchr(q - 1) + bstr(self.nonce)
ctr = Counter.new(128 - len(prefix) * 8, prefix, initial_value=0)
self._cipher = self._factory.new(self._key, MODE_CTR, counter=ctr)
# Will XOR against CBC MAC
Expand Down
2 changes: 1 addition & 1 deletion lib/Crypto/Hash/CMAC.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def update(self, data):
self._total_len += len(data)
return

self._buffer.append(data)
self._buffer.append(bstr(data))
self._buffer_len += len(data)
self._total_len += len(data)

Expand Down
10 changes: 9 additions & 1 deletion lib/Crypto/Hash/MD5.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *


PY26 = sys.version_info[:2] == (2, 6)


def __make_constructor():
try:
# The md5 module is deprecated in Python 2.6, so use hashlib when possible.
Expand All @@ -53,7 +57,7 @@ def __make_constructor():
from md5 import new as _hash_new

h = _hash_new()
if hasattr(h, 'new') and hasattr(h, 'name') and hasattr(h, 'digest_size') and hasattr(h, 'block_size'):
if not PY26 and hasattr(h, 'new') and hasattr(h, 'name') and hasattr(h, 'digest_size') and hasattr(h, 'block_size'):
# The module from stdlib has the API that we need. Just use it.
return _hash_new
else:
Expand All @@ -67,10 +71,14 @@ def __init__(self, *args):
if args and args[0] is _copy_sentinel:
self._h = args[1]
else:
if PY26:
args = map(bytes, args)
self._h = _hash_new(*args)
def copy(self):
return _MD5(_copy_sentinel, self._h.copy())
def update(self, *args):
if PY26:
args = map(bytes, args)
f = self.update = self._h.update
f(*args)
def digest(self):
Expand Down
8 changes: 8 additions & 0 deletions lib/Crypto/Hash/SHA1.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *


PY26 = sys.version_info[:2] == (2, 6)


def __make_constructor():
try:
# The sha module is deprecated in Python 2.6, so use hashlib when possible.
Expand All @@ -67,11 +71,15 @@ def __init__(self, *args):
if args and args[0] is _copy_sentinel:
self._h = args[1]
else:
if PY26:
args = map(bytes, args)
self._h = _hash_new(*args)
def copy(self):
return _SHA1(_copy_sentinel, self._h.copy())
def update(self, *args):
f = self.update = self._h.update
if PY26:
args = map(bytes, args)
f(*args)
def digest(self):
f = self.digest = self._h.digest
Expand Down
78 changes: 68 additions & 10 deletions lib/Crypto/SelfTest/Cipher/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def dict(**kwargs):
else:
dict = dict

PYGTE26 = sys.version_info[:2] >= (2, 6)

class _NoDefault: pass # sentinel object
def _extract(d, k, default=_NoDefault):
"""Get an item from a dictionary, and remove it from the dictionary."""
Expand Down Expand Up @@ -98,12 +100,22 @@ def __init__(self, module, params):
# Stream cipher
self.mode = None
self.iv = None
self.encrypted_iv = None

self.extra_params = params

def shortDescription(self):
return self.description

def _getCipherInitData(self):
key = a2b_hex(self.key)
iv = encrypted_iv = None
if self.encrypted_iv is not None:
encrypted_iv = a2b_hex(self.encrypted_iv)
if self.iv is not None:
iv = a2b_hex(self.iv)
return key, iv, encrypted_iv

def _new(self, do_decryption=0):
params = self.extra_params.copy()

Expand All @@ -118,31 +130,34 @@ def _new(self, do_decryption=0):
ctr_params['nbits'] = 8*(self.module.block_size - len(ctr_params.get('prefix', '')) - len(ctr_params.get('suffix', '')))
params['counter'] = ctr_class(**ctr_params)

key, iv, encrypted_iv = self._getCipherInitData()
if self.mode is None:
# Stream cipher
return self.module.new(a2b_hex(self.key), **params)
elif self.iv is None:
return self.module.new(key, **params)
elif iv is None:
# Block cipher without iv
return self.module.new(a2b_hex(self.key), self.mode, **params)
return self.module.new(key, self.mode, **params)
else:
# Block cipher with iv
if do_decryption and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, the IV to feed for decryption is the *encrypted* one
return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.encrypted_iv), **params)
return self.module.new(key, self.mode, encrypted_iv, **params)
else:
return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.iv), **params)
return self.module.new(key, self.mode, iv, **params)

def isMode(self, name):
if not hasattr(self.module, "MODE_"+name):
return False
return self.mode == getattr(self.module, "MODE_"+name)

def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
def _getInputDataForTest(self):
assoc_data = []
if self.assoc_data:
assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data]
return a2b_hex(self.plaintext), a2b_hex(self.ciphertext), assoc_data

def runTest(self):
plaintext, ciphertext, assoc_data = self._getInputDataForTest()

ct = None
pt = None
Expand Down Expand Up @@ -187,6 +202,27 @@ def runTest(self):
self.assertEqual(self.mac, mac)
decipher.verify(a2b_hex(self.mac))

class CipherByteArrayTest(CipherSelfTest):

def _getCipherInitData(self):
key = bytearray(a2b_hex(self.key))
iv = encrypted_iv = None
if self.encrypted_iv is not None:
encrypted_iv = bytearray(a2b_hex(self.encrypted_iv))
if self.iv is not None:
iv = bytearray(a2b_hex(self.iv))
return key, iv, encrypted_iv

def _getInputDataForTest(self):
assoc_data = []
if self.assoc_data:
assoc_data = [ bytearray(a2b_hex(b(x))) for x in self.assoc_data]
return (
bytearray(a2b_hex(self.plaintext)),
bytearray(a2b_hex(self.ciphertext)),
assoc_data
)

class CipherStreamingSelfTest(CipherSelfTest):

def shortDescription(self):
Expand All @@ -196,8 +232,7 @@ def shortDescription(self):
return "%s should behave like a stream cipher" % (desc,)

def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
plaintext, ciphertext, _assoc_data = self._getInputDataForTest()

# The cipher should work like a stream cipher

Expand All @@ -218,6 +253,24 @@ def runTest(self):
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)


class CipherStreamingByteArrayTest(CipherStreamingSelfTest):

def _getCipherInitData(self):
key = bytearray(a2b_hex(self.key))
iv = encrypted_iv = None
if self.encrypted_iv is not None:
encrypted_iv = bytearray(a2b_hex(self.encrypted_iv))
if self.iv is not None:
iv = bytearray(a2b_hex(self.iv))
return key, iv, encrypted_iv

def _getInputDataForTest(self):
plaintext = bytearray(a2b_hex(self.plaintext))
ciphertext = bytearray(a2b_hex(self.ciphertext))
return plaintext, ciphertext, []


class CTRSegfaultTest(unittest.TestCase):

def __init__(self, module, params):
Expand Down Expand Up @@ -734,6 +787,8 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):

# Add the current test to the test suite
tests.append(CipherSelfTest(module, params))
if PYGTE26:
tests.append(CipherByteArrayTest(module, params))

# When using CTR mode, test that the interface behaves like a stream cipher
if p_mode in ('OFB', 'CTR'):
Expand Down Expand Up @@ -806,6 +861,9 @@ def make_stream_tests(module, module_name, test_data):
# Add the test to the test suite
tests.append(CipherSelfTest(module, params))
tests.append(CipherStreamingSelfTest(module, params))
if PYGTE26:
tests.append(CipherByteArrayTest(module, params))
tests.append(CipherStreamingByteArrayTest(module, params))
return tests

# vim:set ts=4 sw=4 sts=4 expandtab:
15 changes: 15 additions & 0 deletions lib/Crypto/SelfTest/Hash/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def dict(**kwargs):
else:
dict = dict

PYGTE26 = sys.version_info[:2] >= (2, 6)

from Crypto.Util.strxor import strxor_c

class HashDigestSizeSelfTest(unittest.TestCase):
Expand Down Expand Up @@ -160,6 +162,17 @@ def runTest(self):
self.assert_(isinstance(self.hashmod, obj5))
self.assert_(isinstance(self.hashmod, obj6))

class ByteArrayHashTest(unittest.TestCase):
def __init__(self, hashmod):
unittest.TestCase.__init__(self)
self.hashmod = hashmod

def runTest(self):
obj1 = self.hashmod.new(bytearray(b("foo")))
obj2 = Crypto.Hash.new(obj1, bytearray(b("foo")))
obj1.update(bytearray(b("bar")))
obj2.update(bytearray(b("bar")))

class MACSelfTest(unittest.TestCase):

def __init__(self, module, description, result, input, key, params):
Expand Down Expand Up @@ -243,6 +256,8 @@ def make_hash_tests(module, module_name, test_data, digest_size, oid=None):
if oid is not None:
tests.append(HashTestOID(module, oid))
tests.append(HashDocStringTest(module))
if PYGTE26:
tests.append(ByteArrayHashTest(module))
if getattr(module, 'name', None) is not None:
tests.append(GenericHashConstructorTest(module))
return tests
Expand Down
Loading