From a00577bc680580ca17c120148ddcdf566bf05cbd Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 29 Sep 2025 21:42:05 +0000 Subject: [PATCH 1/4] change file4 --- client_encryption/api_encryption.py | 383 ++++++++++++++-------------- sonar-project.properties | 4 +- 2 files changed, 194 insertions(+), 193 deletions(-) diff --git a/client_encryption/api_encryption.py b/client_encryption/api_encryption.py index 5506887..9c0a6c2 100644 --- a/client_encryption/api_encryption.py +++ b/client_encryption/api_encryption.py @@ -1,192 +1,191 @@ - import inspect - import json - from enum import Enum - from functools import wraps - from warnings import warn - - from client_encryption.field_level_encryption import encrypt_payload as encrypt_field_level, \ - decrypt_payload as decrypt_field_level - from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig - from client_encryption.jwe_encryption import encrypt_payload as encrypt_jwe, decrypt_payload as decrypt_jwe - from client_encryption.jwe_encryption_config import JweEncryptionConfig - from client_encryption.session_key_params import SessionKeyParams - - - class ApiEncryption(object): - - def __init__(self, encryption_conf_file, encryption_type='Mastercard'): - """Load and initialize FieldLevelEncryptionConfig object.""" - - if type(encryption_conf_file) is dict: - if encryption_type == EncryptionType.MASTERCARD.value: - self._encryption_conf = FieldLevelEncryptionConfig(encryption_conf_file) - else: - self._encryption_conf = JweEncryptionConfig(encryption_conf_file) - else: - if encryption_type == EncryptionType.MASTERCARD.value: - with open(encryption_conf_file, encoding='utf-8') as json_file: - self._encryption_conf = FieldLevelEncryptionConfig(json_file.read()) - else: - with open(encryption_conf_file, encoding='utf-8') as json_file: - self._encryption_conf = JweEncryptionConfig(json_file.read()) - - def field_encryption_call_api(self, func): - """Decorator for API call_api. func is APIClient.call_api""" - - @wraps(func) - def call_api_function(*args, **kwargs): - original_parameters = inspect.signature(func.__self__.call_api).parameters - check_type_is_none = original_parameters.get("_check_type") is None - preload_content_is_not_none = original_parameters.get("_preload_content") is not None - if check_type_is_none and preload_content_is_not_none: - kwargs["_preload_content"] = False # version 4.3.1 - return func(*args, **kwargs) - - call_api_function.__fle__ = True - - return call_api_function - - def field_encryption(self, func): - """Decorator for API call_api. func is APIClient.call_api""" - - @wraps(func) - def call_api_function(*args, **kwargs): - """Wrap call_api and add field encryption layer to it.""" - in_body = kwargs.get("body", None) - - in_headers = kwargs.get("headers", None) - - kwargs["body"] = self._encrypt_payload(in_headers, in_body) if in_body else in_body - - response = func(*args, **kwargs) - - response.data = self._decrypt_payload(response.getheaders(), response.response.data) - - return response - - call_api_function.__fle__ = True - return call_api_function - - def _encrypt_payload(self, headers, body): - """Encryption enforcement based on configuration - encrypt and add session key params to header or body""" - - conf = self._encryption_conf - - encrypted_payload = self.encrypt_field_level_payload(headers, conf, body) if type( - conf) is FieldLevelEncryptionConfig else self.encrypt_jwe_payload(conf, body) - - # convert the encrypted_payload to the same data type as the input body - if isinstance(body, str): - return json.dumps(encrypted_payload) - - if isinstance(body, bytes): - return json.dumps(encrypted_payload).encode("utf-8") - - return encrypted_payload - - def _decrypt_payload(self, headers, body): - """Encryption enforcement based on configuration - decrypt using session key params from header or body""" - - conf = self._encryption_conf - - if type(conf) is FieldLevelEncryptionConfig: - return self.decrypt_field_level_payload(headers, conf, body) - else: - return self.decrypt_jwe_payload(conf, body) - - @staticmethod - def encrypt_jwe_payload(conf, body): - return encrypt_jwe(body, conf) - - @staticmethod - def decrypt_jwe_payload(conf, body): - decrypted_body = decrypt_jwe(body, conf) - try: - payload = json.dumps(decrypted_body).encode('utf-8') - except: - payload = decrypted_body - - return payload - - @staticmethod - def decrypt_field_level_payload(headers, conf, body): - """Encryption enforcement based on configuration - decrypt using session key params from header or body""" - params = None - - if conf.use_http_headers: - if conf.iv_field_name in headers and conf.encrypted_key_field_name in headers: - iv = headers.pop(conf.iv_field_name) - encrypted_key = headers.pop(conf.encrypted_key_field_name) - oaep_digest_algo = headers.pop(conf.oaep_padding_digest_algorithm_field_name) \ - if _contains_param(conf.oaep_padding_digest_algorithm_field_name, headers) else None - if _contains_param(conf.encryption_certificate_fingerprint_field_name, headers): - del headers[conf.encryption_certificate_fingerprint_field_name] - if _contains_param(conf.encryption_key_fingerprint_field_name, headers): - del headers[conf.encryption_key_fingerprint_field_name] - - params = SessionKeyParams(conf, encrypted_key, iv, oaep_digest_algo) - else: - # skip decryption and return original body if not iv nor key is in headers - return body - - decrypted_body = decrypt_field_level(body, conf, params) - try: - payload = json.dumps(decrypted_body).encode('utf-8') - except: - payload = decrypted_body - - return payload - - @staticmethod - def encrypt_field_level_payload(headers, conf, body): - if conf.use_http_headers: - params = SessionKeyParams.generate(conf) - - encryption_params = { - conf.iv_field_name: params.iv_value, - conf.encrypted_key_field_name: params.encrypted_key_value - } - if conf.encryption_certificate_fingerprint_field_name: - encryption_params[conf.encryption_certificate_fingerprint_field_name] = \ - conf.encryption_certificate_fingerprint - if conf.encryption_key_fingerprint_field_name: - encryption_params[conf.encryption_key_fingerprint_field_name] = conf.encryption_key_fingerprint - if conf.oaep_padding_digest_algorithm_field_name: - encryption_params[conf.oaep_padding_digest_algorithm_field_name] = conf.oaep_padding_digest_algorithm - - encrypted_payload = encrypt_field_level(body, conf, params) - headers.update(encryption_params) - else: - encrypted_payload = encrypt_field_level(body, conf) - - return encrypted_payload - - - def _contains_param(param_name, headers): return param_name and param_name in headers - - - def add_encryption_layer(api_client, encryption_conf_file, encryption_type='Mastercard'): - """Decorate APIClient.call_api with encryption""" - __check_oauth(api_client) # warn the user if authentication layer is missing/not set - api_encryption = ApiEncryption(encryption_conf_file, encryption_type) - api_client.rest_client.request = api_encryption.field_encryption(api_client.rest_client.request) - api_client.call_api = api_encryption.field_encryption_call_api(api_client.call_api) - - - def __check_oauth(api_client): - try: - api_client.rest_client.request.__wrapped__ - except AttributeError: - __oauth_warn() - - - def __oauth_warn(): - warn("No signing layer detected. Request will be only encrypted without being signed. " - "Please refer to " - "https://github.com/Mastercard/client-encryption-python#integrating-with-mastercard-oauth1-signer-module") - - - class EncryptionType(Enum): - MASTERCARD = 'Mastercard' - JWE = 'JWE' - \ No newline at end of file +import inspect +import json +from enum import Enum +from functools import wraps +from warnings import warn + +from client_encryption.field_level_encryption import encrypt_payload as encrypt_field_level, \ + decrypt_payload as decrypt_field_level +from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig +from client_encryption.jwe_encryption import encrypt_payload as encrypt_jwe, decrypt_payload as decrypt_jwe +from client_encryption.jwe_encryption_config import JweEncryptionConfig +from client_encryption.session_key_params import SessionKeyParams + + +class ApiEncryption(object): + + def __init__(self, encryption_conf_file, encryption_type='Mastercard'): + """Load and initialize FieldLevelEncryptionConfig object.""" + + if type(encryption_conf_file) is dict: + if encryption_type == EncryptionType.MASTERCARD.value: + self._encryption_conf = FieldLevelEncryptionConfig(encryption_conf_file) + else: + self._encryption_conf = JweEncryptionConfig(encryption_conf_file) + else: + if encryption_type == EncryptionType.MASTERCARD.value: + with open(encryption_conf_file, encoding='utf-8') as json_file: + self._encryption_conf = FieldLevelEncryptionConfig(json_file.read()) + else: + with open(encryption_conf_file, encoding='utf-8') as json_file: + self._encryption_conf = JweEncryptionConfig(json_file.read()) + + def field_encryption_call_api(self, func): + """Decorator for API call_api. func is APIClient.call_api""" + + @wraps(func) + def call_api_function(*args, **kwargs): + original_parameters = inspect.signature(func.__self__.call_api).parameters + check_type_is_none = original_parameters.get("_check_type") is None + preload_content_is_not_none = original_parameters.get("_preload_content") is not None + if check_type_is_none and preload_content_is_not_none: + kwargs["_preload_content"] = False # version 4.3.1 + return func(*args, **kwargs) + + call_api_function.__fle__ = True + + return call_api_function + + def field_encryption(self, func): + """Decorator for API call_api. func is APIClient.call_api""" + + @wraps(func) + def call_api_function(*args, **kwargs): + """Wrap call_api and add field encryption layer to it.""" + in_body = kwargs.get("body", None) + + in_headers = kwargs.get("headers", None) + + kwargs["body"] = self._encrypt_payload(in_headers, in_body) if in_body else in_body + + response = func(*args, **kwargs) + + response.data = self._decrypt_payload(response.getheaders(), response.response.data) + + return response + + call_api_function.__fle__ = True + return call_api_function + + def _encrypt_payload(self, headers, body): + """Encryption enforcement based on configuration - encrypt and add session key params to header or body""" + + conf = self._encryption_conf + + encrypted_payload = self.encrypt_field_level_payload(headers, conf, body) if type( + conf) is FieldLevelEncryptionConfig else self.encrypt_jwe_payload(conf, body) + + # convert the encrypted_payload to the same data type as the input body + if isinstance(body, str): + return json.dumps(encrypted_payload) + + if isinstance(body, bytes): + return json.dumps(encrypted_payload).encode("utf-8") + + return encrypted_payload + + def _decrypt_payload(self, headers, body): + """Encryption enforcement based on configuration - decrypt using session key params from header or body""" + + conf = self._encryption_conf + + if type(conf) is FieldLevelEncryptionConfig: + return self.decrypt_field_level_payload(headers, conf, body) + else: + return self.decrypt_jwe_payload(conf, body) + + @staticmethod + def encrypt_jwe_payload(conf, body): + return encrypt_jwe(body, conf) + + @staticmethod + def decrypt_jwe_payload(conf, body): + decrypted_body = decrypt_jwe(body, conf) + try: + payload = json.dumps(decrypted_body).encode('utf-8') + except: + payload = decrypted_body + + return payload + + @staticmethod + def decrypt_field_level_payload(headers, conf, body): + """Encryption enforcement based on configuration - decrypt using session key params from header or body""" + params = None + + if conf.use_http_headers: + if conf.iv_field_name in headers and conf.encrypted_key_field_name in headers: + iv = headers.pop(conf.iv_field_name) + encrypted_key = headers.pop(conf.encrypted_key_field_name) + oaep_digest_algo = headers.pop(conf.oaep_padding_digest_algorithm_field_name) \ + if _contains_param(conf.oaep_padding_digest_algorithm_field_name, headers) else None + if _contains_param(conf.encryption_certificate_fingerprint_field_name, headers): + del headers[conf.encryption_certificate_fingerprint_field_name] + if _contains_param(conf.encryption_key_fingerprint_field_name, headers): + del headers[conf.encryption_key_fingerprint_field_name] + + params = SessionKeyParams(conf, encrypted_key, iv, oaep_digest_algo) + else: + # skip decryption and return original body if not iv nor key is in headers + return body + + decrypted_body = decrypt_field_level(body, conf, params) + try: + payload = json.dumps(decrypted_body).encode('utf-8') + except: + payload = decrypted_body + + return payload + + @staticmethod + def encrypt_field_level_payload(headers, conf, body): + if conf.use_http_headers: + params = SessionKeyParams.generate(conf) + + encryption_params = { + conf.iv_field_name: params.iv_value, + conf.encrypted_key_field_name: params.encrypted_key_value + } + if conf.encryption_certificate_fingerprint_field_name: + encryption_params[conf.encryption_certificate_fingerprint_field_name] = \ + conf.encryption_certificate_fingerprint + if conf.encryption_key_fingerprint_field_name: + encryption_params[conf.encryption_key_fingerprint_field_name] = conf.encryption_key_fingerprint + if conf.oaep_padding_digest_algorithm_field_name: + encryption_params[conf.oaep_padding_digest_algorithm_field_name] = conf.oaep_padding_digest_algorithm + + encrypted_payload = encrypt_field_level(body, conf, params) + headers.update(encryption_params) + else: + encrypted_payload = encrypt_field_level(body, conf) + + return encrypted_payload + + +def _contains_param(param_name, headers): return param_name and param_name in headers + + +def add_encryption_layer(api_client, encryption_conf_file, encryption_type='Mastercard'): + """Decorate APIClient.call_api with encryption""" + __check_oauth(api_client) # warn the user if authentication layer is missing/not set + api_encryption = ApiEncryption(encryption_conf_file, encryption_type) + api_client.rest_client.request = api_encryption.field_encryption(api_client.rest_client.request) + api_client.call_api = api_encryption.field_encryption_call_api(api_client.call_api) + + +def __check_oauth(api_client): + try: + api_client.rest_client.request.__wrapped__ + except AttributeError: + __oauth_warn() + + +def __oauth_warn(): + warn("No signing layer detected. Request will be only encrypted without being signed. " + "Please refer to " + "https://github.com/Mastercard/client-encryption-python#integrating-with-mastercard-oauth1-signer-module") + + +class EncryptionType(Enum): + MASTERCARD = 'Mastercard' + JWE = 'JWE' diff --git a/sonar-project.properties b/sonar-project.properties index 13e8d52..f561c2f 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -5,4 +5,6 @@ sonar.projectName=Python Project with Tests sonar.sources=./client_encryption sonar.tests=./tests sonar.python.coverage.reportPaths=coverage.xml -sonar.python.version=3.8 \ No newline at end of file +sonar.python.version=3.8 +sonar.qualitygate.wait=true +sonar.qualitygate.timeout=300 \ No newline at end of file From 9db4181182c6e077866c1974884167408cccc0f0 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 29 Sep 2025 21:47:24 +0000 Subject: [PATCH 2/4] rename test file --- tests/{block_test_api_encryption.py => test_api_encryption.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{block_test_api_encryption.py => test_api_encryption.py} (100%) diff --git a/tests/block_test_api_encryption.py b/tests/test_api_encryption.py similarity index 100% rename from tests/block_test_api_encryption.py rename to tests/test_api_encryption.py From 92d7c8e3a5173215a12209b7419ffd64cd5d06aa Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 29 Sep 2025 21:52:31 +0000 Subject: [PATCH 3/4] new code defn --- sonar-project.properties | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sonar-project.properties b/sonar-project.properties index f561c2f..faa49ea 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -7,4 +7,5 @@ sonar.tests=./tests sonar.python.coverage.reportPaths=coverage.xml sonar.python.version=3.8 sonar.qualitygate.wait=true -sonar.qualitygate.timeout=300 \ No newline at end of file +sonar.qualitygate.timeout=300 +sonar.newCode.referenceBranch=main \ No newline at end of file From ee91d8481dc12fdffb64d285bedf4eedeec125be Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 29 Sep 2025 22:07:03 +0000 Subject: [PATCH 4/4] move test file --- tests/{test_api_encryption.py => block_test_api_encryption.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_api_encryption.py => block_test_api_encryption.py} (100%) diff --git a/tests/test_api_encryption.py b/tests/block_test_api_encryption.py similarity index 100% rename from tests/test_api_encryption.py rename to tests/block_test_api_encryption.py