diff --git a/pyproject.toml b/pyproject.toml index afa30d5c..9d12bb95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "c2pa-python" -version = "0.11.1" +version = "0.12.0" requires-python = ">=3.10" description = "Python bindings for the C2PA Content Authenticity Initiative (CAI) library" readme = { file = "README.md", content-type = "text/markdown" } diff --git a/requirements-dev.txt b/requirements-dev.txt index cd916d6e..9c47e9c4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -12,4 +12,7 @@ pytest-benchmark>=5.1.0 requests>=2.0.0 # Code formatting -autopep8==2.0.4 # For automatic code formatting \ No newline at end of file +autopep8==2.0.4 # For automatic code formatting + +# Test dependencies (for callback signers) +cryptography==45.0.4 \ No newline at end of file diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 40c15316..7bb25a03 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -5,7 +5,7 @@ import os import warnings from pathlib import Path -from typing import Optional, Union, Callable, Any +from typing import Optional, Union, Callable, Any, overload import time from .lib import dynamically_load_library import mimetypes @@ -534,6 +534,15 @@ def read_ingredient_file( path: Union[str, Path], data_dir: Union[str, Path]) -> str: """Read a C2PA ingredient from a file. + .. deprecated:: 0.11.0 + This function is deprecated and will be removed in a future version. + Please use the Reader class for reading C2PA metadata instead. + Example: + ```python + with Reader(path) as reader: + manifest_json = reader.json() + ``` + Args: path: Path to the file to read data_dir: Directory to write binary resources to @@ -544,6 +553,12 @@ def read_ingredient_file( Raises: C2paError: If there was an error reading the file """ + warnings.warn( + "The read_ingredient_file function is deprecated and will be removed in a future version." + "Please use Reader(path).json() for reading C2PA metadata instead.", + DeprecationWarning, + stacklevel=2) + container = _StringContainer() container._path_str = str(path).encode('utf-8') @@ -578,11 +593,10 @@ def read_file(path: Union[str, Path], C2paError: If there was an error reading the file """ warnings.warn( - "The read_file function is deprecated and will be removed in a future version. " + "The read_file function is deprecated and will be removed in a future version." "Please use the Reader class for reading C2PA metadata instead.", DeprecationWarning, - stacklevel=2 - ) + stacklevel=2) container = _StringContainer() @@ -593,45 +607,66 @@ def read_file(path: Union[str, Path], return _parse_operation_result_for_error(result) +@overload def sign_file( source_path: Union[str, Path], dest_path: Union[str, Path], manifest: str, signer_info: C2paSignerInfo, - data_dir: Optional[Union[str, Path]] = None -) -> str: + return_manifest_as_bytes: bool = False +) -> Union[str, bytes]: + """Sign a file with a C2PA manifest using signer info. + """ + ... + + +@overload +def sign_file( + source_path: Union[str, Path], + dest_path: Union[str, Path], + manifest: str, + signer: 'Signer', + return_manifest_as_bytes: bool = False +) -> Union[str, bytes]: + """Sign a file with a C2PA manifest using a signer. + """ + ... + + +def sign_file( + source_path: Union[str, Path], + dest_path: Union[str, Path], + manifest: str, + signer_or_info: Union[C2paSignerInfo, 'Signer'], + return_manifest_as_bytes: bool = False +) -> Union[str, bytes]: """Sign a file with a C2PA manifest. For now, this function is left here to provide a backwards-compatible API. - .. deprecated:: 0.10.0 - This function is deprecated and will be removed in a future version. - Please use the Builder class for signing and the Reader class for reading signed data instead. - Args: source_path: Path to the source file dest_path: Path to write the signed file to manifest: The manifest JSON string - signer_info: Signing configuration - data_dir: Optional directory to write binary resources to + signer_or_info: Either a signer configuration or a signer object + return_manifest_as_bytes: If True, return manifest bytes instead of JSON string Returns: - The signed manifest as a JSON string + The signed manifest as a JSON string or bytes, depending on return_manifest_as_bytes Raises: C2paError: If there was an error signing the file C2paError.Encoding: If any of the string inputs contain invalid UTF-8 characters C2paError.NotSupported: If the file type cannot be determined """ - warnings.warn( - "The sign_file function is deprecated and will be removed in a future version. " - "Please use the Builder class for signing and the Reader class for reading signed data instead.", - DeprecationWarning, - stacklevel=2 - ) try: - # Create a signer from the signer info - signer = Signer.from_info(signer_info) + # Determine if we have a signer or signer info + if isinstance(signer_or_info, C2paSignerInfo): + signer = Signer.from_info(signer_or_info) + own_signer = True + else: + signer = signer_or_info + own_signer = False # Create a builder from the manifest builder = Builder(manifest) @@ -641,25 +676,32 @@ def sign_file( # Get the MIME type from the file extension mime_type = mimetypes.guess_type(str(source_path))[0] if not mime_type: - raise C2paError.NotSupported(f"Could not determine MIME type for file: {source_path}") - - # Sign the file using the builder - manifest_bytes = builder.sign( - signer=signer, - format=mime_type, - source=source_file, - dest=dest_file - ) + raise C2paError.NotSupported( + f"Could not determine MIME type for file: {source_path}") + + if return_manifest_as_bytes: + # Convert Python streams to Stream objects for internal signing + source_stream = Stream(source_file) + dest_stream = Stream(dest_file) - # If we have manifest bytes and a data directory, write them - if manifest_bytes and data_dir: - manifest_path = os.path.join(str(data_dir), 'manifest.json') - with open(manifest_path, 'wb') as f: - f.write(manifest_bytes) + # Use the builder's internal signing logic to get manifest + # bytes + manifest_bytes = builder._sign_internal( + signer, mime_type, source_stream, dest_stream) + + return manifest_bytes + else: + # Sign the file using the builder + builder.sign( + signer=signer, + format=mime_type, + source=source_file, + dest=dest_file + ) - # Read the signed manifest from the destination file - with Reader(dest_path) as reader: - return reader.json() + # Read the signed manifest from the destination file + with Reader(dest_path) as reader: + return reader.json() except Exception as e: # Clean up destination file if it exists and there was an error @@ -675,7 +717,7 @@ def sign_file( # Ensure resources are cleaned up if 'builder' in locals(): builder.close() - if 'signer' in locals(): + if 'signer' in locals() and own_signer: signer.close() @@ -692,6 +734,22 @@ class Stream: # of the stream ID ensures uniqueness even after counter reset _MAX_STREAM_ID = 2**31 - 1 + # Class-level error messages to avoid multiple creation + _ERROR_MESSAGES = { + 'stream_error': "Error cleaning up stream: {}", + 'callback_error': "Error cleaning up callback {}: {}", + 'cleanup_error': "Error during cleanup: {}", + 'read': "Stream is closed or not initialized during read operation", + 'memory_error': "Memory error during stream operation: {}", + 'read_error': "Error during read operation: {}", + 'seek': "Stream is closed or not initialized during seek operation", + 'seek_error': "Error during seek operation: {}", + 'write': "Stream is closed or not initialized during write operation", + 'write_error': "Error during write operation: {}", + 'flush': "Stream is closed or not initialized during flush operation", + 'flush_error': "Error during flush operation: {}" + } + def __init__(self, file): """Initialize a new Stream wrapper around a file-like object. @@ -701,18 +759,16 @@ def __init__(self, file): Raises: TypeError: If the file object doesn't implement all required methods """ - # Initialize _closed first to prevent AttributeError during garbage collection + # Initialize _closed first to prevent AttributeError + # during garbage collection self._closed = False self._initialized = False self._stream = None - # Generate unique stream ID with timestamp - timestamp = int(time.time() * 1000) # milliseconds since epoch - - # Safely increment stream ID with overflow protection + # Generate unique stream ID using object ID and counter if Stream._next_stream_id >= Stream._MAX_STREAM_ID: - Stream._next_stream_id = 0 # Reset to 0 if we hit the maximum - self._stream_id = f"{timestamp}-{Stream._next_stream_id}" + Stream._next_stream_id = 0 + self._stream_id = f"{id(self)}-{Stream._next_stream_id}" Stream._next_stream_id += 1 # Rest of the existing initialization code... @@ -916,7 +972,7 @@ def close(self): _lib.c2pa_release_stream(self._stream) except Exception as e: print( - self._error_messages['stream_error'].format( + Stream._ERROR_MESSAGES['stream_error'].format( str(e)), file=sys.stderr) finally: self._stream = None @@ -928,13 +984,13 @@ def close(self): setattr(self, attr, None) except Exception as e: print( - self._error_messages['callback_error'].format( + Stream._ERROR_MESSAGES['callback_error'].format( attr, str(e)), file=sys.stderr) # Note: We don't close self._file as we don't own it except Exception as e: print( - self._error_messages['cleanup_error'].format( + Stream._ERROR_MESSAGES['cleanup_error'].format( str(e)), file=sys.stderr) finally: self._closed = True @@ -962,6 +1018,19 @@ def initialized(self) -> bool: class Reader: """High-level wrapper for C2PA Reader operations.""" + # Class-level error messages to avoid multiple creation + _ERROR_MESSAGES = { + 'unsupported': "Unsupported format", + 'io_error': "IO error: {}", + 'manifest_error': "Invalid manifest data: must be bytes", + 'reader_error': "Failed to create reader: {}", + 'cleanup_error': "Error during cleanup: {}", + 'stream_error': "Error cleaning up stream: {}", + 'file_error': "Error cleaning up file: {}", + 'reader_cleanup_error': "Error cleaning up reader: {}", + 'encoding_error': "Invalid UTF-8 characters in input: {}" + } + def __init__(self, format_or_path: Union[str, Path], @@ -981,33 +1050,13 @@ def __init__(self, self._reader = None self._own_stream = None - self._error_messages = { - 'unsupported': "Unsupported format", - 'ioError': "IO error: {}", - 'manifestError': "Invalid manifest data: must be bytes", - 'readerError': "Failed to create reader: {}", - 'cleanupError': "Error during cleanup: {}", - 'streamError': "Error cleaning up stream: {}", - 'fileError': "Error cleaning up file: {}", - 'readerCleanupError': "Error cleaning up reader: {}", - 'encodingError': "Invalid UTF-8 characters in input: {}" - } # Check for unsupported format if format_or_path == "badFormat": - raise C2paError.NotSupported(self._error_messages['unsupported']) + raise C2paError.NotSupported(Reader._ERROR_MESSAGES['unsupported']) if stream is None: # Create a stream from the file path - - # Check if mimetypes is already imported to avoid duplicate imports - # This is important because mimetypes initialization can be expensive - # and we want to reuse the existing module if it's already loaded - if 'mimetypes' not in sys.modules: - import mimetypes - else: - mimetypes = sys.modules['mimetypes'] - path = str(format_or_path) mime_type = mimetypes.guess_type( path)[0] @@ -1017,7 +1066,7 @@ def __init__(self, self._mime_type_str = mime_type.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding( - self._error_messages['encoding_error'].format( + Reader._ERROR_MESSAGES['encoding_error'].format( str(e))) try: @@ -1038,7 +1087,7 @@ def __init__(self, if error: raise C2paError(error) raise C2paError( - self._error_messages['reader_error'].format("Unknown error")) + Reader._ERROR_MESSAGES['reader_error'].format("Unknown error")) # Store the file to close it later self._file = file @@ -1049,7 +1098,7 @@ def __init__(self, if hasattr(self, '_file'): self._file.close() raise C2paError.Io( - self._error_messages['io_error'].format( + Reader._ERROR_MESSAGES['io_error'].format( str(e))) elif isinstance(stream, str): # If stream is a string, treat it as a path and try to open it @@ -1063,7 +1112,8 @@ def __init__(self, self._format_str, self._own_stream._stream) else: if not isinstance(manifest_data, bytes): - raise TypeError(self._error_messages['manifest_error']) + raise TypeError( + Reader._ERROR_MESSAGES['manifest_error']) manifest_array = ( ctypes.c_ubyte * len(manifest_data))( @@ -1084,7 +1134,7 @@ def __init__(self, if error: raise C2paError(error) raise C2paError( - self._error_messages['reader_error'].format("Unknown error")) + Reader._ERROR_MESSAGES['reader_error'].format("Unknown error")) self._file = file except Exception as e: @@ -1093,7 +1143,7 @@ def __init__(self, if hasattr(self, '_file'): self._file.close() raise C2paError.Io( - self._error_messages['io_error'].format( + Reader._ERROR_MESSAGES['io_error'].format( str(e))) else: # Use the provided stream @@ -1106,7 +1156,8 @@ def __init__(self, self._format_str, stream_obj._stream) else: if not isinstance(manifest_data, bytes): - raise TypeError(self._error_messages['manifest_error']) + raise TypeError( + Reader._ERROR_MESSAGES['manifest_error']) manifest_array = ( ctypes.c_ubyte * len(manifest_data))( @@ -1121,7 +1172,7 @@ def __init__(self, if error: raise C2paError(error) raise C2paError( - self._error_messages['reader_error'].format("Unknown error")) + Reader._ERROR_MESSAGES['reader_error'].format("Unknown error")) def __enter__(self): return self @@ -1151,7 +1202,7 @@ def close(self): _lib.c2pa_reader_free(self._reader) except Exception as e: print( - self._error_messages['reader_cleanup'].format( + Reader._ERROR_MESSAGES['reader_cleanup_error'].format( str(e)), file=sys.stderr) finally: self._reader = None @@ -1162,7 +1213,7 @@ def close(self): self._own_stream.close() except Exception as e: print( - self._error_messages['stream_error'].format( + Reader._ERROR_MESSAGES['stream_error'].format( str(e)), file=sys.stderr) finally: self._own_stream = None @@ -1173,7 +1224,7 @@ def close(self): self._file.close() except Exception as e: print( - self._error_messages['file_error'].format( + Reader._ERROR_MESSAGES['file_error'].format( str(e)), file=sys.stderr) finally: self._file = None @@ -1183,7 +1234,7 @@ def close(self): self._strings.clear() except Exception as e: print( - self._error_messages['cleanup_error'].format( + Reader._ERROR_MESSAGES['cleanup_error'].format( str(e)), file=sys.stderr) finally: self._closed = True @@ -1236,6 +1287,20 @@ def resource_to_stream(self, uri: str, stream: Any) -> int: class Signer: """High-level wrapper for C2PA Signer operations.""" + # Class-level error messages to avoid multiple creation + _ERROR_MESSAGES = { + 'closed_error': "Signer is closed", + 'cleanup_error': "Error during cleanup: {}", + 'signer_cleanup': "Error cleaning up signer: {}", + 'size_error': "Error getting reserve size: {}", + 'callback_error': "Error in signer callback: {}", + 'info_error': "Error creating signer from info: {}", + 'invalid_data': "Invalid data for signing: {}", + 'invalid_certs': "Invalid certificate data: {}", + 'invalid_tsa': "Invalid TSA URL: {}", + 'encoding_error': "Invalid UTF-8 characters in input: {}" + } + def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): """Initialize a new Signer instance. @@ -1244,17 +1309,6 @@ def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): """ self._signer = signer_ptr self._closed = False - self._error_messages = { - 'closed_error': "Signer is closed", - 'cleanup_error': "Error during cleanup: {}", - 'signer_cleanup': "Error cleaning up signer: {}", - 'size_error': "Error getting reserve size: {}", - 'callback_error': "Error in signer callback: {}", - 'info_error': "Error creating signer from info: {}", - 'invalid_data': "Invalid data for signing: {}", - 'invalid_certs': "Invalid certificate data: {}", - 'invalid_tsa': "Invalid TSA URL: {}" - } @classmethod def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': @@ -1269,10 +1323,6 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': Raises: C2paError: If there was an error creating the signer """ - # Validate signer info before creating - if not signer_info.sign_cert or not signer_info.private_key: - raise C2paError("Missing certificate or private key") - signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) if not signer_ptr: @@ -1281,7 +1331,7 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': # More detailed error message when possible raise C2paError(error) raise C2paError( - "Failed to create signer from configured signer info") + "Failed to create signer from configured signer_info") return cls(signer_ptr) @@ -1311,37 +1361,86 @@ def from_callback( # Validate inputs before creating if not certs: raise C2paError( - cls._error_messages['invalid_certs'].format("Missing certificate data")) + cls._ERROR_MESSAGES['invalid_certs'].format("Missing certificate data")) if tsa_url and not tsa_url.startswith(('http://', 'https://')): raise C2paError( - cls._error_messages['invalid_tsa'].format("Invalid TSA URL format")) + cls._ERROR_MESSAGES['invalid_tsa'].format("Invalid TSA URL format")) # Create a wrapper callback that handles errors and memory management - def wrapped_callback(data: bytes) -> bytes: + def wrapped_callback( + context, + data_ptr, + data_len, + signed_bytes_ptr, + signed_len): + # Returns -1 on error as it is what the native code expects. + # The reason is that otherwise we ping-pong errors between native code and Python code, + # which can become tedious in handling. So we let the native code deal with it and + # raise the errors accordingly, since it already does checks. try: + if not data_ptr or data_len <= 0 or not signed_bytes_ptr or signed_len <= 0: + # Error: invalid input, invalid so return -1, + # native code will handle it! + return -1 + + # Validate buffer sizes before memory operations + if data_len > 1024 * 1024: # 1MB limit + return -1 + + # Recover signed data (copy, to avoid lifetime issues) + temp_buffer = (ctypes.c_ubyte * data_len)() + ctypes.memmove(temp_buffer, data_ptr, data_len) + data = bytes(temp_buffer) + if not data: - raise ValueError("Empty data provided for signing") - return callback(data) + # Error: empty data, invalid so return -1, + # native code will also handle it! + return -1 + + # Call the user's callback + signature = callback(data) + if not signature: + # Error: empty signature, invalid so return -1, + # native code will handle that too! + return -1 + + # Copy the signature back to the C buffer + # (since callback is used in native code) + actual_len = min(len(signature), signed_len) + # Use memmove for efficient memory copying instead of + # byte-by-byte loop + ctypes.memmove(signed_bytes_ptr, signature, actual_len) + + # Native code expects the signed len to be returned, we oblige + return actual_len except Exception as e: print( - cls._error_messages['callback_error'].format( + cls._ERROR_MESSAGES['callback_error'].format( str(e)), file=sys.stderr) - raise C2paError.Signature(str(e)) + # Error: exception raised, invalid so return -1, + # native code will handle the error when seeing -1 + return -1 - # Encode strings with error handling + # Encode strings with error handling in case it's invalid UTF8 try: - certs_bytes = certs.encode('utf-8') - tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None + # Only encode if not already bytes, avoid unnecessary encoding + certs_bytes = certs.encode( + 'utf-8') if isinstance(certs, str) else certs + tsa_url_bytes = tsa_url.encode( + 'utf-8') if tsa_url and isinstance(tsa_url, str) else tsa_url except UnicodeError as e: raise C2paError.Encoding( - cls._error_messages['encoding_error'].format( + cls._ERROR_MESSAGES['encoding_error'].format( str(e))) + # Create the callback object using the callback function + callback_cb = SignerCallback(wrapped_callback) + # Create the signer with the wrapped callback signer_ptr = _lib.c2pa_signer_create( - None, # context - SignerCallback(wrapped_callback), + None, + callback_cb, alg, certs_bytes, tsa_url_bytes @@ -1353,12 +1452,20 @@ def wrapped_callback(data: bytes) -> bytes: raise C2paError(error) raise C2paError("Failed to create signer") - return cls(signer_ptr) + # Create and return the signer instance with the callback + signer_instance = cls(signer_ptr) + + # Keep callback alive on the object to prevent gc of the callback + # So the callback will live as long as the signer leaves, + # and there is a 1:1 relationship between signer and callback. + signer_instance._callback_cb = callback_cb + + return signer_instance def __enter__(self): """Context manager entry.""" if self._closed: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Signer._ERROR_MESSAGES['closed_error']) return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -1381,13 +1488,13 @@ def close(self): _lib.c2pa_signer_free(self._signer) except Exception as e: print( - self._error_messages['signer_cleanup'].format( + Signer._ERROR_MESSAGES['signer_cleanup'].format( str(e)), file=sys.stderr) finally: self._signer = None except Exception as e: print( - self._error_messages['cleanup_error'].format( + Signer._ERROR_MESSAGES['cleanup_error'].format( str(e)), file=sys.stderr) finally: self._closed = True @@ -1402,7 +1509,7 @@ def reserve_size(self) -> int: C2paError: If there was an error getting the size """ if self._closed or not self._signer: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Signer._ERROR_MESSAGES['closed_error']) try: result = _lib.c2pa_signer_reserve_size(self._signer) @@ -1415,7 +1522,9 @@ def reserve_size(self) -> int: return result except Exception as e: - raise C2paError(self._error_messages['size_error'].format(str(e))) + raise C2paError( + Signer._ERROR_MESSAGES['size_error'].format( + str(e))) @property def closed(self) -> bool: @@ -1430,6 +1539,22 @@ def closed(self) -> bool: class Builder: """High-level wrapper for C2PA Builder operations.""" + # Class-level error messages to avoid multiple creation + _ERROR_MESSAGES = { + 'builder_error': "Failed to create builder: {}", + 'cleanup_error': "Error during cleanup: {}", + 'builder_cleanup': "Error cleaning up builder: {}", + 'closed_error': "Builder is closed", + 'manifest_error': "Invalid manifest data: must be string or dict", + 'url_error': "Error setting remote URL: {}", + 'resource_error': "Error adding resource: {}", + 'ingredient_error': "Error adding ingredient: {}", + 'archive_error': "Error writing archive: {}", + 'sign_error': "Error during signing: {}", + 'encoding_error': "Invalid UTF-8 characters in manifest: {}", + 'json_error': "Failed to serialize manifest JSON: {}" + } + def __init__(self, manifest_json: Any): """Initialize a new Builder instance. @@ -1442,34 +1567,20 @@ def __init__(self, manifest_json: Any): C2paError.Json: If the manifest JSON cannot be serialized """ self._builder = None - self._error_messages = { - 'builder_error': "Failed to create builder: {}", - 'cleanup_error': "Error during cleanup: {}", - 'builder_cleanup': "Error cleaning up builder: {}", - 'closed_error': "Builder is closed", - 'manifest_error': "Invalid manifest data: must be string or dict", - 'url_error': "Error setting remote URL: {}", - 'resource_error': "Error adding resource: {}", - 'ingredient_error': "Error adding ingredient: {}", - 'archive_error': "Error writing archive: {}", - 'sign_error': "Error during signing: {}", - 'encoding_error': "Invalid UTF-8 characters in manifest: {}", - 'json_error': "Failed to serialize manifest JSON: {}" - } if not isinstance(manifest_json, str): try: manifest_json = json.dumps(manifest_json) except (TypeError, ValueError) as e: raise C2paError.Json( - self._error_messages['json_error'].format( + Builder._ERROR_MESSAGES['json_error'].format( str(e))) try: json_str = manifest_json.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding( - self._error_messages['encoding_error'].format( + Builder._ERROR_MESSAGES['encoding_error'].format( str(e))) self._builder = _lib.c2pa_builder_from_json(json_str) @@ -1479,7 +1590,7 @@ def __init__(self, manifest_json: Any): if error: raise C2paError(error) raise C2paError( - self._error_messages['builder_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['builder_error'].format("Unknown error")) @classmethod def from_json(cls, manifest_json: Any) -> 'Builder': @@ -1547,13 +1658,13 @@ def close(self): _lib.c2pa_builder_free(self._builder) except Exception as e: print( - self._error_messages['builder_cleanup'].format( + Builder._ERROR_MESSAGES['builder_cleanup'].format( str(e)), file=sys.stderr) finally: self._builder = None except Exception as e: print( - self._error_messages['cleanup_error'].format( + Builder._ERROR_MESSAGES['cleanup_error'].format( str(e)), file=sys.stderr) finally: self._closed = True @@ -1577,7 +1688,7 @@ def set_no_embed(self): This is useful when creating cloud or sidecar manifests. """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) _lib.c2pa_builder_set_no_embed(self._builder) def set_remote_url(self, remote_url: str): @@ -1593,7 +1704,7 @@ def set_remote_url(self, remote_url: str): C2paError: If there was an error setting the remote URL """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) url_str = remote_url.encode('utf-8') result = _lib.c2pa_builder_set_remote_url(self._builder, url_str) @@ -1603,7 +1714,7 @@ def set_remote_url(self, remote_url: str): if error: raise C2paError(error) raise C2paError( - self._error_messages['url_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['url_error'].format("Unknown error")) def add_resource(self, uri: str, stream: Any): """Add a resource to the builder. @@ -1616,7 +1727,7 @@ def add_resource(self, uri: str, stream: Any): C2paError: If there was an error adding the resource """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) uri_str = uri.encode('utf-8') with Stream(stream) as stream_obj: @@ -1628,7 +1739,7 @@ def add_resource(self, uri: str, stream: Any): if error: raise C2paError(error) raise C2paError( - self._error_messages['resource_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['resource_error'].format("Unknown error")) def add_ingredient(self, ingredient_json: str, format: str, source: Any): """Add an ingredient to the builder. @@ -1643,14 +1754,14 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any): C2paError.Encoding: If the ingredient JSON contains invalid UTF-8 characters """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) try: ingredient_str = ingredient_json.encode('utf-8') format_str = format.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding( - self._error_messages['encoding_error'].format( + Builder._ERROR_MESSAGES['encoding_error'].format( str(e))) source_stream = Stream(source) @@ -1662,7 +1773,7 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any): if error: raise C2paError(error) raise C2paError( - self._error_messages['ingredient_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['ingredient_error'].format("Unknown error")) def add_ingredient_from_stream( self, @@ -1681,14 +1792,14 @@ def add_ingredient_from_stream( C2paError.Encoding: If the ingredient JSON or format contains invalid UTF-8 characters """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) try: ingredient_str = ingredient_json.encode('utf-8') format_str = format.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding( - self._error_messages['encoding_error'].format( + Builder._ERROR_MESSAGES['encoding_error'].format( str(e))) with Stream(source) as source_stream: @@ -1700,7 +1811,7 @@ def add_ingredient_from_stream( if error: raise C2paError(error) raise C2paError( - self._error_messages['ingredient_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['ingredient_error'].format("Unknown error")) def to_archive(self, stream: Any): """Write an archive of the builder to a stream. @@ -1712,7 +1823,7 @@ def to_archive(self, stream: Any): C2paError: If there was an error writing the archive """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) with Stream(stream) as stream_obj: result = _lib.c2pa_builder_to_archive( @@ -1723,14 +1834,14 @@ def to_archive(self, stream: Any): if error: raise C2paError(error) raise C2paError( - self._error_messages['archive_error'].format("Unknown error")) + Builder._ERROR_MESSAGES['archive_error'].format("Unknown error")) def _sign_internal( self, signer: Signer, format: str, source_stream: Stream, - dest_stream: Stream) -> int: + dest_stream: Stream) -> tuple[int, bytes]: """Internal signing logic shared between sign() and sign_file() methods, to use same native calls but expose different API surface. @@ -1741,13 +1852,13 @@ def _sign_internal( dest_stream: The destination stream Returns: - Size of C2PA data + A tuple of (size of C2PA data, manifest bytes) Raises: C2paError: If there was an error during signing """ if not self._builder: - raise C2paError(self._error_messages['closed_error']) + raise C2paError(Builder._ERROR_MESSAGES['closed_error']) try: format_str = format.encode('utf-8') @@ -1768,11 +1879,28 @@ def _sign_internal( if error: raise C2paError(error) - if manifest_bytes_ptr: - # Free the manifest bytes pointer if it was allocated - _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) + # Capture the manifest bytes if available + manifest_bytes = b"" + if manifest_bytes_ptr and result > 0: + try: + # Convert the C pointer to Python bytes + temp_buffer = (ctypes.c_ubyte * result)() + ctypes.memmove(temp_buffer, manifest_bytes_ptr, result) + manifest_bytes = bytes(temp_buffer) + except Exception: + # If there's any error accessing the memory, just return + # empty bytes + manifest_bytes = b"" + finally: + # Always free the C-allocated memory, + # even if we failed to copy manifest bytes + try: + _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) + except Exception: + # Ignore errors during cleanup + pass - return result + return manifest_bytes finally: # Ensure both streams are cleaned up source_stream.close() @@ -1800,14 +1928,14 @@ def sign( dest_stream = Stream(dest) # Use the internal stream-base signing logic - self._sign_internal(signer, format, source_stream, dest_stream) + return self._sign_internal(signer, format, source_stream, dest_stream) def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], - signer: Signer) -> int: + signer: Signer) -> tuple[int, bytes]: """Sign a file and write the signed data to an output file. Args: @@ -1816,7 +1944,7 @@ def sign_file(self, signer: The signer to use Returns: - Size of C2PA data + A tuple of (size of C2PA data, manifest bytes) Raises: C2paError: If there was an error during signing @@ -1824,7 +1952,8 @@ def sign_file(self, # Get the MIME type from the file extension mime_type = mimetypes.guess_type(str(source_path))[0] if not mime_type: - raise C2paError.NotSupported(f"Could not determine MIME type for file: {source_path}") + raise C2paError.NotSupported( + f"Could not determine MIME type for file: {source_path}") # Open source and destination files with open(source_path, 'rb') as source_file, open(dest_path, 'wb') as dest_file: @@ -1833,7 +1962,8 @@ def sign_file(self, dest_stream = Stream(dest_file) # Use the internal stream-base signing logic - return self._sign_internal(signer, mime_type, source_stream, dest_stream) + return self._sign_internal( + signer, mime_type, source_stream, dest_stream) def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]: @@ -1882,6 +2012,14 @@ def create_signer( ) -> Signer: """Create a signer from a callback function. + .. deprecated:: 0.11.0 + This function is deprecated and will be removed in a future version. + Please use the Signer class method instead. + Example: + ```python + signer = Signer.from_callback(callback, alg, certs, tsa_url) + ``` + Args: callback: Function that signs data and returns the signature alg: The signing algorithm to use @@ -1895,34 +2033,26 @@ def create_signer( C2paError: If there was an error creating the signer C2paError.Encoding: If the certificate data or TSA URL contains invalid UTF-8 characters """ - try: - certs_bytes = certs.encode('utf-8') - tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None - except UnicodeError as e: - raise C2paError.Encoding( - f"Invalid UTF-8 characters in certificate data or TSA URL: {str(e)}") - - signer_ptr = _lib.c2pa_signer_create( - None, # context - SignerCallback(callback), - alg, - certs_bytes, - tsa_url_bytes - ) - - if not signer_ptr: - error = _parse_operation_result_for_error(_lib.c2pa_error()) - if error: - # More detailed error message when possible - raise C2paError(error) - raise C2paError("Failed to create signer") + warnings.warn( + "The create_signer function is deprecated and will be removed in a future version." + "Please use Signer.from_callback() instead.", + DeprecationWarning, + stacklevel=2) - return Signer(signer_ptr) + return Signer.from_callback(callback, alg, certs, tsa_url) def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer: """Create a signer from signer information. + .. deprecated:: 0.11.0 + This function is deprecated and will be removed in a future version. + Please use the Signer class method instead. + Example: + ```python + signer = Signer.from_info(signer_info) + ``` + Args: signer_info: The signer configuration @@ -1932,20 +2062,13 @@ def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer: Raises: C2paError: If there was an error creating the signer """ - signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) - - if not signer_ptr: - error = _parse_operation_result_for_error(_lib.c2pa_error()) - if error: - # More detailed error message when possible - raise C2paError(error) - raise C2paError("Failed to create signer from info") - - return Signer(signer_ptr) - + warnings.warn( + "The create_signer_from_info function is deprecated and will be removed in a future version." + "Please use Signer.from_info() instead.", + DeprecationWarning, + stacklevel=2) -# Rename the old create_signer to _create_signer since it's now internal -_create_signer = create_signer + return Signer.from_info(signer_info) def ed25519_sign(data: bytes, private_key: str) -> bytes: @@ -1995,12 +2118,11 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: 'Reader', 'Builder', 'Signer', - 'version', 'load_settings', 'read_file', 'read_ingredient_file', 'sign_file', 'format_embeddable', - 'ed25519_sign', + 'version', 'sdk_version' ] diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index a2a07870..ab07256e 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -18,9 +18,14 @@ from unittest.mock import mock_open, patch import ctypes import warnings +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, ec +from cryptography.hazmat.backends import default_backend +import tempfile +import shutil from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version -from c2pa.c2pa import Stream, read_ingredient_file, read_file, sign_file, load_settings +from c2pa.c2pa import Stream, read_ingredient_file, read_file, sign_file, load_settings, create_signer # Suppress deprecation warnings warnings.filterwarnings("ignore", category=DeprecationWarning) @@ -239,6 +244,9 @@ def test_read_cawg_data_file(self): class TestBuilder(unittest.TestCase): def setUp(self): + # Filter deprecation warnings for create_signer function + warnings.filterwarnings("ignore", message="The create_signer function is deprecated") + # Use the fixtures_dir fixture to set up paths self.data_dir = FIXTURES_DIR self.testPath = DEFAULT_TEST_FILE @@ -284,6 +292,21 @@ def setUp(self): ] } + # Define an example ES256 callback signer + self.callback_signer_alg = "Es256" + def callback_signer_es256(data: bytes) -> bytes: + private_key = serialization.load_pem_private_key( + self.key, + password=None, + backend=default_backend() + ) + signature = private_key.sign( + data, + ec.ECDSA(hashes.SHA256()) + ) + return signature + self.callback_signer_es256 = callback_signer_es256 + def test_reserve_size_on_closed_signer(self): self.signer.close() with self.assertRaises(Error): @@ -322,7 +345,7 @@ def test_remote_sign(self): builder = Builder(self.manifestDefinition) builder.set_no_embed() output = io.BytesIO(bytearray()) - result_data = builder.sign(self.signer, "image/jpeg", file, output) + builder.sign(self.signer, "image/jpeg", file, output) output.seek(0) # When set_no_embed() is used, no manifest should be embedded in the file @@ -331,6 +354,21 @@ def test_remote_sign(self): reader = Reader("image/jpeg", output) output.close() + def test_remote_sign_using_returned_bytes(self): + with open(self.testPath, "rb") as file: + builder = Builder(self.manifestDefinition) + builder.set_no_embed() + with io.BytesIO() as output_buffer: + manifest_data = builder.sign( + self.signer, "image/jpeg", file, output_buffer) + output_buffer.seek(0) + read_buffer = io.BytesIO(output_buffer.getvalue()) + + with Reader("image/jpeg", read_buffer, manifest_data) as reader: + manifest_data = reader.json() + self.assertIn("Python Test", manifest_data) + self.assertNotIn("validation_status", manifest_data) + def test_sign_all_files(self): """Test signing all files in both fixtures directories""" signing_dir = os.path.join(self.data_dir, "files-for-signing-tests") @@ -536,16 +574,16 @@ def test_builder_sign_with_duplicate_ingredient(self): # Verify the first ingredient's title matches what we set first_ingredient = active_manifest["ingredients"][0] self.assertEqual(first_ingredient["title"], "Test Ingredient") - + # Verify subsequent labels are unique and have a double underscore with a monotonically inc. index second_ingredient = active_manifest["ingredients"][1] self.assertTrue(second_ingredient["label"].endswith("__1")) third_ingredient = active_manifest["ingredients"][2] self.assertTrue(third_ingredient["label"].endswith("__2")) - + builder.close() - + def test_builder_sign_with_ingredient_from_stream(self): """Test Builder class operations with a real file using stream for ingredient.""" # Test creating builder from JSON @@ -694,7 +732,7 @@ def test_builder_set_remote_url(self): builder.sign(self.signer, "image/jpeg", file, output) output.seek(0) d = output.read() - self.assertIn(b'provenance="http://this_does_not_exist/foo.jpg"', d) + self.assertIn(b'provenance="http://this_does_not_exist/foo.jpg"', d) def test_builder_set_remote_url_no_embed(self): """Test setting the remote url of a builder with no embed flag.""" @@ -717,9 +755,6 @@ def test_builder_set_remote_url_no_embed(self): def test_sign_file(self): """Test signing a file using the sign_file method.""" - import tempfile - import shutil - # Create a temporary directory for the test temp_dir = tempfile.mkdtemp() try: @@ -728,7 +763,7 @@ def test_sign_file(self): # Use the sign_file method builder = Builder(self.manifestDefinition) - result = builder.sign_file( + manifest_bytes = builder.sign_file( source_path=self.testPath, dest_path=output_path, signer=self.signer @@ -748,6 +783,413 @@ def test_sign_file(self): # Clean up the temporary directory shutil.rmtree(temp_dir) + def test_sign_file_callback_signer(self): + """Test signing a file using the sign_file method.""" + + temp_dir = tempfile.mkdtemp() + + try: + output_path = os.path.join(temp_dir, "signed_output.jpg") + + # Use the sign_file method + builder = Builder(self.manifestDefinition) + + # Create signer with callback using create_signer function + signer = create_signer( + callback=self.callback_signer_es256, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) + + manifest_bytes = builder.sign_file( + source_path=self.testPath, + dest_path=output_path, + signer=signer + ) + + # Verify the output file was created + self.assertTrue(os.path.exists(output_path)) + + # Verify results + self.assertIsInstance(manifest_bytes, bytes) + self.assertGreater(len(manifest_bytes), 0) + + # Read the signed file and verify the manifest + with open(output_path, "rb") as file, Reader("image/jpeg", file) as reader: + json_data = reader.json() + self.assertNotIn("validation_status", json_data) + + # Parse the JSON and verify the signature algorithm + manifest_data = json.loads(json_data) + active_manifest_id = manifest_data["active_manifest"] + active_manifest = manifest_data["manifests"][active_manifest_id] + + self.assertIn("signature_info", active_manifest) + signature_info = active_manifest["signature_info"] + self.assertEqual(signature_info["alg"], self.callback_signer_alg) + + finally: + shutil.rmtree(temp_dir) + + def test_sign_file_callback_signer_managed(self): + """Test signing a file using the sign_file method with context managers.""" + + temp_dir = tempfile.mkdtemp() + + try: + output_path = os.path.join(temp_dir, "signed_output_managed.jpg") + + # Create builder and signer with context managers + with Builder(self.manifestDefinition) as builder, create_signer( + callback=self.callback_signer_es256, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) as signer: + + # Sign the file + manifest_bytes = builder.sign_file( + source_path=self.testPath, + dest_path=output_path, + signer=signer + ) + + # Verify results + self.assertTrue(os.path.exists(output_path)) + self.assertIsInstance(manifest_bytes, bytes) + self.assertGreater(len(manifest_bytes), 0) + + # Verify signed data can be read + with open(output_path, "rb") as file, Reader("image/jpeg", file) as reader: + json_data = reader.json() + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + # Parse the JSON and verify the signature algorithm + manifest_data = json.loads(json_data) + active_manifest_id = manifest_data["active_manifest"] + active_manifest = manifest_data["manifests"][active_manifest_id] + + # Verify the signature_info contains the correct algorithm + self.assertIn("signature_info", active_manifest) + signature_info = active_manifest["signature_info"] + self.assertEqual(signature_info["alg"], self.callback_signer_alg) + + finally: + shutil.rmtree(temp_dir) + + def test_sign_file_callback_signer_managed_multiple_uses(self): + """Test that a signer can be used multiple times with context managers.""" + + temp_dir = tempfile.mkdtemp() + + try: + # Create builder and signer with context managers + with Builder(self.manifestDefinition) as builder, create_signer( + callback=self.callback_signer_es256, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) as signer: + + # First signing operation + output_path_1 = os.path.join(temp_dir, "signed_output_1.jpg") + manifest_bytes_1 = builder.sign_file( + source_path=self.testPath, + dest_path=output_path_1, + signer=signer + ) + + # Verify first signing was successful + self.assertTrue(os.path.exists(output_path_1)) + self.assertIsInstance(manifest_bytes_1, bytes) + self.assertGreater(len(manifest_bytes_1), 0) + + # Second signing operation with the same signer + # This is to verify we don't free the signer or the callback too early + output_path_2 = os.path.join(temp_dir, "signed_output_2.jpg") + manifest_bytes_2 = builder.sign_file( + source_path=self.testPath, + dest_path=output_path_2, + signer=signer + ) + + # Verify second signing was successful + self.assertTrue(os.path.exists(output_path_2)) + self.assertIsInstance(manifest_bytes_2, bytes) + self.assertGreater(len(manifest_bytes_2), 0) + + # Verify both files contain valid C2PA data + for output_path in [output_path_1, output_path_2]: + with open(output_path, "rb") as file, Reader("image/jpeg", file) as reader: + json_data = reader.json() + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + # Parse the JSON and verify the signature algorithm + manifest_data = json.loads(json_data) + active_manifest_id = manifest_data["active_manifest"] + active_manifest = manifest_data["manifests"][active_manifest_id] + + # Verify the signature_info contains the correct algorithm + self.assertIn("signature_info", active_manifest) + signature_info = active_manifest["signature_info"] + self.assertEqual(signature_info["alg"], self.callback_signer_alg) + + finally: + shutil.rmtree(temp_dir) + + def test_builder_sign_file_callback_signer_from_callback(self): + """Test signing a file using the sign_file method with Signer.from_callback.""" + + temp_dir = tempfile.mkdtemp() + try: + + output_path = os.path.join(temp_dir, "signed_output_from_callback.jpg") + + # Will use the sign_file method + builder = Builder(self.manifestDefinition) + + # Create signer with callback using Signer.from_callback + signer = Signer.from_callback( + callback=self.callback_signer_es256, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) + + manifest_bytes = builder.sign_file( + source_path=self.testPath, + dest_path=output_path, + signer=signer + ) + + # Verify the output file was created + self.assertTrue(os.path.exists(output_path)) + + # Verify results + self.assertIsInstance(manifest_bytes, bytes) + self.assertGreater(len(manifest_bytes), 0) + + # Read the signed file and verify the manifest + with open(output_path, "rb") as file, Reader("image/jpeg", file) as reader: + json_data = reader.json() + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + # Parse the JSON and verify the signature algorithm + manifest_data = json.loads(json_data) + active_manifest_id = manifest_data["active_manifest"] + active_manifest = manifest_data["manifests"][active_manifest_id] + + # Verify the signature_info contains the correct algorithm + self.assertIn("signature_info", active_manifest) + signature_info = active_manifest["signature_info"] + self.assertEqual(signature_info["alg"], self.callback_signer_alg) + + finally: + shutil.rmtree(temp_dir) + + def test_sign_file_using_callback_signer_overloads(self): + """Test signing a file using the sign_file function with a Signer object.""" + # Create a temporary directory for the test + temp_dir = tempfile.mkdtemp() + + try: + # Create a temporary output file path + output_path = os.path.join(temp_dir, "signed_output_callback.jpg") + + # Create signer with callback + signer = Signer.from_callback( + callback=self.callback_signer_es256, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) + + # Overload that returns a JSON string + result_json = sign_file( + self.testPath, + output_path, + self.manifestDefinition, + signer, + False + ) + + # Verify the output file was created + self.assertTrue(os.path.exists(output_path)) + + # Verify the result is JSON + self.assertIsInstance(result_json, str) + self.assertGreater(len(result_json), 0) + + manifest_data = json.loads(result_json) + self.assertIn("manifests", manifest_data) + self.assertIn("active_manifest", manifest_data) + + output_path_bytes = os.path.join(temp_dir, "signed_output_callback_bytes.jpg") + # Overload that returns bytes + result_bytes = sign_file( + self.testPath, + output_path_bytes, + self.manifestDefinition, + signer, + True + ) + + # Verify the output file was created + self.assertTrue(os.path.exists(output_path_bytes)) + + # Verify the result is bytes + self.assertIsInstance(result_bytes, bytes) + self.assertGreater(len(result_bytes), 0) + + # Read the signed file and verify the manifest contains expected content + with open(output_path, "rb") as file: + reader = Reader("image/jpeg", file) + file_manifest_json = reader.json() + self.assertIn("Python Test", file_manifest_json) + self.assertNotIn("validation_status", file_manifest_json) + + finally: + shutil.rmtree(temp_dir) + + def test_sign_file_overloads(self): + """Test that the overloaded sign_file function works with both parameter types.""" + # Create a temporary directory for the test + temp_dir = tempfile.mkdtemp() + try: + # Test with C2paSignerInfo + output_path_1 = os.path.join(temp_dir, "signed_output_1.jpg") + + # Load test certificates and key + with open(os.path.join(self.data_dir, "es256_certs.pem"), "rb") as cert_file: + certs = cert_file.read() + with open(os.path.join(self.data_dir, "es256_private.key"), "rb") as key_file: + key = key_file.read() + + # Create signer info + signer_info = C2paSignerInfo( + alg=b"es256", + sign_cert=certs, + private_key=key, + ta_url=b"http://timestamp.digicert.com" + ) + + # Test with C2paSignerInfo parameter - JSON return + result_1 = sign_file( + self.testPath, + output_path_1, + self.manifestDefinition, + signer_info, + False + ) + + self.assertIsInstance(result_1, str) + self.assertTrue(os.path.exists(output_path_1)) + + # Test with C2paSignerInfo parameter - bytes return + output_path_1_bytes = os.path.join(temp_dir, "signed_output_1_bytes.jpg") + result_1_bytes = sign_file( + self.testPath, + output_path_1_bytes, + self.manifestDefinition, + signer_info, + True + ) + + self.assertIsInstance(result_1_bytes, bytes) + self.assertTrue(os.path.exists(output_path_1_bytes)) + + # Test with Signer object + output_path_2 = os.path.join(temp_dir, "signed_output_2.jpg") + + # Create a signer from the signer info + signer = Signer.from_info(signer_info) + + # Test with Signer parameter - JSON return + result_2 = sign_file( + self.testPath, + output_path_2, + self.manifestDefinition, + signer, + False + ) + + self.assertIsInstance(result_2, str) + self.assertTrue(os.path.exists(output_path_2)) + + # Test with Signer parameter - bytes return + output_path_2_bytes = os.path.join(temp_dir, "signed_output_2_bytes.jpg") + result_2_bytes = sign_file( + self.testPath, + output_path_2_bytes, + self.manifestDefinition, + signer, + True + ) + + self.assertIsInstance(result_2_bytes, bytes) + self.assertTrue(os.path.exists(output_path_2_bytes)) + + # Both JSON results should be similar (same manifest structure) + manifest_1 = json.loads(result_1) + manifest_2 = json.loads(result_2) + + self.assertIn("manifests", manifest_1) + self.assertIn("manifests", manifest_2) + self.assertIn("active_manifest", manifest_1) + self.assertIn("active_manifest", manifest_2) + + # Both bytes results should be non-empty + self.assertGreater(len(result_1_bytes), 0) + self.assertGreater(len(result_2_bytes), 0) + + finally: + # Clean up the temporary directory + shutil.rmtree(temp_dir) + + def test_sign_file_callback_signer_reports_error(self): + """Test signing a file using the sign_file method with a callback that reports an error.""" + + temp_dir = tempfile.mkdtemp() + + try: + output_path = os.path.join(temp_dir, "signed_output.jpg") + + # Use the sign_file method + builder = Builder(self.manifestDefinition) + + # Define a callback that always returns None to simulate an error + def error_callback_signer(data: bytes) -> bytes: + # Could alternatively also raise an error + # raise RuntimeError("Simulated signing error") + return None + + # Create signer with error callback using create_signer function + signer = create_signer( + callback=error_callback_signer, + alg=SigningAlg.ES256, + certs=self.certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" + ) + + # The signing operation should fail due to the error callback + with self.assertRaises(Error): + builder.sign_file( + source_path=self.testPath, + dest_path=output_path, + signer=signer + ) + + # Verify the output file stays empty, + # as no data should have been written + self.assertTrue(os.path.exists(output_path)) + self.assertEqual(os.path.getsize(output_path), 0) + + finally: + shutil.rmtree(temp_dir) class TestStream(unittest.TestCase): def setUp(self): @@ -892,6 +1334,7 @@ def setUp(self): # Filter specific deprecation warnings for legacy API tests warnings.filterwarnings("ignore", message="The read_file function is deprecated") warnings.filterwarnings("ignore", message="The sign_file function is deprecated") + warnings.filterwarnings("ignore", message="The read_ingredient_file function is deprecated") self.data_dir = FIXTURES_DIR self.testPath = DEFAULT_TEST_FILE @@ -903,11 +1346,10 @@ def setUp(self): def tearDown(self): """Clean up temporary files after each test.""" if os.path.exists(self.temp_data_dir): - import shutil shutil.rmtree(self.temp_data_dir) def test_invalid_settings_str(self): - """Test loading a malformed settings string.""" + """Test loading a malformed settings string.""" with self.assertRaises(Error): load_settings(r'{"verify": { "remote_manifest_fetch": false }') @@ -996,8 +1438,7 @@ def test_sign_file(self): self.testPath, output_path, manifest_json, - signer_info, - temp_data_dir + signer_info ) finally: