Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions authenticationsdk/core/MerchantConfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,17 @@ def validate_MLE_configuration(self):
f"Invalid responseMlePrivateKeyFilePath: {self.responseMlePrivateKeyFilePath}",
self.log_config)

# Validate KID is provided
if self.responseMleKID is None or not self.responseMleKID.strip():
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
"Response MLE is enabled but responseMleKID is not provided.",
self.log_config)
# Validate KID is provided (skip if using P12/PFX as it can be auto-extracted from CyberSource certs)
is_p12_or_pfx = False
if self.responseMlePrivateKeyFilePath and self.responseMlePrivateKeyFilePath.strip():
file_extension = os.path.splitext(self.responseMlePrivateKeyFilePath)[1].lower()
is_p12_or_pfx = file_extension in ('.p12', '.pfx')

if not is_p12_or_pfx: # Only validate KID if NOT using P12/PFX
if self.responseMleKID is None or not self.responseMleKID.strip():
authenticationsdk.util.ExceptionAuth.validate_merchant_details_log(self.logger,
"Response MLE is enabled but responseMleKID is not provided. For non Cybersource generated P12/PFX files, responseMleKID must be explicitly configured.",
self.log_config)

def is_valid_boolean_string(self, value):
"""
Expand Down
12 changes: 8 additions & 4 deletions authenticationsdk/jwt/Token.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from authenticationsdk.payloaddigest.PayLoadDigest import *
from authenticationsdk.core.TokenGeneration import *
from authenticationsdk.util.Cache import *
from authenticationsdk.util.MLEUtility import MLEUtility
import CyberSource.logging.log_factory as LogFactory


class JwtSignatureToken(TokenGeneration):
Expand Down Expand Up @@ -40,8 +42,9 @@ def token_for_get_and_delete(self):
jwt_body = {GlobalLabelParameters.JWT_TIME: self.date}

if self.isResponseMLEforApi:
if self.merchant_config.responseMleKID:
jwt_body['v-c-response-mle-kid'] = self.merchant_config.responseMleKID #check access properly
logger = LogFactory.setup_logger(__name__, self.merchant_config.log_config)
response_mle_kid = MLEUtility.validate_and_auto_extract_response_mle_kid(self.merchant_config, logger)
jwt_body['v-c-response-mle-kid'] = response_mle_kid

# reading the p12 file from cache memory
cache_obj = FileCache()
Expand Down Expand Up @@ -70,8 +73,9 @@ def token_for_post_and_put_and_patch(self):
jwt_body = {GlobalLabelParameters.JWT_DIGEST: digest.decode("utf-8"), GlobalLabelParameters.JWT_ALGORITHM: "SHA-256", GlobalLabelParameters.JWT_TIME: self.date}

if self.isResponseMLEforApi:
if self.merchant_config.responseMleKID:
jwt_body['v-c-response-mle-kid'] = self.merchant_config.responseMleKID
logger = LogFactory.setup_logger(__name__, self.merchant_config.log_config)
response_mle_kid = MLEUtility.validate_and_auto_extract_response_mle_kid(self.merchant_config, logger)
jwt_body['v-c-response-mle-kid'] = response_mle_kid

# reading the p12 file from cache memory
cache_obj = FileCache()
Expand Down
99 changes: 92 additions & 7 deletions authenticationsdk/util/Cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from authenticationsdk.util.CertificateUtility import CertificateUtility
from authenticationsdk.util.GlobalLabelParameters import *
import CyberSource.logging.log_factory as LogFactory
from authenticationsdk.util.Utility import parse_p12_file

import os

class CertInfo:
Expand All @@ -39,6 +41,8 @@ def _initialize_cache(self):
self._p12_cache_lock = threading.RLock()
self.mlecache = {}
self._mle_cache_lock = threading.RLock()
self.p12_parsed_cache = {}
self._p12_parsed_cache_lock = threading.RLock()

def fetch_cached_p12_certificate(self, merchant_config, p12_file_path, key_password):
try:
Expand Down Expand Up @@ -217,18 +221,99 @@ def get_mle_response_private_key(self, merchant_config):
if not file_path:
raise ValueError("Response MLE private key file path not provided")

# Check if key exists in cache
cert_info = self.mlecache.get(cache_key)
# Check if key exists in cache (thread-safe read)
with self._mle_cache_lock:
cert_info = self.mlecache.get(cache_key)

try:
file_timestamp = os.path.getmtime(file_path)
file_timestamp = os.path.getmtime(file_path)
# If not in cache or file was modified, load it
if cert_info is None or cert_info.timestamp != file_timestamp:
self.setup_mle_cache(merchant_config, cache_key, file_path)
if cert_info is None or cert_info.timestamp != file_timestamp:
self.setup_mle_cache(merchant_config, cache_key, file_path)
# Read from cache again after setup (thread-safe)
with self._mle_cache_lock:
cert_info = self.mlecache.get(cache_key)
return cert_info.private_key if cert_info else None

return cert_info.private_key if cert_info else None
except Exception as e:
if FileCache.logger:
FileCache.logger.error(f"Error getting Response MLE private key")
raise ValueError(f"Error getting Response MLE private key: {str(e)}")

def fetch_cached_p12_from_file(self, file_path, password, logger=None, cache_key=None):
"""
Fetches a parsed P12 object from cache or parses and caches it.

