-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests to pass, add additional tests for field cryptor
- Loading branch information
Showing
25 changed files
with
748 additions
and
565 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,36 @@ | ||
import sys | ||
|
||
from django.apps import AppConfig | ||
from django_crypto_fields.classes.keys import Keys | ||
|
||
from django.core.management.color import color_style | ||
|
||
|
||
class DjangoCryptoFieldsError(Exception): | ||
pass | ||
|
||
|
||
class DjangoCryptoFieldsConfig(AppConfig): | ||
name = 'django_crypto_fields' | ||
verbose_name = "Data Encryption" | ||
encryption_keys = Keys() | ||
encryption_keys = None | ||
|
||
def __init__(self, app_label, model_name): | ||
"""Placed here instead of `ready()`. For models to load correctly that use | ||
field classes from this module the keys need to be loaded before models.""" | ||
super(DjangoCryptoFieldsConfig, self).__init__(app_label, model_name) | ||
from django_crypto_fields.keys import Keys | ||
keys = Keys() | ||
if not self.encryption_keys: | ||
style = color_style() | ||
sys.stdout.write('Loading {} ...\n'.format(self.verbose_name)) | ||
if not keys.key_files_exist(): | ||
sys.stdout.write(style.NOTICE('Warning: {} failed to load encryption keys.\n'.format( | ||
self.verbose_name))) | ||
sys.stdout.write('Confirm that settings.KEY_PATH points to the correct folder.\n') | ||
sys.stdout.write('Loading the wrong encryption keys can corrupt sensitive data.\n') | ||
sys.stdout.write('If this is your first time loading the project, ' | ||
'new keys will be generated\n') | ||
sys.stdout.write('and placed in the settings.KEY_PATH folder.\n') | ||
keys.create_keys(self.keys.key_path, self.keys.key_prefix) | ||
keys.load_keys() | ||
self.encryption_keys = keys |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import sys | ||
|
||
from Crypto import Random | ||
from Crypto.Cipher import AES as AES_CIPHER | ||
|
||
from django.apps import apps as django_apps | ||
from django.core.exceptions import AppRegistryNotReady | ||
|
||
from .constants import RSA, AES, PRIVATE, PUBLIC, ENCODING, style | ||
from .exceptions import EncryptionError | ||
|
||
|
||
class Cryptor(object): | ||
"""Base class for all classes providing RSA and AES encryption methods. | ||
The PEM file names and paths are in KEY_FILENAMES. KEYS is a copy of this except the | ||
filenames are replaced with the actual keys.""" | ||
|
||
def __init__(self, keys=None): | ||
try: | ||
# ignore "keys" parameter if Django is loaded | ||
self.keys = django_apps.get_app_config('django_crypto_fields').encryption_keys | ||
except AppRegistryNotReady: | ||
self.keys = keys | ||
|
||
def aes_encrypt(self, plaintext, mode): | ||
try: | ||
plaintext = plaintext.encode(ENCODING) | ||
except AttributeError: | ||
pass | ||
attr = '_'.join([AES, mode, PRIVATE, 'key']) | ||
aes_key = getattr(self.keys, attr) | ||
iv = Random.new().read(AES_CIPHER.block_size) | ||
cipher = AES_CIPHER.new(aes_key, AES_CIPHER.MODE_CFB, iv) | ||
return iv + cipher.encrypt(plaintext) | ||
|
||
def aes_decrypt(self, ciphertext, mode): | ||
attr = '_'.join([AES, mode, PRIVATE, 'key']) | ||
aes_key = getattr(self.keys, attr) | ||
iv = ciphertext[:AES_CIPHER.block_size] | ||
cipher = AES_CIPHER.new(aes_key, AES_CIPHER.MODE_CFB, iv) | ||
plaintext = cipher.decrypt(ciphertext)[AES_CIPHER.block_size:] | ||
return plaintext.decode(ENCODING) | ||
|
||
def rsa_encrypt(self, plaintext, mode): | ||
attr = '_'.join([RSA, mode, PUBLIC, 'key']) | ||
rsa_key = getattr(self.keys, attr) | ||
try: | ||
plaintext = plaintext.encode(ENCODING) | ||
except AttributeError: | ||
pass | ||
try: | ||
ciphertext = rsa_key.encrypt(plaintext) | ||
except (ValueError, TypeError) as e: | ||
raise EncryptionError('RSA encryption failed for value. Got \'{}\''.format(e)) | ||
return ciphertext | ||
|
||
def rsa_decrypt(self, ciphertext, mode): | ||
attr = '_'.join([RSA, mode, PRIVATE, 'key']) | ||
rsa_key = getattr(self.keys, attr) | ||
plaintext = rsa_key.decrypt(ciphertext) | ||
return plaintext.decode(ENCODING) | ||
|
||
def test_rsa(self): | ||
""" Tests keys roundtrip""" | ||
plaintext = 'erik is a pleeb! ERIK IS A PLEEB 0123456789!@#$%^&*()_-+={[}]|\"\':;>.<,?/~`±§' | ||
for mode in self.keys.key_filenames.get(RSA): | ||
try: | ||
ciphertext = self.rsa_encrypt(plaintext, mode) | ||
sys.stdout.write(style.SUCCESS('(*) Passed encrypt: {}\n'.format( | ||
self.keys.key_filenames[RSA][mode][PUBLIC]))) | ||
except (AttributeError, TypeError) as e: | ||
sys.stdout.write(style.ERROR('( ) Failed encrypt: {} public ({})\n'.format(mode, e))) | ||
try: | ||
assert plaintext == self.rsa_decrypt(ciphertext, mode) | ||
sys.stdout.write(style.SUCCESS('(*) Passed decrypt: {}\n'.format( | ||
self.keys.key_filenames[RSA][mode][PRIVATE]))) | ||
except (AttributeError, TypeError) as e: | ||
sys.stdout.write(style.ERROR('( ) Failed decrypt: {} private ({})\n'.format(mode, e))) | ||
|
||
def test_aes(self): | ||
""" Tests keys roundtrip""" | ||
plaintext = 'erik is a pleeb!\nERIK IS A PLEEB\n0123456789!@#$%^&*()_-+={[}]|\"\':;>.<,?/~`±§\n' | ||
for mode in self.keys.key_filenames[AES]: | ||
ciphertext = self.aes_encrypt(plaintext, mode) | ||
assert plaintext != ciphertext | ||
sys.stdout.write(style.SUCCESS('(*) Passed encrypt: {}\n'.format( | ||
self.keys.key_filenames[AES][mode][PRIVATE]))) | ||
assert plaintext == self.aes_decrypt(ciphertext, mode) | ||
sys.stdout.write(style.SUCCESS('(*) Passed decrypt: {}\n'.format( | ||
self.keys.key_filenames[AES][mode][PRIVATE]))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.