Skip to content

Commit

Permalink
Remove password, add RSA
Browse files Browse the repository at this point in the history
  • Loading branch information
christopherhesse committed Nov 3, 2010
1 parent e1f7d4b commit f830d15
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 79 deletions.
14 changes: 13 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Installation
3. In urls.py add::

(r'^r/', include('urlcrypt.urls')),
4. If you wish to use RSA encryption on your tokens, set ``URLCRYPT_USE_RSA_ENCRYPTION = True`` in your settings, generate a private key with ``ssh-keygen -t rsa -f <path to private key>`` and then set the path to the private key as URLCRYPT_PRIVATE_KEY_PATH. RSA encryption makes the token much longer but is more secure. The ``pycrypto`` library is required.

Usage
******
Expand Down Expand Up @@ -75,7 +77,17 @@ Settings
- default: ``60``
- The number of urlcrypt requests a unique visitor is allowed to make per minute.

- ``RUNNING_TESTS``
- ``URLCRYPT_USE_RSA_ENCRYPTION``

- default: ``False``
- Set ``URLCRYPT_USE_RSA_ENCRYPTION`` to True to enable RSA encryption of tokens.

- ``URLCRYPT_PRIVATE_KEY_PATH``

- default: ``/path/to/private_key``
- The path to the RSA private key file in PEM format, only used if URLCRYPT_USE_RSA_ENCRYPTION is True.

- ``RUNNING_TESTS``

- default: ``False``
- Set ``RUNNING_TESTS`` to True when running the urlcrypt tests.
Expand Down
19 changes: 14 additions & 5 deletions urlcrypt/conf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import hashlib
import os

from django.conf import settings

SECRET_KEY = settings.SECRET_KEY
# kind of ghetto, is there a better way to do this other than os.urandom?
OBFUSCATE_KEY = hashlib.sha512(SECRET_KEY).digest() + hashlib.sha512(SECRET_KEY[::-1]).digest()
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))

URLCRYPT_LOGIN_URL = getattr(settings, 'URLCRYPT_LOGIN_URL', settings.LOGIN_URL)
SECRET_KEY = getattr(settings, 'SECRET_KEY', 'sekrit')
RUNNING_TESTS = getattr(settings, 'RUNNING_TESTS', False)

# Changing this setting, or SECRET_KEY, invalidates existing tokens, the pycrypto library is required if enabled
URLCRYPT_USE_RSA_ENCRYPTION = getattr(settings, 'URLCRYPT_USE_RSA_ENCRYPTION', False)
# if URLCRYPT_USE_RSA_ENCRYPTION is True, the path to an RSA private key file must be set here

if RUNNING_TESTS:
URLCRYPT_PRIVATE_KEY_PATH = os.path.join(SCRIPT_DIR, "test", "test_private_key")
else:
URLCRYPT_PRIVATE_KEY_PATH = getattr(settings, 'URLCRYPT_PRIVATE_KEY_PATH', '/path/to/private_key')

URLCRYPT_LOGIN_URL = getattr(settings, 'URLCRYPT_LOGIN_URL', settings.LOGIN_URL)
URLCRYPT_RATE_LIMIT = getattr(settings, 'URLCRYPT_RATE_LIMIT', 60)
99 changes: 51 additions & 48 deletions urlcrypt/lib.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
# A library for safely encoding and obfuscating data in urls

import base64
import hashlib
import hmac
import itertools
import time

try:
from hashlib import sha1 as sha_hmac
except ImportError:
import sha as sha_hmac

try:
from urlcrypt.conf import OBFUSCATE_KEY, SECRET_KEY
except ImportError:
SECRET_KEY = 'sekrit'
OBFUSCATE_KEY = 'supersekrit'
from django.contrib.auth.models import User