Args:
file_path (str): Path to the P12/PFX file
password (str): Password for the P12 file
logger: Logger instance for logging
cache_key (str): Optional custom cache key. Defaults to filePath + identifier

Returns:
list: List of certificates from the P12 file

Raises:
FileNotFoundError: If file doesn't exist
ValueError: If password is incorrect or parsing fails
"""

# Use provided cache key or default
final_cache_key = cache_key or (file_path + GlobalLabelParameters.RESPONSE_MLE_P12_PFX_CACHE_IDENTIFIER)

if logger:
logger.debug(f"Fetching P12/PFX from cache with key: {final_cache_key}")

# Thread-safe cache access with file operations
with self._p12_parsed_cache_lock:
# Check if file exists
if not os.path.exists(file_path):
error_msg = f"File not found: {file_path}"
if logger:
logger.error(error_msg)
raise FileNotFoundError(error_msg)

try:
current_file_last_modified_time = os.path.getmtime(file_path)
except (OSError, IOError) as e:
error_msg = f"Error accessing file {file_path}"
if logger:
logger.error(error_msg)
raise FileNotFoundError(error_msg)

cached_entry = self.p12_parsed_cache.get(final_cache_key)

if cached_entry and cached_entry['timestamp'] == current_file_last_modified_time:
if logger:
logger.debug("P12/PFX found in cache and file not modified")
return cached_entry['p12_object']

# Cache miss or file modified - parse and cache
if logger:
logger.debug(f"P12/PFX not in cache or file modified. Loading from file: {file_path}")

try:
p12_object = parse_p12_file(file_path, password, logger)

# Store in cache with file modification time
self.p12_parsed_cache[final_cache_key] = {
'p12_object': p12_object,
'timestamp': current_file_last_modified_time
}

if logger:
logger.debug("Successfully cached P12/PFX object")
return p12_object

except FileNotFoundError:
# File was deleted during parsing attempt
error_msg = f"File not found during parsing: {file_path}"
if logger:
logger.error(error_msg)
raise
except Exception as e:
error_msg = f"Error parsing P12/PFX file: {str(e)}"
if logger:
logger.error(error_msg)
raise ValueError(error_msg)

1 change: 1 addition & 0 deletions authenticationsdk/util/GlobalLabelParameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,4 @@ class GlobalLabelParameters:
MLE_CACHE_IDENTIFIER_FOR_CONFIG_CERT = "mleCertFromMerchantConfig"
MLE_CACHE_IDENTIFIER_FOR_P12_CERT = "mleCertFromP12"
MLE_CACHE_KEY_IDENTIFIER_FOR_RESPONSE_PRIVATE_KEY = "response_mle_private_key"
RESPONSE_MLE_P12_PFX_CACHE_IDENTIFIER = "_responseMleP12Pfx"
177 changes: 173 additions & 4 deletions authenticationsdk/util/MLEUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,181 @@ def get_serial_number_from_certificate(certificate, merchant_config):
break

if not serial_number:
logger.warning(f"Serial number not found in MLE certificate for alias {merchant_config.requestMleKeyAlias} in {merchant_config.p12KeyFilePath}")
# Use the hex serial number from the certificate as fallback
return format(certificate.serial_number, 'x')
error_msg = f"Serial number not found in MLE certificate for alias {merchant_config.requestMleKeyAlias} in {merchant_config.p12KeyFilePath}"
logger.error(error_msg)
raise ValueError(error_msg)

return serial_number

@staticmethod
def create_json_object(jwe_token):
return json.dumps({"encryptedRequest": jwe_token})
return json.dumps({"encryptedRequest": jwe_token})

@staticmethod
def extract_response_mle_kid(file_path, password, merchant_id, logger):
"""
Extracts the serial number (KID) from a certificate's subject in a P12 file where CN matches the merchantId.

Args:
file_path (str): Path to the P12 file
password (str): Password for the P12 file
merchant_id (str): The merchant ID to match against the CN in the certificate subject
logger: Logger instance for logging

Returns:
str: The serial number extracted from the certificate's subject attributes

