From f7b17a1d34677ea6effc6517fb9558e7732dddc4 Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Tue, 18 Nov 2025 12:01:57 +0100 Subject: [PATCH 1/5] feat(encrryption): Add Fernet Key Store --- src/sentry/conf/server.py | 17 +- src/sentry/conf/types/encrypted_field.py | 8 + src/sentry/db/models/fields/encryption.py | 95 +-- .../security/encrypted_field_key_store.py | 99 +++ .../db/models/fields/test_encryption.py | 587 +++++++++--------- .../test_encrypted_field_key_store.py | 376 +++++++++++ 6 files changed, 777 insertions(+), 405 deletions(-) create mode 100644 src/sentry/conf/types/encrypted_field.py create mode 100644 src/sentry/utils/security/encrypted_field_key_store.py create mode 100644 tests/sentry/utils/security/test_encrypted_field_key_store.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 64e2c283434618..8f0e356088b6e9 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -20,6 +20,7 @@ SENTRY_API_PAGINATION_ALLOWLIST_DO_NOT_MODIFY, ) from sentry.conf.types.bgtask import BgTaskConfig +from sentry.conf.types.encrypted_field import EncryptedFieldSettings from sentry.conf.types.kafka_definition import ConsumerDefinition from sentry.conf.types.logging_config import LoggingConfig from sentry.conf.types.region_config import RegionConfig @@ -794,18 +795,14 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: }, } -# Fernet keys for database encryption. -# First key in the dict is used as a primary key, and if -# encryption method options is "fernet", the first key will be -# used to decrypt the data. -# -# Other keys are used only for data decryption. This structure -# is used to allow easier key rotation when "fernet" is used -# as an encryption method. -DATABASE_ENCRYPTION_FERNET_KEYS = { - os.getenv("DATABASE_ENCRYPTION_KEY_ID_1"): os.getenv("DATABASE_ENCRYPTION_FERNET_KEY_1"), +# Settings for encrypted database fields. +DATABASE_ENCRYPTION_SETTINGS: EncryptedFieldSettings = { + "method": "plaintext", + "fernet_primary_key_id": os.getenv("DATABASE_ENCRYPTION_FERNET_PRIMARY_KEY_ID"), + "fernet_keys_location": os.getenv("DATABASE_ENCRYPTION_FERNET_KEYS_LOCATION"), } + ####################### # Taskworker settings # ####################### diff --git a/src/sentry/conf/types/encrypted_field.py b/src/sentry/conf/types/encrypted_field.py new file mode 100644 index 00000000000000..06af1354e71c34 --- /dev/null +++ b/src/sentry/conf/types/encrypted_field.py @@ -0,0 +1,8 @@ +from typing import Literal, TypedDict + + +class EncryptedFieldSettings(TypedDict): + method: Literal["fernet"] | Literal["plaintext"] + # fernet config + fernet_primary_key_id: str | None + fernet_keys_location: str | None diff --git a/src/sentry/db/models/fields/encryption.py b/src/sentry/db/models/fields/encryption.py index 83e94aa60a3330..15bd48aeb3ecda 100644 --- a/src/sentry/db/models/fields/encryption.py +++ b/src/sentry/db/models/fields/encryption.py @@ -6,11 +6,11 @@ from typing import Any, Literal, TypedDict import sentry_sdk -from cryptography.fernet import Fernet, InvalidToken +from cryptography.fernet import InvalidToken from django.conf import settings from django.db.models import CharField, Field -from sentry import options +from sentry.utils.security.encrypted_field_key_store import FernetKeyStore logger = logging.getLogger(__name__) @@ -102,15 +102,7 @@ def get_prep_value(self, value: Any) -> str | None: if value is None: return value - encryption_method = options.get("database.encryption.method") - - # Default to plaintext if method is not recognized - if encryption_method not in self._encryption_handlers: - logger.error( - "Unknown encryption method '%s', defaulting to plaintext", encryption_method - ) - encryption_method = "plaintext" - + encryption_method = settings.DATABASE_ENCRYPTION_SETTINGS["method"] handler = self._encryption_handlers[encryption_method] return handler["encrypt"](value) @@ -179,15 +171,8 @@ def _encrypt_fernet(self, value: Any) -> str: Always returns formatted string: enc:fernet:key_id:base64data The key_id is required to support key rotation. """ - key_id, key = self._get_fernet_key_for_encryption() - if not key: - raise ValueError( - "Fernet encryption key is required but not found. " - "Please set DATABASE_ENCRYPTION_FERNET_KEYS in your settings." - ) - try: - f = Fernet(key) + key_id, f = FernetKeyStore.get_primary_fernet() value_bytes = self._get_value_in_bytes(value) encrypted_data = f.encrypt(value_bytes) return self._format_encrypted_value(encrypted_data, MARKER_FERNET, key_id) @@ -215,14 +200,8 @@ def _decrypt_fernet(self, value: str) -> bytes: logger.warning("Failed to decode base64 data: %s", e) raise ValueError("Invalid base64 encoding") from e - # Get the decryption key - key = self._get_fernet_key(key_id) - if not key: - logger.warning("No decryption key found for key_id '%s'", key_id) - raise ValueError(f"Cannot decrypt without key for key_id '{key_id}'") - try: - f = Fernet(key) + f = FernetKeyStore.get_fernet_for_key_id(key_id) decrypted = f.decrypt(encrypted_data) return decrypted except InvalidToken: # noqa @@ -280,70 +259,6 @@ def _decrypt_with_fallback(self, value: str) -> bytes | str: logger.warning("No handler found for marker '%s'", marker) return value - def _get_fernet_key_for_encryption(self) -> tuple[str, bytes]: - """Get the first Fernet key for encryption along with its key_id.""" - keys = getattr(settings, "DATABASE_ENCRYPTION_FERNET_KEYS", None) - if keys is None: - raise ValueError("DATABASE_ENCRYPTION_FERNET_KEYS is not configured") - - if not isinstance(keys, dict): - logger.error("DATABASE_ENCRYPTION_FERNET_KEYS must be a dict, got %s", type(keys)) - raise ValueError("DATABASE_ENCRYPTION_FERNET_KEYS must be a dictionary") - - if not keys: - raise ValueError("DATABASE_ENCRYPTION_FERNET_KEYS is empty") - - # Use the first key for encryption - key_id = next(iter(keys.keys())) - key = keys[key_id] - - if not key_id or not key: - raise ValueError( - f"DATABASE_ENCRYPTION_FERNET_KEYS has invalid key_id or key ({key_id}, {key})" - ) - - if isinstance(key, str): - key = key.encode("utf-8") - - # Validate key - try: - Fernet(key) - return (key_id, key) - except Exception as e: - sentry_sdk.capture_exception(e) - logger.exception("Invalid Fernet key for key_id '%s'", key_id) - raise ValueError(f"Invalid Fernet key for key_id '{key_id}'") - - def _get_fernet_key(self, key_id: str) -> bytes | None: - """Get the Fernet key for the specified key_id from Django settings.""" - keys = getattr(settings, "DATABASE_ENCRYPTION_FERNET_KEYS", None) - if keys is None: - return None - - if not isinstance(keys, dict): - logger.error("DATABASE_ENCRYPTION_FERNET_KEYS must be a dict, got %s", type(keys)) - return None - - # Return specific key by key_id - if key_id not in keys: - logger.warning("Fernet key with id '%s' not found, cannot decrypt data", key_id) - return None - key = keys[key_id] - - if isinstance(key, str): - # If key is a string, encode it - key = key.encode("utf-8") - - # Validate key length (Fernet requires 32 bytes base64 encoded) - try: - Fernet(key) - return key - except Exception as e: - sentry_sdk.capture_exception(e) - logger.exception("Invalid Fernet key") - - return None - class EncryptedCharField(EncryptedField, CharField): def from_db_value(self, value: Any, expression: Any, connection: Any) -> Any: diff --git a/src/sentry/utils/security/encrypted_field_key_store.py b/src/sentry/utils/security/encrypted_field_key_store.py new file mode 100644 index 00000000000000..3a762c14aabac5 --- /dev/null +++ b/src/sentry/utils/security/encrypted_field_key_store.py @@ -0,0 +1,99 @@ +from logging import getLogger +from pathlib import Path + +import sentry_sdk +from cryptography.fernet import Fernet +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured + +logger = getLogger(__name__) + + +class FernetKeyStore: + _keys = {} + _is_loaded = False + + @classmethod + def _path_to_keys(cls) -> Path | None: + settings_path = settings.DATABASE_ENCRYPTION_SETTINGS.get("fernet_keys_location") + + return Path(settings_path) if settings_path is not None else None + + @classmethod + def load_keys(cls) -> None: + """ + Reads all files in the given directory. + Filename = Key ID + File Content = Fernet Key + """ + path = cls._path_to_keys() + + if path is None: + # No keys directory is configured, so we don't need to load any keys. + cls._keys = None + cls._is_loaded = True + return + + if not path.exists() or not path.is_dir(): + raise ImproperlyConfigured(f"Key directory not found: {path}") + + for file_path in path.iterdir(): + if file_path.is_file(): + # Skip hidden files + if file_path.name.startswith("."): + continue + + try: + with open(file_path) as f: + key_content = f.read().strip() + + if not key_content: + logger.warning("Empty key file found: %s", file_path.name) + continue + + # Store Fernet object in the dictionary + # Objects are stored instead of keys for performance optimization. + cls._keys[file_path.name] = Fernet(key_content.encode("utf-8")) + + except Exception as e: + logger.exception("Error reading key %s", file_path.name) + sentry_sdk.capture_exception(e) + + cls._is_loaded = True + logger.info("Successfully loaded %d Fernet encryption keys.", len(cls._keys)) + + @classmethod + def get_fernet_for_key_id(cls, key_id: str) -> Fernet: + """Retrieves a Fernet object for a specific key ID""" + if not cls._is_loaded: + # Fallback: if the keys are not already loaded, load them on first access. + logger.warning("Loading Fernet encryption keys on first access instead of on startup.") + cls.load_keys() + + if cls._keys is None: + # This can happen only if the keys settings are misconfigured + raise ValueError( + "Fernet encryption keys are not loaded. Please configure the keys directory." + ) + + fernet = cls._keys.get(key_id) + if not fernet: + raise ValueError(f"Encryption key with ID '{key_id}' not found.") + + return fernet + + @classmethod + def get_primary_fernet(cls) -> tuple[str, Fernet]: + """ + Reads the configuration and returns the primary key ID and the Fernet object + initialized with the primary Fernet key. + + The primary Fernet is the one that is used to encrypt the data, while + decryption can be done with any of the registered Fernet keys. + """ + + primary_key_id = settings.DATABASE_ENCRYPTION_SETTINGS.get("fernet_primary_key_id") + if primary_key_id is None: + raise ValueError("Fernet primary key ID is not configured.") + + return primary_key_id, cls.get_fernet_for_key_id(primary_key_id) diff --git a/tests/sentry/db/models/fields/test_encryption.py b/tests/sentry/db/models/fields/test_encryption.py index d96908f6b749b9..581725d0f901b8 100644 --- a/tests/sentry/db/models/fields/test_encryption.py +++ b/tests/sentry/db/models/fields/test_encryption.py @@ -11,7 +11,7 @@ EncryptedCharField, EncryptedField, ) -from sentry.testutils.helpers.options import override_options +from sentry.utils.security.encrypted_field_key_store import FernetKeyStore ENCRYPTION_METHODS = ("plaintext", "fernet") @@ -27,23 +27,49 @@ def fernet_instance(fernet_key): @pytest.fixture -def fernet_keys_value(fernet_key): - return {"key_id_1": fernet_key.decode()} +def fernet_keys_store(fernet_key): + """Single key for testing. Mocks the FernetKeyStore._keys attribute.""" + key_id = "key_id_1" + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + # Mock the key store + FernetKeyStore._keys = {key_id: Fernet(fernet_key)} + FernetKeyStore._is_loaded = True + + yield key_id, fernet_key + + # Restore original state + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded @pytest.fixture -def multi_fernet_keys_value(): - """Multiple keys for testing key rotation.""" +def multi_fernet_keys_store(): + """Multiple keys for testing key rotation. Mocks the FernetKeyStore._keys attribute.""" key1 = Fernet.generate_key() key2 = Fernet.generate_key() - return { - "key_primary": key1.decode(), - "key_secondary": key2.decode(), + keys_dict = { + "key_primary": Fernet(key1), + "key_secondary": Fernet(key2), } + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + # Mock the key store + FernetKeyStore._keys = keys_dict + FernetKeyStore._is_loaded = True + + yield {"key_primary": key1, "key_secondary": key2} + + # Restore original state + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded + def test_plaintext_encryption(): - with override_options({"database.encryption.method": "plaintext"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): field = EncryptedField() # Test encryption (should return marker:base64 encoded string) @@ -59,116 +85,148 @@ def test_plaintext_encryption(): assert isinstance(decrypted, bytes) -@override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=None) def test_fernet_encryption_without_key(): """Test that Fernet encryption raises an error without key.""" - with override_options({"database.encryption.method": "fernet"}): - field = EncryptedField() - - with pytest.raises(ValueError, match="DATABASE_ENCRYPTION_FERNET_KEYS is not configured"): - field.get_prep_value("test value") + # Reset the key store to simulate no keys loaded + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + FernetKeyStore._keys = None + FernetKeyStore._is_loaded = True + + try: + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": "test_key", + } + ): + field = EncryptedField() + with pytest.raises(ValueError, match="Fernet encryption keys are not loaded"): + field.get_prep_value("test value") + finally: + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded -def test_fernet_encryption_with_key(multi_fernet_keys_value): +def test_fernet_encryption_with_key(multi_fernet_keys_store): """Test Fernet encryption with a valid key using new format.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=multi_fernet_keys_value): - field = EncryptedField() + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": "key_primary", + } + ): + field = EncryptedField() - # Test encryption - encrypted = field.get_prep_value("test value") - assert isinstance(encrypted, str) - assert encrypted is not None + # Test encryption + encrypted = field.get_prep_value("test value") + assert isinstance(encrypted, str) + assert encrypted is not None - # Should have the new format: enc:fernet:key_id:data - parts = encrypted.split(":") - assert len(parts) == 4 - enc, method, key_id, encoded_data = parts + # Should have the new format: enc:fernet:key_id:data + parts = encrypted.split(":") + assert len(parts) == 4 + enc, method, key_id, encoded_data = parts - # Should start with enc:fernet marker - assert f"{enc}:{method}" == MARKER_FERNET + # Should start with enc:fernet marker + assert f"{enc}:{method}" == MARKER_FERNET - # Should use the first key (key_primary) - assert key_id == "key_primary" + # Should use the first key (key_primary) + assert key_id == "key_primary" - # Verify the rest is valid fernet encrypted data - fernet_data = base64.b64decode(encoded_data) - # Should be able to decrypt with the correct key - first_key = list(multi_fernet_keys_value.values())[0] - fernet_instance = Fernet(first_key.encode()) - decrypted_bytes = fernet_instance.decrypt(fernet_data) - assert decrypted_bytes == b"test value" + # Verify the rest is valid fernet encrypted data + fernet_data = base64.b64decode(encoded_data) + # Should be able to decrypt with the correct key + first_key = multi_fernet_keys_store["key_primary"] + fernet_instance = Fernet(first_key) + decrypted_bytes = fernet_instance.decrypt(fernet_data) + assert decrypted_bytes == b"test value" - # Test decryption through field - decrypted = field.to_python(encrypted) - assert decrypted == b"test value" + # Test decryption through field + decrypted = field.to_python(encrypted) + assert decrypted == b"test value" -def test_fernet_key_rotation(multi_fernet_keys_value): +def test_fernet_key_rotation(multi_fernet_keys_store): """Test that data encrypted with different keys can be decrypted.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=multi_fernet_keys_value): - field = EncryptedField() + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": "key_primary", + } + ): + field = EncryptedField() - # Encrypt some data - encrypted_value = field.get_prep_value("test data") + # Encrypt some data + encrypted_value = field.get_prep_value("test data") - # Should be able to decrypt it - decrypted_value = field.to_python(encrypted_value) - assert decrypted_value == b"test data" + # Should be able to decrypt it + decrypted_value = field.to_python(encrypted_value) + assert decrypted_value == b"test data" - # Manually create encrypted data with the second key - second_key_id = list(multi_fernet_keys_value.keys())[1] - second_key = multi_fernet_keys_value[second_key_id] - fernet_instance = Fernet(second_key.encode()) - manual_encrypted = fernet_instance.encrypt(b"second key data") - manual_encoded = base64.b64encode(manual_encrypted).decode("ascii") - manual_formatted = f"{MARKER_FERNET}:{second_key_id}:{manual_encoded}" + # Manually create encrypted data with the second key + second_key_id = "key_secondary" + second_key = multi_fernet_keys_store[second_key_id] + fernet_instance = Fernet(second_key) + manual_encrypted = fernet_instance.encrypt(b"second key data") + manual_encoded = base64.b64encode(manual_encrypted).decode("ascii") + manual_formatted = f"{MARKER_FERNET}:{second_key_id}:{manual_encoded}" - # Should be able to decrypt data encrypted with the second key - decrypted_manual = field.to_python(manual_formatted) - assert decrypted_manual == b"second key data" + # Should be able to decrypt data encrypted with the second key + decrypted_manual = field.to_python(manual_formatted) + assert decrypted_manual == b"second key data" -def test_fernet_format_without_key_id_rejected(fernet_keys_value): +def test_fernet_format_without_key_id_rejected(fernet_keys_store): """Test that Fernet format without key_id is rejected.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value): - field = EncryptedField() + key_id, fernet_key = fernet_keys_store - # Create invalid format without key_id - key = list(fernet_keys_value.values())[0] - fernet_instance = Fernet(key.encode()) - encrypted_data = fernet_instance.encrypt(b"test data") - encoded_data = base64.b64encode(encrypted_data).decode("ascii") - invalid_format = f"{MARKER_FERNET}:{encoded_data}" + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } + ): + field = EncryptedField() - # Should return the original value as it's invalid format - result = field.to_python(invalid_format) - assert result == invalid_format + # Create invalid format without key_id + fernet_instance = Fernet(fernet_key) + encrypted_data = fernet_instance.encrypt(b"test data") + encoded_data = base64.b64encode(encrypted_data).decode("ascii") + invalid_format = f"{MARKER_FERNET}:{encoded_data}" + # Should return the original value as it's invalid format + result = field.to_python(invalid_format) + assert result == invalid_format -def test_encryption_method_switching(fernet_keys_value): + +def test_encryption_method_switching(fernet_keys_store): """Test that values can be decrypted after switching encryption methods.""" + key_id, _fernet_key = fernet_keys_store + # encrypt with Fernet - with ( - override_options({"database.encryption.method": "fernet"}), - override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value), + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } ): field = EncryptedField() encrypted_fernet = field.get_prep_value("fernet encrypted") # encrypt with plain text - with override_options({"database.encryption.method": "plaintext"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): field = EncryptedField() encrypted_plain = field.get_prep_value("plain text value") # assert that both can be decrypted independently of the encryption method for encryption_method in ENCRYPTION_METHODS: - with ( - override_options({"database.encryption.method": encryption_method}), - override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value), - ): + settings_dict = {"method": encryption_method} + if encryption_method == "fernet": + settings_dict["fernet_primary_key_id"] = key_id + + with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): field = EncryptedField() # Should decrypt fernet value @@ -180,120 +238,15 @@ def test_encryption_method_switching(fernet_keys_value): assert decrypted_plain == b"plain text value" -def test_invalid_fernet_key(): - """Test handling of invalid Fernet keys.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS={"key1": "invalid-key"}): - field = EncryptedField() - - # Should raise an error due to invalid key - with pytest.raises(ValueError, match="Invalid Fernet key for key_id 'key1'"): - field.get_prep_value("test value") - - -def test_fernet_key_dict_format(): - """Test that fernet key dictionary format works correctly.""" - key1 = Fernet.generate_key() - key2 = Fernet.generate_key() - keys_dict = { - "key1": key1.decode(), - "key2": key2.decode(), - } - - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=keys_dict): - field = EncryptedField() - - # Should use first key by default and include key_id - encrypted = field.get_prep_value("test value") - assert isinstance(encrypted, str) - - # Should have new format with key_id - parts = encrypted.split(":") - assert len(parts) == 4 - enc, method, key_id, _encoded_data = parts - assert f"{enc}:{method}" == MARKER_FERNET - assert key_id == "key1" # First key in dict - - # Should be able to decrypt - decrypted = field.to_python(encrypted) - assert decrypted == b"test value" - - -def test_fernet_key_dict_must_be_dict(): - """Test that DATABASE_ENCRYPTION_FERNET_KEYS must be a dictionary.""" - with override_options({"database.encryption.method": "fernet"}): - # Test with string instead of dict - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS="not-a-dict"): - field = EncryptedField() - - with pytest.raises( - ValueError, match="DATABASE_ENCRYPTION_FERNET_KEYS must be a dictionary" - ): - field.get_prep_value("test value") - - # Test with list instead of dict - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=["key1", "key2"]): - field = EncryptedField() - - with pytest.raises( - ValueError, match="DATABASE_ENCRYPTION_FERNET_KEYS must be a dictionary" - ): - field.get_prep_value("test value") - - -def test_fernet_empty_keys_dict(): - """Test handling of empty keys dictionary.""" - with ( - override_options({"database.encryption.method": "fernet"}), - override_settings(DATABASE_ENCRYPTION_FERNET_KEYS={}), - ): - field = EncryptedField() - - with pytest.raises(ValueError, match="DATABASE_ENCRYPTION_FERNET_KEYS is empty"): - field.get_prep_value("test value") - - -@pytest.mark.parametrize( - "key_id,key_value,expected_error_match", - [ - ( - "", - "valid_key", - r"DATABASE_ENCRYPTION_FERNET_KEYS has invalid key_id or key \(, valid_key\)", - ), - ( - None, - "valid_key", - r"DATABASE_ENCRYPTION_FERNET_KEYS has invalid key_id or key \(None, valid_key\)", - ), - ( - "valid_key_id", - "", - r"DATABASE_ENCRYPTION_FERNET_KEYS has invalid key_id or key \(valid_key_id, \)", - ), - ( - "valid_key_id", - None, - r"DATABASE_ENCRYPTION_FERNET_KEYS has invalid key_id or key \(valid_key_id, None\)", - ), - ], -) -def test_fernet_invalid_key_id_or_key_values(key_id, key_value, expected_error_match): - """Test handling of None or empty key_id or key values in the keys dictionary.""" - with override_options({"database.encryption.method": "fernet"}): - field = EncryptedField() - - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS={key_id: key_value}): - with pytest.raises(ValueError, match=expected_error_match): - field.get_prep_value("test value") - - -def test_fernet_non_utf_8_chars(fernet_keys_value): +def test_fernet_non_utf_8_chars(fernet_keys_store): """Test that different encrypted field types work correctly.""" - with ( - override_options({"database.encryption.method": "fernet"}), - override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value), + key_id, _fernet_key = fernet_keys_store + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } ): text_field = EncryptedField() invalid_utf_8 = b"\xc0" @@ -312,37 +265,43 @@ def test_fernet_non_utf_8_chars(fernet_keys_value): def test_keysets_not_implemented(): """Test that keysets method raises NotImplementedError.""" - with override_options({"database.encryption.method": "keysets"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "keysets"}): field = EncryptedField() with pytest.raises(NotImplementedError, match="Keysets encryption not yet implemented"): field.get_prep_value("test value") -def test_fernet_marker_handling(fernet_keys_value): +def test_fernet_marker_handling(fernet_keys_store): """Test that the fernet marker is handled correctly.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value): - field = EncryptedField() + key_id, _fernet_key = fernet_keys_store - # Create a value with fernet encryption - test_value = "test value" - encrypted = field.get_prep_value(test_value) - assert encrypted is not None + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } + ): + field = EncryptedField() - # Verify it has the new format: enc:fernet:key_id:data - parts = encrypted.split(":") - assert len(parts) == 4 - assert f"{parts[0]}:{parts[1]}" == MARKER_FERNET + # Create a value with fernet encryption + test_value = "test value" + encrypted = field.get_prep_value(test_value) + assert encrypted is not None - # Test that decryption works with marker and key_id - decrypted = field.to_python(encrypted) - assert decrypted == test_value.encode("utf-8") + # Verify it has the new format: enc:fernet:key_id:data + parts = encrypted.split(":") + assert len(parts) == 4 + assert f"{parts[0]}:{parts[1]}" == MARKER_FERNET + + # Test that decryption works with marker and key_id + decrypted = field.to_python(encrypted) + assert decrypted == test_value.encode("utf-8") def test_data_without_marker(): """Test handling of unencrypted data without method marker.""" - with override_options({"database.encryption.method": "plaintext"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): field = EncryptedField() # Simulate unencrypted plain text data (no marker) @@ -362,26 +321,25 @@ def test_to_python_conversion(): assert field.to_python(None) is None # Test encrypted format - with override_options({"database.encryption.method": "plaintext"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): encrypted = f"{MARKER_PLAINTEXT}:{base64.b64encode(b'test bytes').decode('ascii')}" assert field.to_python(encrypted) == b"test bytes" -def test_non_utf8_data_handling(fernet_keys_value): +def test_non_utf8_data_handling(fernet_keys_store): """Test handling of non-UTF8 data.""" - + key_id, _fernet_key = fernet_keys_store invalid_value = b"\xc0" # invalid UTF-8 char for encryption_method in ENCRYPTION_METHODS: - with override_options({"database.encryption.method": encryption_method}): - settings_override = {} - if encryption_method == "fernet": - settings_override["DATABASE_ENCRYPTION_FERNET_KEYS"] = fernet_keys_value + settings_dict = {"method": encryption_method} + if encryption_method == "fernet": + settings_dict["fernet_primary_key_id"] = key_id - with override_settings(**settings_override): - field = EncryptedField() - result = field.to_python(invalid_value) - assert result == invalid_value + with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): + field = EncryptedField() + result = field.to_python(invalid_value) + assert result == invalid_value @pytest.mark.parametrize("encryption_method", ENCRYPTION_METHODS) @@ -398,84 +356,98 @@ def test_non_utf8_data_handling(fernet_keys_value): b"invalid utf-8: \xc0", ], ) -def test_encryption_decryption_roundtrip(encryption_method, test_value, fernet_keys_value): +def test_encryption_decryption_roundtrip(encryption_method, test_value, fernet_keys_store): """Test that encryption and decryption work correctly in roundtrip.""" - with override_options({"database.encryption.method": encryption_method}): - settings_override = {} - if encryption_method == "fernet": - settings_override["DATABASE_ENCRYPTION_FERNET_KEYS"] = fernet_keys_value + key_id, _fernet_key = fernet_keys_store - with override_settings(**settings_override): - field = EncryptedField() + settings_dict = {"method": encryption_method} + if encryption_method == "fernet": + settings_dict["fernet_primary_key_id"] = key_id - encrypted = field.get_prep_value(test_value) - decrypted = field.to_python(encrypted) + with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): + field = EncryptedField() + + encrypted = field.get_prep_value(test_value) + decrypted = field.to_python(encrypted) - if test_value is None: - assert decrypted is None - elif isinstance(test_value, str): - assert decrypted == test_value.encode("utf-8") - else: - assert decrypted == test_value + if test_value is None: + assert decrypted is None + elif isinstance(test_value, str): + assert decrypted == test_value.encode("utf-8") + else: + assert decrypted == test_value -def test_marker_format_consistency(fernet_keys_value): +def test_marker_format_consistency(fernet_keys_store): """Test that the marker format is consistent across methods.""" + key_id, _fernet_key = fernet_keys_store field = EncryptedField() - with override_options({"database.encryption.method": "plaintext"}): + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): encrypted = field.get_prep_value("test") assert encrypted is not None assert encrypted.startswith(f"{MARKER_PLAINTEXT}:") - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value): - encrypted = field.get_prep_value("test") - assert encrypted is not None - assert encrypted.startswith(f"{MARKER_FERNET}:") + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } + ): + encrypted = field.get_prep_value("test") + assert encrypted is not None + assert encrypted.startswith(f"{MARKER_FERNET}:") -def test_fernet_missing_key_decryption(): +def test_fernet_missing_key_decryption(fernet_keys_store): """Test that decryption fails gracefully when key_id is not found.""" - keys_dict = { - "key1": Fernet.generate_key().decode(), - } + key_id, _fernet_key = fernet_keys_store - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=keys_dict): - field = EncryptedField() + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } + ): + field = EncryptedField() - # Try to decrypt data that was "encrypted" with a missing key - fake_encrypted_data = base64.b64encode(b"fake data").decode("ascii") - formatted_value = f"{MARKER_FERNET}:missing_key:{fake_encrypted_data}" + # Try to decrypt data that was "encrypted" with a missing key + fake_encrypted_data = base64.b64encode(b"fake data").decode("ascii") + formatted_value = f"{MARKER_FERNET}:missing_key:{fake_encrypted_data}" - # Should fall back to returning the original value when key is missing - result = field.to_python(formatted_value) - assert result == formatted_value # Should return the original encrypted string + # Should fall back to returning the original value when key is missing + result = field.to_python(formatted_value) + assert result == formatted_value # Should return the original encrypted string -def test_fernet_format_with_plaintext_data(fernet_keys_value): +def test_fernet_format_with_plaintext_data(fernet_keys_store): """Test that data in fernet format but containing plain text (not encrypted) falls back correctly.""" - with override_options({"database.encryption.method": "fernet"}): - with override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value): - field = EncryptedField() + key_id, _fernet_key = fernet_keys_store + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } + ): + field = EncryptedField() - # Create data that looks like fernet format but contains plain text instead of encrypted data - # This could happen during migration from plain text to encrypted storage - plaintext_content = "this is just plain text, not encrypted" - fake_fernet_data = f"{MARKER_FERNET}:key_id_1:{plaintext_content}" + # Create data that looks like fernet format but contains plain text instead of encrypted data + # This could happen during migration from plain text to encrypted storage + plaintext_content = "this is just plain text, not encrypted" + fake_fernet_data = f"{MARKER_FERNET}:{key_id}:{plaintext_content}" - # Should fall back to returning the original string since it's not valid encrypted data - result = field.to_python(fake_fernet_data) - assert result == fake_fernet_data # Should return the original string as-is + # Should fall back to returning the original string since it's not valid encrypted data + result = field.to_python(fake_fernet_data) + assert result == fake_fernet_data # Should return the original string as-is - # Test with a format that has valid base64 but invalid fernet data - fake_base64_data = base64.b64encode(b"not fernet encrypted").decode("ascii") - fake_fernet_with_base64 = f"{MARKER_FERNET}:key_id_1:{fake_base64_data}" + # Test with a format that has valid base64 but invalid fernet data + fake_base64_data = base64.b64encode(b"not fernet encrypted").decode("ascii") + fake_fernet_with_base64 = f"{MARKER_FERNET}:{key_id}:{fake_base64_data}" - result = field.to_python(fake_fernet_with_base64) - # This should also fall back to the original string since it's not valid Fernet data - assert result == fake_fernet_with_base64 + result = field.to_python(fake_fernet_with_base64) + # This should also fall back to the original string since it's not valid Fernet data + assert result == fake_fernet_with_base64 class EncryptedFieldModel(models.Model): @@ -487,12 +459,15 @@ class Meta: @pytest.mark.django_db -def test_encrypted_char_field_fernet_end_to_end(fernet_keys_value): +def test_encrypted_char_field_fernet_end_to_end(fernet_keys_store): """Test complete save/retrieve cycle with EncryptedField.""" + key_id, _fernet_key = fernet_keys_store - with ( - override_options({"database.encryption.method": "fernet"}), - override_settings(DATABASE_ENCRYPTION_FERNET_KEYS=fernet_keys_value), + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "method": "fernet", + "fernet_primary_key_id": key_id, + } ): test_data = "This is sensitive data that should be encrypted" @@ -518,38 +493,40 @@ def test_encrypted_char_field_fernet_end_to_end(fernet_keys_value): @pytest.mark.django_db def test_encrypted_char_field_plaintext_end_to_end(): """Test complete save/retrieve cycle with EncryptedCharField.""" - test_data = "This is plain text data" + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + test_data = "This is plain text data" - model_instance = EncryptedFieldModel.objects.create(data=test_data) - assert model_instance.id is not None + model_instance = EncryptedFieldModel.objects.create(data=test_data) + assert model_instance.id is not None - # Verify the data was correctly encrypted and decrypted - retrieved_instance = EncryptedFieldModel.objects.get(id=model_instance.id) - assert retrieved_instance.data == test_data + # Verify the data was correctly encrypted and decrypted + retrieved_instance = EncryptedFieldModel.objects.get(id=model_instance.id) + assert retrieved_instance.data == test_data - with connection.cursor() as cursor: - cursor.execute( - "SELECT data FROM fixtures_encryptedfieldmodel WHERE id = %s", - [model_instance.id], - ) - # Should be in a format enc:plaintext:base64data - raw_value = cursor.fetchone()[0] - assert raw_value.startswith(f"{MARKER_PLAINTEXT}:") - assert test_data not in raw_value + with connection.cursor() as cursor: + cursor.execute( + "SELECT data FROM fixtures_encryptedfieldmodel WHERE id = %s", + [model_instance.id], + ) + # Should be in a format enc:plaintext:base64data + raw_value = cursor.fetchone()[0] + assert raw_value.startswith(f"{MARKER_PLAINTEXT}:") + assert test_data not in raw_value @pytest.mark.django_db def test_encrypted_char_field_null_value(): - model_instance = EncryptedFieldModel.objects.create(data=None) - assert model_instance.id is not None + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + model_instance = EncryptedFieldModel.objects.create(data=None) + assert model_instance.id is not None - retrieved_instance = EncryptedFieldModel.objects.get(id=model_instance.id) - assert retrieved_instance.data is None + retrieved_instance = EncryptedFieldModel.objects.get(id=model_instance.id) + assert retrieved_instance.data is None - with connection.cursor() as cursor: - cursor.execute( - "SELECT data FROM fixtures_encryptedfieldmodel WHERE id = %s", - [model_instance.id], - ) - raw_value = cursor.fetchone()[0] - assert raw_value is None + with connection.cursor() as cursor: + cursor.execute( + "SELECT data FROM fixtures_encryptedfieldmodel WHERE id = %s", + [model_instance.id], + ) + raw_value = cursor.fetchone()[0] + assert raw_value is None diff --git a/tests/sentry/utils/security/test_encrypted_field_key_store.py b/tests/sentry/utils/security/test_encrypted_field_key_store.py new file mode 100644 index 00000000000000..482c67c4ac8161 --- /dev/null +++ b/tests/sentry/utils/security/test_encrypted_field_key_store.py @@ -0,0 +1,376 @@ +import tempfile +from pathlib import Path + +import pytest +from cryptography.fernet import Fernet +from django.core.exceptions import ImproperlyConfigured +from django.test import override_settings + +from sentry.utils.security.encrypted_field_key_store import FernetKeyStore + + +@pytest.fixture(autouse=True) +def reset_key_store(): + """Reset the FernetKeyStore state before each test.""" + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + FernetKeyStore._keys = {} + FernetKeyStore._is_loaded = False + + yield + + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded + + +@pytest.fixture +def temp_keys_dir(): + """Create a temporary directory for key files.""" + with tempfile.TemporaryDirectory() as temp_dir: + yield Path(temp_dir) + + +@pytest.fixture +def valid_fernet_key(): + """Generate a valid Fernet key.""" + return Fernet.generate_key() + + +@pytest.fixture +def fernet_keys_store(valid_fernet_key): + """Single key for testing. Mocks the FernetKeyStore._keys attribute.""" + key_id = "key_id_1" + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + # Mock the key store + FernetKeyStore._keys = {key_id: Fernet(valid_fernet_key)} + FernetKeyStore._is_loaded = True + + yield key_id, valid_fernet_key + + # Restore original state + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded + + +@pytest.fixture +def multi_fernet_keys_store(): + """Multiple keys for testing key rotation. Mocks the FernetKeyStore._keys attribute.""" + key1 = Fernet.generate_key() + key2 = Fernet.generate_key() + key3 = Fernet.generate_key() + keys_dict = { + "key_primary": Fernet(key1), + "key_secondary": Fernet(key2), + "key_tertiary": Fernet(key3), + } + + original_keys = FernetKeyStore._keys + original_is_loaded = FernetKeyStore._is_loaded + + # Mock the key store + FernetKeyStore._keys = keys_dict + FernetKeyStore._is_loaded = True + + yield {"key_primary": key1, "key_secondary": key2, "key_tertiary": key3} + + # Restore original state + FernetKeyStore._keys = original_keys + FernetKeyStore._is_loaded = original_is_loaded + + +class TestPathToKeys: + def test_path_to_keys_with_valid_path(self): + """Test _path_to_keys returns Path when settings configured.""" + test_path = "/path/to/keys" + with override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": test_path}): + result = FernetKeyStore._path_to_keys() + assert result == Path(test_path) + assert isinstance(result, Path) + + def test_path_to_keys_with_none(self): + """Test _path_to_keys returns None when no path configured.""" + with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): + result = FernetKeyStore._path_to_keys() + assert result is None + + +class TestLoadKeys: + def test_load_keys_no_path_configured(self): + """Test load_keys when no keys directory is configured.""" + with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): + FernetKeyStore.load_keys() + + assert FernetKeyStore._keys is None + assert FernetKeyStore._is_loaded is True + + def test_load_keys_directory_not_exists(self, tmp_path): + """Test load_keys raises error when directory doesn't exist.""" + non_existent_path = tmp_path / "non_existent" + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(non_existent_path)} + ): + with pytest.raises(ImproperlyConfigured, match="Key directory not found"): + FernetKeyStore.load_keys() + + def test_load_keys_path_is_file_not_directory(self, tmp_path): + """Test load_keys raises error when path is a file, not a directory.""" + file_path = tmp_path / "keyfile.txt" + file_path.write_text("not a directory") + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(file_path)} + ): + with pytest.raises(ImproperlyConfigured, match="Key directory not found"): + FernetKeyStore.load_keys() + + def test_load_keys_empty_directory(self, temp_keys_dir): + """Test load_keys with empty directory.""" + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert FernetKeyStore._keys == {} + assert FernetKeyStore._is_loaded is True + + def test_load_keys_single_valid_key(self, temp_keys_dir, valid_fernet_key): + """Test load_keys with a single valid key file.""" + key_file = temp_keys_dir / "key_1" + key_file.write_text(valid_fernet_key.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert len(FernetKeyStore._keys) == 1 + assert "key_1" in FernetKeyStore._keys + assert isinstance(FernetKeyStore._keys["key_1"], Fernet) + assert FernetKeyStore._is_loaded is True + + def test_load_keys_multiple_valid_keys(self, temp_keys_dir): + """Test load_keys with multiple valid key files.""" + key1 = Fernet.generate_key() + key2 = Fernet.generate_key() + key3 = Fernet.generate_key() + + (temp_keys_dir / "primary_key").write_text(key1.decode("utf-8")) + (temp_keys_dir / "secondary_key").write_text(key2.decode("utf-8")) + (temp_keys_dir / "tertiary_key").write_text(key3.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert len(FernetKeyStore._keys) == 3 + assert "primary_key" in FernetKeyStore._keys + assert "secondary_key" in FernetKeyStore._keys + assert "tertiary_key" in FernetKeyStore._keys + assert FernetKeyStore._is_loaded is True + + def test_load_keys_skips_hidden_files(self, temp_keys_dir, valid_fernet_key): + """Test load_keys skips hidden files (starting with .).""" + visible_key = temp_keys_dir / "visible_key" + hidden_key = temp_keys_dir / ".hidden_key" + + visible_key.write_text(valid_fernet_key.decode("utf-8")) + hidden_key.write_text(valid_fernet_key.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert len(FernetKeyStore._keys) == 1 + assert "visible_key" in FernetKeyStore._keys + assert ".hidden_key" not in FernetKeyStore._keys + + def test_load_keys_skips_subdirectories(self, temp_keys_dir, valid_fernet_key): + """Test load_keys ignores subdirectories.""" + key_file = temp_keys_dir / "key_1" + key_file.write_text(valid_fernet_key.decode("utf-8")) + + # Create a subdirectory + subdir = temp_keys_dir / "subdir" + subdir.mkdir() + (subdir / "nested_key").write_text(valid_fernet_key.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert len(FernetKeyStore._keys) == 1 + assert "key_1" in FernetKeyStore._keys + assert "subdir" not in FernetKeyStore._keys + + def test_load_keys_empty_file_skipped(self, temp_keys_dir, valid_fernet_key): + """Test load_keys skips empty key files.""" + valid_key_file = temp_keys_dir / "valid_key" + empty_key_file = temp_keys_dir / "empty_key" + whitespace_key_file = temp_keys_dir / "whitespace_key" + + valid_key_file.write_text(valid_fernet_key.decode("utf-8")) + empty_key_file.write_text("") + whitespace_key_file.write_text(" \n \t ") + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + # Only valid key should be loaded, empty files are skipped + assert len(FernetKeyStore._keys) == 1 + assert "valid_key" in FernetKeyStore._keys + assert "empty_key" not in FernetKeyStore._keys + assert "whitespace_key" not in FernetKeyStore._keys + + def test_load_keys_invalid_fernet_key(self, temp_keys_dir, valid_fernet_key): + """Test load_keys handles invalid Fernet keys gracefully.""" + valid_key_file = temp_keys_dir / "valid_key" + invalid_key_file = temp_keys_dir / "invalid_key" + + valid_key_file.write_text(valid_fernet_key.decode("utf-8")) + invalid_key_file.write_text("this-is-not-a-valid-fernet-key") + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + # Only valid key should be loaded, invalid key is skipped + assert len(FernetKeyStore._keys) == 1 + assert "valid_key" in FernetKeyStore._keys + assert "invalid_key" not in FernetKeyStore._keys + + def test_load_keys_with_newlines_and_whitespace(self, temp_keys_dir): + """Test load_keys strips whitespace from key content.""" + key = Fernet.generate_key() + key_file = temp_keys_dir / "key_with_whitespace" + key_file.write_text(f" {key.decode('utf-8')} \n\n") + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + FernetKeyStore.load_keys() + + assert len(FernetKeyStore._keys) == 1 + assert "key_with_whitespace" in FernetKeyStore._keys + + +class TestGetFernetForKeyId: + def test_get_fernet_for_key_id_auto_loads_keys(self, temp_keys_dir, valid_fernet_key): + """Test get_fernet_for_key_id auto-loads keys on first access.""" + key_file = temp_keys_dir / "auto_load_key" + key_file.write_text(valid_fernet_key.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} + ): + # Keys not loaded yet + assert FernetKeyStore._is_loaded is False + + fernet = FernetKeyStore.get_fernet_for_key_id("auto_load_key") + + # Should have auto-loaded + assert FernetKeyStore._is_loaded is True + assert isinstance(fernet, Fernet) + + def test_get_fernet_for_key_id_returns_fernet_instance(self, fernet_keys_store): + """Test get_fernet_for_key_id returns Fernet instance for valid key_id.""" + key_id, _fernet_key = fernet_keys_store + + fernet = FernetKeyStore.get_fernet_for_key_id(key_id) + + assert isinstance(fernet, Fernet) + + def test_get_fernet_for_key_id_raises_when_keys_none(self): + """Test get_fernet_for_key_id raises error when keys are None (misconfigured).""" + # Simulate no keys directory configured + FernetKeyStore._keys = None + FernetKeyStore._is_loaded = True + + with pytest.raises(ValueError, match="Fernet encryption keys are not loaded"): + FernetKeyStore.get_fernet_for_key_id("any_key") + + def test_get_fernet_for_key_id_raises_when_key_not_found(self, fernet_keys_store): + """Test get_fernet_for_key_id raises error when key_id doesn't exist.""" + _key_id, _fernet_key = fernet_keys_store + + with pytest.raises(ValueError, match="Encryption key with ID 'missing_key' not found"): + FernetKeyStore.get_fernet_for_key_id("missing_key") + + +class TestGetPrimaryFernet: + def test_get_primary_fernet_returns_tuple(self, fernet_keys_store): + """Test get_primary_fernet returns (key_id, Fernet) tuple.""" + key_id, _fernet_key = fernet_keys_store + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "fernet_primary_key_id": key_id, + } + ): + returned_key_id, fernet = FernetKeyStore.get_primary_fernet() + + assert returned_key_id == key_id + assert isinstance(fernet, Fernet) + + def test_get_primary_fernet_raises_when_not_configured(self): + """Test get_primary_fernet raises error when primary key ID not configured.""" + with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): + with pytest.raises(ValueError, match="Fernet primary key ID is not configured"): + FernetKeyStore.get_primary_fernet() + + def test_get_primary_fernet_raises_when_key_not_found(self, fernet_keys_store): + """Test get_primary_fernet raises error when primary key doesn't exist.""" + _key_id, _fernet_key = fernet_keys_store + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "fernet_primary_key_id": "nonexistent_primary", + } + ): + with pytest.raises( + ValueError, match="Encryption key with ID 'nonexistent_primary' not found" + ): + FernetKeyStore.get_primary_fernet() + + def test_get_primary_fernet_with_multiple_keys(self, multi_fernet_keys_store): + """Test get_primary_fernet returns correct key when multiple keys exist.""" + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "fernet_primary_key_id": "key_secondary", + } + ): + key_id, fernet = FernetKeyStore.get_primary_fernet() + + assert key_id == "key_secondary" + assert isinstance(fernet, Fernet) + + def test_get_primary_fernet_auto_loads_if_needed(self, temp_keys_dir): + """Test get_primary_fernet auto-loads keys if not already loaded.""" + key = Fernet.generate_key() + (temp_keys_dir / "primary").write_text(key.decode("utf-8")) + + with override_settings( + DATABASE_ENCRYPTION_SETTINGS={ + "fernet_keys_location": str(temp_keys_dir), + "fernet_primary_key_id": "primary", + } + ): + # Keys not loaded yet + assert FernetKeyStore._is_loaded is False + + key_id, fernet = FernetKeyStore.get_primary_fernet() + + # Should have auto-loaded + assert FernetKeyStore._is_loaded is True + assert key_id == "primary" + assert isinstance(fernet, Fernet) From f59fad4e40b4652a615e2567e4136d03c7251c22 Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Tue, 18 Nov 2025 12:14:43 +0100 Subject: [PATCH 2/5] prevent stale keys --- src/sentry/utils/security/encrypted_field_key_store.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sentry/utils/security/encrypted_field_key_store.py b/src/sentry/utils/security/encrypted_field_key_store.py index 3a762c14aabac5..f5c568c2d48a4f 100644 --- a/src/sentry/utils/security/encrypted_field_key_store.py +++ b/src/sentry/utils/security/encrypted_field_key_store.py @@ -37,6 +37,9 @@ def load_keys(cls) -> None: if not path.exists() or not path.is_dir(): raise ImproperlyConfigured(f"Key directory not found: {path}") + # Clear the keys dictionary to avoid stale data + cls._keys = {} + for file_path in path.iterdir(): if file_path.is_file(): # Skip hidden files From a322b0cb800f99d4587576b880c17f54aa3464b6 Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Tue, 18 Nov 2025 13:33:47 +0100 Subject: [PATCH 3/5] use option to control encrpytion method --- src/sentry/db/models/fields/encryption.py | 9 +- .../security/encrypted_field_key_store.py | 2 +- .../db/models/fields/test_encryption.py | 132 +++++++----------- 3 files changed, 62 insertions(+), 81 deletions(-) diff --git a/src/sentry/db/models/fields/encryption.py b/src/sentry/db/models/fields/encryption.py index 15bd48aeb3ecda..f035dc493f7e39 100644 --- a/src/sentry/db/models/fields/encryption.py +++ b/src/sentry/db/models/fields/encryption.py @@ -7,9 +7,9 @@ import sentry_sdk from cryptography.fernet import InvalidToken -from django.conf import settings from django.db.models import CharField, Field +from sentry import options from sentry.utils.security.encrypted_field_key_store import FernetKeyStore logger = logging.getLogger(__name__) @@ -102,7 +102,12 @@ def get_prep_value(self, value: Any) -> str | None: if value is None: return value - encryption_method = settings.DATABASE_ENCRYPTION_SETTINGS["method"] + # Get the encryption method from the options + # xxx(vgrozdanic): this is a temporary solution during a rollout + # so that we can quickly rollback if needed. + encryption_method = options.get("database.encryption.method") + if encryption_method not in self._encryption_handlers: + raise ValueError(f"Invalid encryption method: {encryption_method}") handler = self._encryption_handlers[encryption_method] return handler["encrypt"](value) diff --git a/src/sentry/utils/security/encrypted_field_key_store.py b/src/sentry/utils/security/encrypted_field_key_store.py index f5c568c2d48a4f..81f15a97792f22 100644 --- a/src/sentry/utils/security/encrypted_field_key_store.py +++ b/src/sentry/utils/security/encrypted_field_key_store.py @@ -10,7 +10,7 @@ class FernetKeyStore: - _keys = {} + _keys: dict | None = {} _is_loaded = False @classmethod diff --git a/tests/sentry/db/models/fields/test_encryption.py b/tests/sentry/db/models/fields/test_encryption.py index 581725d0f901b8..4fa5466a392507 100644 --- a/tests/sentry/db/models/fields/test_encryption.py +++ b/tests/sentry/db/models/fields/test_encryption.py @@ -11,6 +11,7 @@ EncryptedCharField, EncryptedField, ) +from sentry.testutils.helpers import override_options from sentry.utils.security.encrypted_field_key_store import FernetKeyStore ENCRYPTION_METHODS = ("plaintext", "fernet") @@ -69,7 +70,7 @@ def multi_fernet_keys_store(): def test_plaintext_encryption(): - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): field = EncryptedField() # Test encryption (should return marker:base64 encoded string) @@ -95,11 +96,9 @@ def test_fernet_encryption_without_key(): FernetKeyStore._is_loaded = True try: - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": "test_key", - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": "test_key"}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() with pytest.raises(ValueError, match="Fernet encryption keys are not loaded"): @@ -111,11 +110,9 @@ def test_fernet_encryption_without_key(): def test_fernet_encryption_with_key(multi_fernet_keys_store): """Test Fernet encryption with a valid key using new format.""" - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": "key_primary", - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": "key_primary"}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -150,11 +147,9 @@ def test_fernet_encryption_with_key(multi_fernet_keys_store): def test_fernet_key_rotation(multi_fernet_keys_store): """Test that data encrypted with different keys can be decrypted.""" - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": "key_primary", - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": "key_primary"}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -182,11 +177,9 @@ def test_fernet_format_without_key_id_rejected(fernet_keys_store): """Test that Fernet format without key_id is rejected.""" key_id, fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -206,27 +199,24 @@ def test_encryption_method_switching(fernet_keys_store): key_id, _fernet_key = fernet_keys_store # encrypt with Fernet - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() encrypted_fernet = field.get_prep_value("fernet encrypted") # encrypt with plain text - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): field = EncryptedField() encrypted_plain = field.get_prep_value("plain text value") # assert that both can be decrypted independently of the encryption method for encryption_method in ENCRYPTION_METHODS: - settings_dict = {"method": encryption_method} - if encryption_method == "fernet": - settings_dict["fernet_primary_key_id"] = key_id - - with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": encryption_method}), + ): field = EncryptedField() # Should decrypt fernet value @@ -242,11 +232,9 @@ def test_fernet_non_utf_8_chars(fernet_keys_store): """Test that different encrypted field types work correctly.""" key_id, _fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): text_field = EncryptedField() invalid_utf_8 = b"\xc0" @@ -265,7 +253,7 @@ def test_fernet_non_utf_8_chars(fernet_keys_store): def test_keysets_not_implemented(): """Test that keysets method raises NotImplementedError.""" - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "keysets"}): + with override_options({"database.encryption.method": "keysets"}): field = EncryptedField() with pytest.raises(NotImplementedError, match="Keysets encryption not yet implemented"): @@ -276,11 +264,9 @@ def test_fernet_marker_handling(fernet_keys_store): """Test that the fernet marker is handled correctly.""" key_id, _fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -301,7 +287,7 @@ def test_fernet_marker_handling(fernet_keys_store): def test_data_without_marker(): """Test handling of unencrypted data without method marker.""" - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): field = EncryptedField() # Simulate unencrypted plain text data (no marker) @@ -321,7 +307,7 @@ def test_to_python_conversion(): assert field.to_python(None) is None # Test encrypted format - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): encrypted = f"{MARKER_PLAINTEXT}:{base64.b64encode(b'test bytes').decode('ascii')}" assert field.to_python(encrypted) == b"test bytes" @@ -332,11 +318,10 @@ def test_non_utf8_data_handling(fernet_keys_store): invalid_value = b"\xc0" # invalid UTF-8 char for encryption_method in ENCRYPTION_METHODS: - settings_dict = {"method": encryption_method} - if encryption_method == "fernet": - settings_dict["fernet_primary_key_id"] = key_id - - with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": encryption_method}), + ): field = EncryptedField() result = field.to_python(invalid_value) assert result == invalid_value @@ -360,11 +345,10 @@ def test_encryption_decryption_roundtrip(encryption_method, test_value, fernet_k """Test that encryption and decryption work correctly in roundtrip.""" key_id, _fernet_key = fernet_keys_store - settings_dict = {"method": encryption_method} - if encryption_method == "fernet": - settings_dict["fernet_primary_key_id"] = key_id - - with override_settings(DATABASE_ENCRYPTION_SETTINGS=settings_dict): + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": encryption_method}), + ): field = EncryptedField() encrypted = field.get_prep_value(test_value) @@ -383,16 +367,14 @@ def test_marker_format_consistency(fernet_keys_store): key_id, _fernet_key = fernet_keys_store field = EncryptedField() - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): encrypted = field.get_prep_value("test") assert encrypted is not None assert encrypted.startswith(f"{MARKER_PLAINTEXT}:") - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): encrypted = field.get_prep_value("test") assert encrypted is not None @@ -403,11 +385,9 @@ def test_fernet_missing_key_decryption(fernet_keys_store): """Test that decryption fails gracefully when key_id is not found.""" key_id, _fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -424,11 +404,9 @@ def test_fernet_format_with_plaintext_data(fernet_keys_store): """Test that data in fernet format but containing plain text (not encrypted) falls back correctly.""" key_id, _fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): field = EncryptedField() @@ -463,11 +441,9 @@ def test_encrypted_char_field_fernet_end_to_end(fernet_keys_store): """Test complete save/retrieve cycle with EncryptedField.""" key_id, _fernet_key = fernet_keys_store - with override_settings( - DATABASE_ENCRYPTION_SETTINGS={ - "method": "fernet", - "fernet_primary_key_id": key_id, - } + with ( + override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_primary_key_id": key_id}), + override_options({"database.encryption.method": "fernet"}), ): test_data = "This is sensitive data that should be encrypted" @@ -493,7 +469,7 @@ def test_encrypted_char_field_fernet_end_to_end(fernet_keys_store): @pytest.mark.django_db def test_encrypted_char_field_plaintext_end_to_end(): """Test complete save/retrieve cycle with EncryptedCharField.""" - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): test_data = "This is plain text data" model_instance = EncryptedFieldModel.objects.create(data=test_data) @@ -516,7 +492,7 @@ def test_encrypted_char_field_plaintext_end_to_end(): @pytest.mark.django_db def test_encrypted_char_field_null_value(): - with override_settings(DATABASE_ENCRYPTION_SETTINGS={"method": "plaintext"}): + with override_options({"database.encryption.method": "plaintext"}): model_instance = EncryptedFieldModel.objects.create(data=None) assert model_instance.id is not None From a175b840dd3fe988eb677db2db46e4b6ef9e437e Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Tue, 18 Nov 2025 14:27:23 +0100 Subject: [PATCH 4/5] mypy fixes --- .../security/encrypted_field_key_store.py | 2 +- .../test_encrypted_field_key_store.py | 87 ++++++++++++------- 2 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/sentry/utils/security/encrypted_field_key_store.py b/src/sentry/utils/security/encrypted_field_key_store.py index 81f15a97792f22..00ba5725542f17 100644 --- a/src/sentry/utils/security/encrypted_field_key_store.py +++ b/src/sentry/utils/security/encrypted_field_key_store.py @@ -10,7 +10,7 @@ class FernetKeyStore: - _keys: dict | None = {} + _keys: dict[str, Fernet] | None = {} _is_loaded = False @classmethod diff --git a/tests/sentry/utils/security/test_encrypted_field_key_store.py b/tests/sentry/utils/security/test_encrypted_field_key_store.py index 482c67c4ac8161..6a28c446765fa9 100644 --- a/tests/sentry/utils/security/test_encrypted_field_key_store.py +++ b/tests/sentry/utils/security/test_encrypted_field_key_store.py @@ -1,4 +1,5 @@ import tempfile +from collections.abc import Generator from pathlib import Path import pytest @@ -10,7 +11,7 @@ @pytest.fixture(autouse=True) -def reset_key_store(): +def reset_key_store() -> Generator[None]: """Reset the FernetKeyStore state before each test.""" original_keys = FernetKeyStore._keys original_is_loaded = FernetKeyStore._is_loaded @@ -25,20 +26,20 @@ def reset_key_store(): @pytest.fixture -def temp_keys_dir(): +def temp_keys_dir() -> Generator[Path]: """Create a temporary directory for key files.""" with tempfile.TemporaryDirectory() as temp_dir: yield Path(temp_dir) @pytest.fixture -def valid_fernet_key(): +def valid_fernet_key() -> bytes: """Generate a valid Fernet key.""" return Fernet.generate_key() @pytest.fixture -def fernet_keys_store(valid_fernet_key): +def fernet_keys_store(valid_fernet_key: bytes) -> Generator[tuple[str, bytes]]: """Single key for testing. Mocks the FernetKeyStore._keys attribute.""" key_id = "key_id_1" original_keys = FernetKeyStore._keys @@ -56,7 +57,7 @@ def fernet_keys_store(valid_fernet_key): @pytest.fixture -def multi_fernet_keys_store(): +def multi_fernet_keys_store() -> Generator[dict[str, bytes]]: """Multiple keys for testing key rotation. Mocks the FernetKeyStore._keys attribute.""" key1 = Fernet.generate_key() key2 = Fernet.generate_key() @@ -82,7 +83,7 @@ def multi_fernet_keys_store(): class TestPathToKeys: - def test_path_to_keys_with_valid_path(self): + def test_path_to_keys_with_valid_path(self) -> None: """Test _path_to_keys returns Path when settings configured.""" test_path = "/path/to/keys" with override_settings(DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": test_path}): @@ -90,7 +91,7 @@ def test_path_to_keys_with_valid_path(self): assert result == Path(test_path) assert isinstance(result, Path) - def test_path_to_keys_with_none(self): + def test_path_to_keys_with_none(self) -> None: """Test _path_to_keys returns None when no path configured.""" with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): result = FernetKeyStore._path_to_keys() @@ -98,7 +99,7 @@ def test_path_to_keys_with_none(self): class TestLoadKeys: - def test_load_keys_no_path_configured(self): + def test_load_keys_no_path_configured(self) -> None: """Test load_keys when no keys directory is configured.""" with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): FernetKeyStore.load_keys() @@ -106,7 +107,7 @@ def test_load_keys_no_path_configured(self): assert FernetKeyStore._keys is None assert FernetKeyStore._is_loaded is True - def test_load_keys_directory_not_exists(self, tmp_path): + def test_load_keys_directory_not_exists(self, tmp_path: Path) -> None: """Test load_keys raises error when directory doesn't exist.""" non_existent_path = tmp_path / "non_existent" @@ -116,7 +117,7 @@ def test_load_keys_directory_not_exists(self, tmp_path): with pytest.raises(ImproperlyConfigured, match="Key directory not found"): FernetKeyStore.load_keys() - def test_load_keys_path_is_file_not_directory(self, tmp_path): + def test_load_keys_path_is_file_not_directory(self, tmp_path: Path) -> None: """Test load_keys raises error when path is a file, not a directory.""" file_path = tmp_path / "keyfile.txt" file_path.write_text("not a directory") @@ -127,17 +128,18 @@ def test_load_keys_path_is_file_not_directory(self, tmp_path): with pytest.raises(ImproperlyConfigured, match="Key directory not found"): FernetKeyStore.load_keys() - def test_load_keys_empty_directory(self, temp_keys_dir): + def test_load_keys_empty_directory(self, temp_keys_dir: Path) -> None: """Test load_keys with empty directory.""" with override_settings( DATABASE_ENCRYPTION_SETTINGS={"fernet_keys_location": str(temp_keys_dir)} ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert FernetKeyStore._keys == {} assert FernetKeyStore._is_loaded is True - def test_load_keys_single_valid_key(self, temp_keys_dir, valid_fernet_key): + def test_load_keys_single_valid_key(self, temp_keys_dir: Path, valid_fernet_key: bytes) -> None: """Test load_keys with a single valid key file.""" key_file = temp_keys_dir / "key_1" key_file.write_text(valid_fernet_key.decode("utf-8")) @@ -147,12 +149,13 @@ def test_load_keys_single_valid_key(self, temp_keys_dir, valid_fernet_key): ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "key_1" in FernetKeyStore._keys assert isinstance(FernetKeyStore._keys["key_1"], Fernet) assert FernetKeyStore._is_loaded is True - def test_load_keys_multiple_valid_keys(self, temp_keys_dir): + def test_load_keys_multiple_valid_keys(self, temp_keys_dir: Path) -> None: """Test load_keys with multiple valid key files.""" key1 = Fernet.generate_key() key2 = Fernet.generate_key() @@ -167,13 +170,16 @@ def test_load_keys_multiple_valid_keys(self, temp_keys_dir): ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 3 assert "primary_key" in FernetKeyStore._keys assert "secondary_key" in FernetKeyStore._keys assert "tertiary_key" in FernetKeyStore._keys assert FernetKeyStore._is_loaded is True - def test_load_keys_skips_hidden_files(self, temp_keys_dir, valid_fernet_key): + def test_load_keys_skips_hidden_files( + self, temp_keys_dir: Path, valid_fernet_key: bytes + ) -> None: """Test load_keys skips hidden files (starting with .).""" visible_key = temp_keys_dir / "visible_key" hidden_key = temp_keys_dir / ".hidden_key" @@ -186,11 +192,14 @@ def test_load_keys_skips_hidden_files(self, temp_keys_dir, valid_fernet_key): ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "visible_key" in FernetKeyStore._keys assert ".hidden_key" not in FernetKeyStore._keys - def test_load_keys_skips_subdirectories(self, temp_keys_dir, valid_fernet_key): + def test_load_keys_skips_subdirectories( + self, temp_keys_dir: Path, valid_fernet_key: bytes + ) -> None: """Test load_keys ignores subdirectories.""" key_file = temp_keys_dir / "key_1" key_file.write_text(valid_fernet_key.decode("utf-8")) @@ -205,11 +214,14 @@ def test_load_keys_skips_subdirectories(self, temp_keys_dir, valid_fernet_key): ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "key_1" in FernetKeyStore._keys assert "subdir" not in FernetKeyStore._keys - def test_load_keys_empty_file_skipped(self, temp_keys_dir, valid_fernet_key): + def test_load_keys_empty_file_skipped( + self, temp_keys_dir: Path, valid_fernet_key: bytes + ) -> None: """Test load_keys skips empty key files.""" valid_key_file = temp_keys_dir / "valid_key" empty_key_file = temp_keys_dir / "empty_key" @@ -225,12 +237,15 @@ def test_load_keys_empty_file_skipped(self, temp_keys_dir, valid_fernet_key): FernetKeyStore.load_keys() # Only valid key should be loaded, empty files are skipped + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "valid_key" in FernetKeyStore._keys assert "empty_key" not in FernetKeyStore._keys assert "whitespace_key" not in FernetKeyStore._keys - def test_load_keys_invalid_fernet_key(self, temp_keys_dir, valid_fernet_key): + def test_load_keys_invalid_fernet_key( + self, temp_keys_dir: Path, valid_fernet_key: bytes + ) -> None: """Test load_keys handles invalid Fernet keys gracefully.""" valid_key_file = temp_keys_dir / "valid_key" invalid_key_file = temp_keys_dir / "invalid_key" @@ -244,11 +259,12 @@ def test_load_keys_invalid_fernet_key(self, temp_keys_dir, valid_fernet_key): FernetKeyStore.load_keys() # Only valid key should be loaded, invalid key is skipped + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "valid_key" in FernetKeyStore._keys assert "invalid_key" not in FernetKeyStore._keys - def test_load_keys_with_newlines_and_whitespace(self, temp_keys_dir): + def test_load_keys_with_newlines_and_whitespace(self, temp_keys_dir: Path) -> None: """Test load_keys strips whitespace from key content.""" key = Fernet.generate_key() key_file = temp_keys_dir / "key_with_whitespace" @@ -259,12 +275,15 @@ def test_load_keys_with_newlines_and_whitespace(self, temp_keys_dir): ): FernetKeyStore.load_keys() + assert FernetKeyStore._keys is not None assert len(FernetKeyStore._keys) == 1 assert "key_with_whitespace" in FernetKeyStore._keys class TestGetFernetForKeyId: - def test_get_fernet_for_key_id_auto_loads_keys(self, temp_keys_dir, valid_fernet_key): + def test_get_fernet_for_key_id_auto_loads_keys( + self, temp_keys_dir: Path, valid_fernet_key: bytes + ) -> None: """Test get_fernet_for_key_id auto-loads keys on first access.""" key_file = temp_keys_dir / "auto_load_key" key_file.write_text(valid_fernet_key.decode("utf-8")) @@ -279,17 +298,18 @@ def test_get_fernet_for_key_id_auto_loads_keys(self, temp_keys_dir, valid_fernet # Should have auto-loaded assert FernetKeyStore._is_loaded is True - assert isinstance(fernet, Fernet) + assert isinstance(fernet, Fernet) # type: ignore[unreachable] - def test_get_fernet_for_key_id_returns_fernet_instance(self, fernet_keys_store): + def test_get_fernet_for_key_id_returns_fernet_instance( + self, fernet_keys_store: tuple[str, bytes] + ) -> None: """Test get_fernet_for_key_id returns Fernet instance for valid key_id.""" key_id, _fernet_key = fernet_keys_store fernet = FernetKeyStore.get_fernet_for_key_id(key_id) - assert isinstance(fernet, Fernet) - def test_get_fernet_for_key_id_raises_when_keys_none(self): + def test_get_fernet_for_key_id_raises_when_keys_none(self) -> None: """Test get_fernet_for_key_id raises error when keys are None (misconfigured).""" # Simulate no keys directory configured FernetKeyStore._keys = None @@ -298,7 +318,9 @@ def test_get_fernet_for_key_id_raises_when_keys_none(self): with pytest.raises(ValueError, match="Fernet encryption keys are not loaded"): FernetKeyStore.get_fernet_for_key_id("any_key") - def test_get_fernet_for_key_id_raises_when_key_not_found(self, fernet_keys_store): + def test_get_fernet_for_key_id_raises_when_key_not_found( + self, fernet_keys_store: tuple[str, bytes] + ) -> None: """Test get_fernet_for_key_id raises error when key_id doesn't exist.""" _key_id, _fernet_key = fernet_keys_store @@ -307,7 +329,7 @@ def test_get_fernet_for_key_id_raises_when_key_not_found(self, fernet_keys_store class TestGetPrimaryFernet: - def test_get_primary_fernet_returns_tuple(self, fernet_keys_store): + def test_get_primary_fernet_returns_tuple(self, fernet_keys_store: tuple[str, bytes]) -> None: """Test get_primary_fernet returns (key_id, Fernet) tuple.""" key_id, _fernet_key = fernet_keys_store @@ -319,15 +341,16 @@ def test_get_primary_fernet_returns_tuple(self, fernet_keys_store): returned_key_id, fernet = FernetKeyStore.get_primary_fernet() assert returned_key_id == key_id - assert isinstance(fernet, Fernet) - def test_get_primary_fernet_raises_when_not_configured(self): + def test_get_primary_fernet_raises_when_not_configured(self) -> None: """Test get_primary_fernet raises error when primary key ID not configured.""" with override_settings(DATABASE_ENCRYPTION_SETTINGS={}): with pytest.raises(ValueError, match="Fernet primary key ID is not configured"): FernetKeyStore.get_primary_fernet() - def test_get_primary_fernet_raises_when_key_not_found(self, fernet_keys_store): + def test_get_primary_fernet_raises_when_key_not_found( + self, fernet_keys_store: tuple[str, bytes] + ) -> None: """Test get_primary_fernet raises error when primary key doesn't exist.""" _key_id, _fernet_key = fernet_keys_store @@ -341,7 +364,9 @@ def test_get_primary_fernet_raises_when_key_not_found(self, fernet_keys_store): ): FernetKeyStore.get_primary_fernet() - def test_get_primary_fernet_with_multiple_keys(self, multi_fernet_keys_store): + def test_get_primary_fernet_with_multiple_keys( + self, multi_fernet_keys_store: dict[str, bytes] + ) -> None: """Test get_primary_fernet returns correct key when multiple keys exist.""" with override_settings( @@ -354,7 +379,7 @@ def test_get_primary_fernet_with_multiple_keys(self, multi_fernet_keys_store): assert key_id == "key_secondary" assert isinstance(fernet, Fernet) - def test_get_primary_fernet_auto_loads_if_needed(self, temp_keys_dir): + def test_get_primary_fernet_auto_loads_if_needed(self, temp_keys_dir: Path) -> None: """Test get_primary_fernet auto-loads keys if not already loaded.""" key = Fernet.generate_key() (temp_keys_dir / "primary").write_text(key.decode("utf-8")) @@ -372,5 +397,5 @@ def test_get_primary_fernet_auto_loads_if_needed(self, temp_keys_dir): # Should have auto-loaded assert FernetKeyStore._is_loaded is True - assert key_id == "primary" + assert key_id == "primary" # type: ignore[unreachable] assert isinstance(fernet, Fernet) From 4a2b9ea4007eba34313cc5c8032356276cf9b5c1 Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Tue, 18 Nov 2025 14:46:56 +0100 Subject: [PATCH 5/5] default to plaintext if the method from option is unknown --- src/sentry/db/models/fields/encryption.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/sentry/db/models/fields/encryption.py b/src/sentry/db/models/fields/encryption.py index f035dc493f7e39..81dc99a10404d0 100644 --- a/src/sentry/db/models/fields/encryption.py +++ b/src/sentry/db/models/fields/encryption.py @@ -106,8 +106,13 @@ def get_prep_value(self, value: Any) -> str | None: # xxx(vgrozdanic): this is a temporary solution during a rollout # so that we can quickly rollback if needed. encryption_method = options.get("database.encryption.method") + # Default to plaintext if method is not recognized if encryption_method not in self._encryption_handlers: - raise ValueError(f"Invalid encryption method: {encryption_method}") + logger.error( + "Unknown encryption method '%s', defaulting to plaintext", encryption_method + ) + encryption_method = "plaintext" + handler = self._encryption_handlers[encryption_method] return handler["encrypt"](value)