def obfuscate(text):
# copy out our OBFUSCATE_KEY to the length of the text
key = OBFUSCATE_KEY * (len(text)//len(OBFUSCATE_KEY) + 1)

# XOR each character from our input with the corresponding character
# from the key
xor_gen = (chr(ord(t) ^ ord(k)) for t, k in zip(text, key))
return ''.join(xor_gen)
from urlcrypt.conf import SECRET_KEY, URLCRYPT_USE_RSA_ENCRYPTION

deobfuscate = obfuscate
if URLCRYPT_USE_RSA_ENCRYPTION:
import urlcrypt.rsa

# generate a key for obfuscation
# kind of ghetto, is there a better way to do this other than os.urandom?
OBFUSCATE_KEY = hashlib.sha512(SECRET_KEY).digest() + hashlib.sha512(SECRET_KEY[::-1]).digest()

def base64url_encode(text):
padded_b64 = base64.urlsafe_b64encode(text)
Expand All @@ -47,40 +39,51 @@ def pack(*strings):
def unpack(packed_string):
return packed_string.split('|')

def generate_login_token(user, url):
return encode_token(*map(str, (user.id, user.password, url.strip(), int(time.time()))))
def obfuscate(text):
# copy out our OBFUSCATE_KEY to the length of the text
key = OBFUSCATE_KEY * (len(text)//len(OBFUSCATE_KEY) + 1)

def decode_login_token(token):
data = decode_token(str(token), ('user_id', 'password', 'url', 'timestamp'))
data['user_id'] = int(data['user_id'])
return data
# XOR each character from our input with the corresponding character
# from the key
xor_gen = (chr(ord(t) ^ ord(k)) for t, k in zip(text, key))
return ''.join(xor_gen)

def encode_token(*strings):
token = ''.join(itertools.chain(strings, (SECRET_KEY,)))
token_hash = hmac.new(SECRET_KEY, token, sha_hmac).hexdigest()
packed_string = pack(token_hash, *strings)
obfuscated_string = obfuscate(packed_string)
return base64url_encode(obfuscated_string)

def decode_token(token, keys):
# if you add more fields, you need to use .get() so that old tokens
# don't cause a KeyError
obfuscated_string = base64url_decode(token)
packed_string = deobfuscate(obfuscated_string)
deobfuscate = obfuscate

def encode_token(strings, secret_key_f):
secret_key = secret_key_f(*strings)
signature = hmac.new(secret_key, pack(*strings), sha_hmac).hexdigest()
packed_string = pack(signature, *strings)
return obfuscate(packed_string)

def decode_token(token, keys, secret_key_f):
packed_string = deobfuscate(token)
strings = unpack(packed_string)[1:]
assert token == encode_token(*strings)
assert token == encode_token(strings, secret_key_f)
return dict(zip(keys, strings))

if __name__ == '__main__':
message = {
'url': u'/users/following',
'user_id': '12345'
}
def secret_key_f(user_id, *args):
# generate a secret key given the user id
user = User.objects.get(id=int(user_id))
return user.password + SECRET_KEY

def generate_login_token(user, url):
strings = [str(user.id), url.strip(), str(int(time.time()))]
token_byte_string = encode_token(strings, secret_key_f)

if URLCRYPT_USE_RSA_ENCRYPTION:
token_byte_string = urlcrypt.rsa.encrypt(token_byte_string)

return base64url_encode(token_byte_string)

def decode_login_token(token):
token_byte_string = base64url_decode(str(token))

token = encode_token(message['user_id'], message['url'])
decoded_message = decode_token(token,('user_id', 'url', 'timestamp'))
print 'token: {0}'.format(token)
print 'token length: {0}'.format(len(token))
print 'decoded: {0}'.format(decoded_message)
for key, val in message.iteritems():
assert val == decoded_message[key]
if URLCRYPT_USE_RSA_ENCRYPTION:
token_byte_string = urlcrypt.rsa.decrypt(token_byte_string)

keys = ('user_id', 'url', 'timestamp')
data = decode_token(token_byte_string, keys, secret_key_f)
data['user_id'] = int(data['user_id'])
data['timestamp'] = int(data['timestamp'])
return data
161 changes: 161 additions & 0 deletions urlcrypt/oaep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# from https://bugs.launchpad.net/pycrypto/+bug/328027

from math import ceil
from hashlib import sha1
from Crypto.Util.strxor import strxor
from Crypto.Util.number import long_to_bytes


def make_mgf1(hash):
"""Make an MFG1 function using the given hash function.
Given a hash function implementing the standard hash function interface,
this function returns a Mask Generation Function using that hash.
"""
def mgf1(mgfSeed,maskLen):
"""Mask Generation Function based on a hash function.
Given a seed byte string 'mgfSeed', this function will generate
and return a mask byte string of length 'maskLen' in a manner
approximating a Random Oracle.
The algorithm is from PKCS#1 version 2.1, appendix B.2.1.
"""
hLen = hash().digest_size
if maskLen > 2**32 * hLen:
raise ValueError("mask too long")
T = ""
for counter in range(int(ceil(maskLen / (hLen*1.0)))):
C = long_to_bytes(counter)
C = ('\x00'*(4 - len(C))) + C
assert len(C) == 4, "counter was too big"
T += hash(mgfSeed + C).digest()
assert len(T) >= maskLen, "generated mask was too short"
return T[:maskLen]
return mgf1


MGF1_SHA1 = make_mgf1(sha1)


class OAEP(object):
"""Class implementing OAEP encoding/decoding.
This class can be used to encode/decode byte strings using the
Optimal Asymmetic Encryption Padding Scheme. It requires a source
of random bytes, a hash function and a mask generation function.
By default SHA-1 is used as the hash function, and MGF1-SHA1 is used
as the mask generation function.
The method 'encode' will encode a byte string using this padding
scheme, and the complimenary method 'decode' will decode it.
The algorithms are from PKCS#1 version 2.1, section 7.1
"""

def __init__(self,randbytes,hash=sha1,mgf=MGF1_SHA1):
self.randbytes = randbytes
self.hash = hash
self.mgf = mgf

def encode(self,k,M,L=""):
"""Encode a message using OAEP.
This method encodes a byte string 'M' using Optimal Asymmetric
Encryption Padding. The argument 'k' must be the size of the
private key modulus in bytes. If specified, 'L' is a label
for the encoding.
"""
# Calculate label hash, unless it is too long
if L:
limit = getattr(self.hash,"input_limit",None)
if limit and len(L) > limit:
raise ValueError("label too long")
lHash = self.hash(L).digest()
# Check length of message against size of key modulus
mLen = len(M)
hLen = len(lHash)
if mLen > k - 2*hLen - 2:
raise ValueError("message too long")
# Perform the encoding
PS = "\x00" * (k - mLen - 2*hLen - 2)
DB = lHash + PS + "\x01" + M
assert len(DB) == k - hLen - 1, "DB length is incorrect"
seed = self.randbytes(hLen)
dbMask = self.mgf(seed,k - hLen - 1)
maskedDB = strxor(DB,dbMask)
seedMask = self.mgf(maskedDB,hLen)
maskedSeed = strxor(seed,seedMask)
return "\x00" + maskedSeed + maskedDB

def decode(self,k,EM,L=""):
"""Decode a message using OAEP.
This method decodes a byte string 'EM' using Optimal Asymmetric
Encryption Padding. The argument 'k' must be the size of the
private key modulus in bytes. If specified, 'L' is the label
used for the encoding.
"""
# Generate label hash, for sanity checking
lHash = self.hash(L).digest()
hLen = len(lHash)
# Split the encoded message
Y = EM[0]
maskedSeed = EM[1:hLen+1]
maskedDB = EM[hLen+1:]
# Perform the decoding
seedMask = self.mgf(maskedDB,hLen)
seed = strxor(maskedSeed,seedMask)
dbMask = self.mgf(seed,k - hLen - 1)
DB = strxor(maskedDB,dbMask)
# Split the DB string
lHash1 = DB[:hLen]
x01pos = hLen
while x01pos < len(DB) and DB[x01pos] != "\x01":
x01pos += 1
PS = DB[hLen:x01pos]
M = DB[x01pos+1:]
# All sanity-checking done at end, to avoid timing attacks
valid = True
if x01pos == len(DB): # No \x01 byte
valid = False
if lHash1 != lHash: # Mismatched label hash
valid = False
if Y != "\x00": # Invalid leading byte
valid = False
if not valid:
raise ValueError("decryption error")
return M


def test_oaep():
"""Run through the OAEP encode/decode for lots of random values."""
from os import urandom
p = OAEP(urandom)
for k in xrange(45,300):
for i in xrange(0,1000):
b = i % (k - 2*20 - 3) # message length
if b == 0:
j = -1
else:
j = i % b # byte to corrupt
print "test %s:%s (%s bytes, corrupt at %s)" % (k,i,b,j)
msg = urandom(b)
pmsg = p.encode(k,msg)
# Test that padding actually does something
assert msg != pmsg, "padded message was just the message"
# Test that padding is removed correctly
assert p.decode(k,pmsg) == msg, "message was not decoded properly"
# Test that corrupted padding gives an error
try:
if b == 0: raise ValueError
newb = urandom(1)
while newb == pmsg[j]:
newb = urandom(1)
pmsg2 = pmsg[:j] + newb + pmsg[j+1:]
p.decode(k,pmsg2)
except ValueError:
pass
else:
raise AssertionError("corrupted padding was still decoded")

40 changes: 40 additions & 0 deletions urlcrypt/rsa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os

from urlcrypt.conf import URLCRYPT_PRIVATE_KEY_PATH
from urlcrypt.oaep import OAEP

# load the private key from the specified file
from Crypto.PublicKey import RSA

with open(URLCRYPT_PRIVATE_KEY_PATH) as f:
pem_private_key = f.read()

PRIVATE_KEY = RSA.importKey(pem_private_key)
KEY_LENGTH_BYTES = int((PRIVATE_KEY.size() + 1) / 8)
PADDER = OAEP(os.urandom)
BLOCK_BYTES = KEY_LENGTH_BYTES - 2 * 20 - 2 # from oaep.py

def split_string(s, block_size):
blocks = []
start = 0
while start < len(s):
block = s[start:start+block_size]
blocks.append(block)
start += block_size
return blocks

def encrypt(s):
encrypted_blocks = []
for block in split_string(s, BLOCK_BYTES):
padded_block = PADDER.encode(KEY_LENGTH_BYTES, block) # will raise ValueError if token is too long
encrypted_block = PRIVATE_KEY.encrypt(padded_block, None)[0]
encrypted_blocks.append(encrypted_block)
return ''.join(encrypted_blocks)

def decrypt(s):
decrypted_blocks = []
for block in split_string(s, KEY_LENGTH_BYTES):
padded_block = '\x00' + PRIVATE_KEY.decrypt(block) # NUL byte is apparently dropped by decryption
decrypted_block = PADDER.decode(KEY_LENGTH_BYTES, padded_block) # will raise ValueError on corrupt token
decrypted_blocks.append(decrypted_block)
return ''.join(decrypted_blocks)
Loading

0 comments on commit f830d15

Please sign in to comment.