Raises:
ValueError: If the certificate with matching CN is not found or serial number is missing
"""
try:
logger.debug(f"Extracting MLE KID from P12 file: {file_path} for merchantId: {merchant_id}")

# Get cached P12 object
cache_obj = MLEUtility.get_cache()
certificates = cache_obj.fetch_cached_p12_from_file(file_path, password, logger)

if not certificates:
error_msg = f"No certificates found in P12 file: {file_path}"
logger.error(error_msg)
raise ValueError(error_msg)

logger.debug(f"Found {len(certificates)} certificate(s) in P12 file")

# Iterate through certificates to find one with matching CN
for i, cert in enumerate(certificates):
try:
# Extract CN from certificate subject
cn_attribute = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
if not cn_attribute:
logger.debug(f"Certificate {i + 1} has no CN in subject, skipping")
continue

cn = cn_attribute[0].value
logger.debug(f"Certificate {i + 1} CN: {cn}")

# Check if CN matches merchantId (case-insensitive)
if cn.lower() == merchant_id.lower():
logger.debug(f"Found certificate with matching CN: {cn}")

# Extract serial number from subject attributes
serial_number_attr = cert.subject.get_attributes_for_oid(NameOID.SERIAL_NUMBER)
if not serial_number_attr:
error_msg = f"Serial number not found in certificate subject for CN={cn}"
logger.error(error_msg)
raise ValueError(error_msg)

serial_number = serial_number_attr[0].value
logger.debug(f"Serial number (MLE KID) extracted: {serial_number}")

return serial_number
except Exception as e:
logger.debug(f"Error processing certificate {i + 1}: {str(e)}")
continue

# If we get here, no matching certificate was found
error_msg = f"No certificate with CN matching merchantId ({merchant_id}) found in P12 file: {file_path}"
logger.error(error_msg)
raise ValueError(error_msg)

except Exception as e:
error_msg = f"Error extracting MLE KID from P12 file: {file_path}: {str(e)}"
logger.error(error_msg)
raise ValueError(error_msg)

@staticmethod
def validate_and_auto_extract_response_mle_kid(merchant_config, logger):
"""
Validates and auto-extracts responseMleKID if necessary.

This function attempts to auto-extract the response MLE KID from CyberSource P12 files,
or uses the manually configured value if provided.

Args:
merchant_config: Merchant configuration object
logger: Logger instance for logging

Returns:
str: The validated or auto-extracted responseMleKID

Raises:
ValueError: If responseMleKID is not available and cannot be auto-extracted
"""
logger.debug("Validating responseMleKID for JWT token generation")

# Variable to store auto-extracted KID
cybs_kid = None

# First, try to auto-extract from CyberSource P12 certificate if applicable
has_valid_file_path = (
hasattr(merchant_config, 'responseMlePrivateKeyFilePath') and
isinstance(merchant_config.responseMlePrivateKeyFilePath, str) and
merchant_config.responseMlePrivateKeyFilePath.strip()
)

if has_valid_file_path:
import os
file_extension = os.path.splitext(merchant_config.responseMlePrivateKeyFilePath)[1].lower()
is_p12_file = file_extension in ('.p12', '.pfx')

if is_p12_file:
logger.debug("P12/PFX file detected, checking if it is a CyberSource certificate")

from authenticationsdk.util.Utility import is_cybersource_p12
is_cybs_p12 = is_cybersource_p12(
merchant_config.responseMlePrivateKeyFilePath,
merchant_config.responseMlePrivateKeyFilePassword,
logger
)

if is_cybs_p12:
logger.debug("Detected CyberSource P12 file, attempting to auto-extract responseMleKID")
try:
cybs_kid = MLEUtility.extract_response_mle_kid(
merchant_config.responseMlePrivateKeyFilePath,
merchant_config.responseMlePrivateKeyFilePassword,
merchant_config.merchant_id,
logger
)
logger.info("Successfully auto-extracted responseMleKID from CyberSource P12 certificate")
except Exception as e:
logger.warning(f"Failed to auto-extract responseMleKID from P12 file: {str(e)}. Will check for manually configured value.")
else:
logger.debug("P12 file is not a CyberSource-generated certificate, skipping auto-extraction")
else:
logger.debug("Private key file is not a P12/PFX file, skipping auto-extraction")
else:
logger.debug("No valid private key file path provided, skipping auto-extraction")

# Get manually configured responseMleKID
configured_kid = None
if (hasattr(merchant_config, 'responseMleKID') and
isinstance(merchant_config.responseMleKID, str) and
merchant_config.responseMleKID.strip()):
configured_kid = merchant_config.responseMleKID.strip()

# Determine which value to use
if not cybs_kid and not configured_kid:
error_msg = (
"responseMleKID is required when response MLE is enabled. "
"Could not auto-extract from certificate and no manual configuration provided. "
"Please provide responseMleKID explicitly in your configuration."
)
logger.error(error_msg)
raise ValueError(error_msg)

if cybs_kid and not configured_kid:
logger.debug("Using auto-extracted responseMleKID from CyberSource P12 certificate")
return cybs_kid

if not cybs_kid and configured_kid:
logger.debug("Using manually configured responseMleKID")
return configured_kid

# Both exist
if cybs_kid != configured_kid:
logger.warning("Auto-extracted responseMleKID does not match manually configured responseMleKID. Using configured value as preference.")
else:
logger.debug("Auto-extracted responseMleKID matches manually configured value")

return configured_kid
Loading